Skip to content

数据库操作

dimine_python_sdk.lib.dbdimine_python_sdk.lib.prospecting 模块提供对 Dimine 本地数据库(.dmf / .dmd)的原生访问能力,无需启动 Dimine GUI 即可读取和处理数据。

前置条件: - 环境变量 DIMINE_HOME 指向包含 DmPyBindInterface.pyd 的目录 - Python >= 3.12


模块定位

SDK 的数据处理层目前分为三个概念清晰的模块:

模块 用途 典型类
lib.db 数据库/数据表业务封装 DmDbDatabaseDmDbLayerDmDataTableTableRow
models 对外 Pydantic 纯数据模型 PointLineShell

本文档覆盖 lib.db 中的通用数据库/数据表操作,以及 lib.prospecting 中的钻孔数据库专用操作。


一、通用数据库操作

1.1 几何数据库(DmDbDatabase)

DmDbDatabase 用于加载和操作本地 .dmf 几何数据库文件。

直接加载

from dimine_python_sdk.lib.db import DmDbDatabase

db = DmDbDatabase("E:/data/mine.dmf")
print(f"图层数: {db.get_layers_count()}")

使用上下文管理器(推荐)

from dimine_python_sdk.lib.db import data_database_conn

with data_database_conn("E:/data/mine.dmf") as db:
    print(f"图层数: {db.get_layers_count()}")

访问图层和实体

from dimine_python_sdk.lib.db import data_database_conn

with data_database_conn("E:/data/mine.dmf") as db:
    # 遍历所有图层
    for i in range(db.get_layers_count()):
        layer = db.get_layer_by_index(i)
        if layer is None:
            continue

        print(f"图层 {i} 实体数量: {layer.get_entities_count()}")

        # 遍历图层内实体
        layer.start_query_entity()
        entity = layer.open_next_entity()
        while entity is not None:
            print(f"  实体类型: {entity.get_entity_type_name()}")
            entity = layer.open_next_entity()

常用操作

from dimine_python_sdk.lib.db import DmDbDatabase

db = DmDbDatabase("E:/data/mine.dmf")

# 插入新图层
new_layer = db.insert_layer("新图层")

# 在当前图层插入几何实体
new_layer.insert_line(start_point, end_point)
new_layer.insert_polyline(points)

# 保存(覆盖原文件或另存为)
db.save()
db.save("E:/data/mine_backup.dmf")

# 创建空白本地数据库
empty_db = DmDbDatabase.create_local_db()

1.2 数据表(DmDataTable / TableRow)

DmDataTablelib.db 提供的通用数据表类,支持类似 pandas 的多态索引和遍历。钻孔数据库的三大表(孔口表、测斜表、岩性表)返回的也是 DmDataTable 实例。

加载数据表

from dimine_python_sdk.lib.db import DmDataTable, data_table_conn

# 方式一:直接加载
dt = DmDataTable("E:/data/table.dmd")

# 方式二:上下文管理器(退出时自动保存)
with data_table_conn("E:/data/table.dmd") as dt:
    print(f"字段数: {dt.get_field_count()}")
    print(f"记录数: {len(dt)}")

获取字段信息

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    # 所有字段名
    print(f"字段: {dt.columns}")

    # 记录数
    print(f"记录数: {len(dt)}")

按列访问

通过字段名获取整列数据:

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    # 获取整列
    names = dt["Name"]
    print(f"记录数量: {len(names)}")
    print(f"前5条: {names[:5]}")

按行访问

通过索引获取单行数据,返回 TableRow 对象:

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    # 获取第一条记录
    row = dt[0]

    # 双访问模式
    print(row["Name"])     # dict 风格
    print(row.Name)        # 属性风格
    print(row.to_dict())   # 转为原生 dict
    print(f"行索引: {row.index}")

TableRow 是轻量级字典代理,为数据表每一行提供双访问模式:

from dimine_python_sdk.lib.db import TableRow

row = TableRow({"HoleID": "ZK001", "Depth": 500.0, "Au": 3.5}, index=0)

# dict 风格
row["HoleID"]        # "ZK001"

# 属性风格
row.Depth            # 500.0
row.Au               # 3.5

# 工具方法
row.to_dict()        # {"HoleID": "ZK001", "Depth": 500.0, "Au": 3.5}
row.keys()
row.values()
row.items()
row.get("Missing", "default")
len(row)             # 3

切片访问

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    # 获取前10条记录
    first_10 = dt[0:10]
    for row in first_10:
        print(f"{row.Name}: {row.Value}")

遍历数据表

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    for row in dt:
        print(f"索引 {row.index}: {row.to_dict()}")

转换为 pandas / numpy

数据表可直接转换为 pandas DataFrame 或 numpy 数组,便于数据分析。

转为 pandas DataFrame

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    df = dt.to_pandas()
    print(df.head())
    print(df.describe())

注意:使用 to_pandas() 前需安装 pandas:

uv add pandas

转为 numpy 数组

from dimine_python_sdk.lib.db import data_table_conn

with data_table_conn("E:/data/table.dmd") as dt:
    # 整表转为二维 numpy 数组
    arr = dt.to_numpy()
    print(arr.shape)

    # 单列转为一维数组
    values = dt.to_numpy(column="Value")
    print(f"平均值: {values.mean()}")

二、钻孔数据库操作

dimine_python_sdk.lib.prospecting.drill_db 提供对 Dimine 钻孔数据库(.dmf)的专用访问能力。钻孔数据库包含三大核心表:孔口表、测斜表、岩性表,均返回 DmDataTable 实例,支持第一节介绍的所有数据表操作。

2.1 加载钻孔数据库

直接加载

from dimine_python_sdk.lib.prospecting.drill_db import DrillDBManager

# 加载钻孔数据库
db = DrillDBManager("C:/Projects/Mine/钻孔数据.dmf")

# 支持相对路径
db = DrillDBManager("钻孔数据.dmf", base_path="C:/Projects/Mine")

使用上下文管理器(推荐)

使用 drill_conn 可确保资源自动释放,避免内存泄漏:

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    print(db.info)
    # 离开 with 块后自动释放资源

2.2 访问钻孔数据表

钻孔数据库包含三大核心表,通过属性即可访问:

属性 说明
db.collar 孔口表(钻孔基本信息:孔号、坐标、深度等)
db.survey 测斜表(钻孔轨迹测斜数据)
db.lithology 岩性表(钻孔岩性分层信息)
from dimine_python_sdk.lib.prospecting.drill_db import drill_conn

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    # 访问三大表
    collar = db.collar       # 孔口表
    survey = db.survey       # 测斜表
    lithology = db.lithology # 岩性表

    # 也可以使用 dict 风格访问
    collar = db["collar"]
    survey = db["survey"]
    lithology = db["lithology"]

    print(f"孔口表记录数: {len(collar)}")
    print(f"测斜表记录数: {len(survey)}")
    print(f"岩性表记录数: {len(lithology)}")

三大表返回的对象均为 DmDataTable,支持第一节介绍的所有数据表操作(按列/行/切片访问、遍历、to_pandas()to_numpy() 等)。

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    # 获取所有钻孔孔号
    hole_ids = db.collar["HoleID"]
    print(f"钻孔数量: {len(hole_ids)}")

    # 查看第一条孔口记录
    row = db.collar[0]
    print(row.HoleID, row.Depth)

    # 切片获取前5条
    for row in db.collar[0:5]:
        print(row.to_dict())

    # 转为 pandas DataFrame
    df = db.collar.to_pandas()
    print(df.head())

2.3 获取数据库统计信息

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    # 钻孔数量
    print(f"钻孔数量: {db.drill_count}")

    # 钻孔总长度
    print(f"总长度: {db.total_length:.2f} m")

    # 完整信息
    print(db.info)
    # 输出示例:
    # {'drill_count': 50, 'total_length': 12500.5, 'file_path': 'C:/Projects/Mine/钻孔数据.dmf'}

2.4 钻孔数据处理

钻孔数据库模块提供三种常用数据处理方法,均通过 Pydantic 参数模型传入。

样长组合

将不等长样品按指定长度重新组合:

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn
from dimine_python_sdk.lib.prospecting.models import SampleLengthCombineParam

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    params = SampleLengthCombineParam(
        input_file="C:/Projects/Mine/样品数据.dmf",
        combine_length=1.0,      # 组合长度 1m
        combine_percent=1.0,     # 组合百分比 100%
        output_file="C:/Projects/Mine/组合后样品.dmf",
    )

    result = db.process_samples(params)
    print(f"处理结果: {result}")

参数说明

参数 类型 说明
input_file str 输入样品数据文件路径
combine_length float 组合长度,必须 > 0
combine_percent float 组合百分比,0-1 之间,默认 1.0
output_file str 输出文件路径

台阶组合

按台阶高程对样品数据进行组合:

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn
from dimine_python_sdk.lib.prospecting.models import StepCombineParam

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    params = StepCombineParam(
        input_file="C:/Projects/Mine/样品数据.dmf",
        step_height=10.0,     # 台阶高度 10m
        start_height=0.0,     # 起始高程
        end_height=100.0,     # 结束高程
        calculate_model=0,    # 计算方式
        low_dip=5.0,          # 忽略最小倾角 5°
        output_file="C:/Projects/Mine/台阶组合后.dmf",
    )

    result = db.process_steps(params)
    print(f"处理结果: {result}")

特高品位处理

对超过阈值的特高品位样品进行处理:

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn
from dimine_python_sdk.lib.prospecting.models import HighGradeProcessParam

with drill_conn("C:/Projects/Mine/钻孔数据.dmf") as db:
    params = HighGradeProcessParam(
        input_file="C:/Projects/Mine/样品数据.dmf",
        grade_field="Cu",           # 品位字段
        process_mode=0,             # 0:国内标准
        average_multiple=5.0,       # 平均倍数
        frequency=0.1,              # 频率阈值
        replace_method=1,           # 1:给定值替换
        assign_value=2.5,           # 替换给定值
        result_field="Cu_processed", # 结果字段名
    )

    result = db.process_high_grade(params)
    print(f"处理结果: {result}")

replace_method 枚举

说明
0 剔除法
1 给定值
2 相邻样品平均值
3 矿体平均品位法
4 单一工程法
5 截止品位法

2.5 异常处理

from dimine_python_sdk.lib.prospecting.drill_db import (
    drill_conn,
    DrillDBError,
    DrillDBLoadError,
    DrillDBProcessError,
)

try:
    with drill_conn("不存在的文件.dmf") as db:
        print(db.info)
except DrillDBLoadError as e:
    print(f"加载失败: {e}")

# 处理异常
try:
    from dimine_python_sdk.lib.prospecting.models import SampleLengthCombineParam

    params = SampleLengthCombineParam(
        input_file="无效路径.dmf",
        combine_length=1.0,
        output_file="输出.dmf",
    )
    result = db.process_samples(params)
except DrillDBProcessError as e:
    print(f"处理失败: {e}")
    print(f"原始响应: {e.response}")
except DrillDBError as e:
    print(f"数据库错误: {e}")

三、完整示例

from dimine_python_sdk.lib.prospecting.drill_db import drill_conn
from dimine_python_sdk.lib.prospecting.models import SampleLengthCombineParam


def main():
    file_path = "C:/Projects/Mine/钻孔数据.dmf"

    with drill_conn(file_path) as db:
        # 1. 打印数据库概况
        print("=" * 40)
        print("钻孔数据库概况")
        print("=" * 40)
        print(f"文件路径: {db.info['file_path']}")
        print(f"钻孔数量: {db.drill_count}")
        print(f"总长度: {db.total_length:.2f} m")

        # 2. 查看孔口表结构
        print("\n" + "=" * 40)
        print("孔口表信息")
        print("=" * 40)
        print(f"字段: {db.collar.columns}")
        print(f"记录数: {len(db.collar)}")

        # 3. 查看前3条孔口记录
        print("\n前3条孔口记录:")
        for row in db.collar[0:3]:
            print(f"  {row.to_dict()}")

        # 4. 提取所有孔号
        hole_ids = db.collar["HoleID"]
        print(f"\n所有钻孔: {hole_ids}")

        # 5. 转换为 pandas DataFrame 进行分析
        try:
            df = db.collar.to_pandas()
            print(f"\npandas DataFrame 形状: {df.shape}")
            print(df.head())
        except ImportError:
            print("\n未安装 pandas,跳过 DataFrame 转换")

        # 6. 样长组合示例
        print("\n" + "=" * 40)
        print("样长组合处理")
        print("=" * 40)
        params = SampleLengthCombineParam(
            input_file="C:/Projects/Mine/样品数据.dmf",
            combine_length=1.0,
            combine_percent=1.0,
            output_file="C:/Projects/Mine/组合后样品.dmf",
        )
        result = db.process_samples(params)
        print(f"处理结果: {result}")


if __name__ == "__main__":
    main()