Skip to content

drill_db

钻孔数据库连接管理器,提供简洁的钻孔数据库操作接口。

DrillDBError

Bases: RuntimeError

钻孔数据库操作异常基类

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
25
26
27
class DrillDBError(RuntimeError):
    """Base class for errors raised by drill-hole database operations."""

DrillDBLoadError

Bases: DrillDBError

加载钻孔数据库失败

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
30
31
32
class DrillDBLoadError(DrillDBError):
    """Raised when a drill-hole database cannot be loaded."""

DrillDBManager

钻孔数据库管理器

管理单个钻孔数据库的加载和操作。 提供 DataFrame 化的表访问体验:

db = DrillDBManager("钻孔数据.dmf")
db.collar['HoleID']      # 取整列
db.collar[0]             # 取单行(TableRow)
db.collar[0:10]          # 切片
len(db.collar)           # 记录数
db.collar.columns        # 字段名列表
Source code in dimine_python_sdk\lib\prospecting\drill_db.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
class DrillDBManager:
    """
    Drill-hole database manager.

    Loads a single drill-hole database file and exposes its tables through
    a DataFrame-like interface:

        db = DrillDBManager("drillholes.dmf")
        db.collar['HoleID']      # whole column
        db.collar[0]             # single row (TableRow)
        db.collar[0:10]          # slice
        len(db.collar)           # record count
        db.collar.columns        # list of field names
    """

    def __init__(self, file_path: str, base_path: Optional[str] = None):
        """
        Resolve the file path and load the drill-hole database.

        Args:
            file_path: Path of the DMF file (may be relative).
            base_path: Optional base directory. ``base_path / file_path`` is
                used when it exists; otherwise ``file_path`` is used as given.

        Raises:
            DrillDBLoadError: If the file or its manager cannot be loaded.
        """
        if base_path:
            full_path = Path(base_path) / file_path
            if not full_path.exists():
                full_path = Path(file_path)
            self.file_path = str(full_path)
        else:
            self.file_path = file_path

        self._geo_drill = Dm.dmGeoDrillData()
        self._manager = None
        # Lazily created wrappers around the native tables.
        self._collar: Optional[DmDataTable] = None
        self._survey: Optional[DmDataTable] = None
        self._lithology: Optional[DmDataTable] = None

        self._load_file()

    # ------------------------------------------------------------------
    # Internal loading
    # ------------------------------------------------------------------
    def _load_file(self):
        """Load the DMF file and fetch the native drill DB manager.

        Raises:
            DrillDBLoadError: If ``Load`` reports failure or no manager is
                returned.
        """
        result = self._geo_drill.Load(self.file_path)
        if not result:
            raise DrillDBLoadError(f"加载钻孔数据库失败: {self.file_path}")

        manager = self._geo_drill.GetDrillDBManager()
        if not manager:
            raise DrillDBLoadError("钻孔数据库管理器获取失败")

        self._manager = manager

    def _invalidate_cache(self):
        """Forget the cached table wrappers so they are rebuilt on access."""
        self._collar = None
        self._survey = None
        self._lithology = None

    def _require_manager(self):
        """Return the native manager, or raise DrillDBError when it is unset."""
        if not self._manager:
            raise DrillDBError("钻孔数据库未加载")
        return self._manager

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def reload(self):
        """Reload the file and drop the cached table wrappers.

        The cache is cleared even when loading raises, so wrappers created
        before the failed reload are never handed out afterwards.
        """
        try:
            self._load_file()
        finally:
            self._invalidate_cache()

    def save(self, output_path: Optional[str] = None) -> bool:
        """
        Save the drill-hole database.

        The working directory is changed temporarily (to the drive root when
        the path has a drive, otherwise to the target directory) because of
        how the underlying C++ interface handles paths. The original working
        directory is always restored.

        Args:
            output_path: Destination path; defaults to the loaded file path.

        Returns:
            The result of the native ``Save`` call, or ``False`` when any
            exception is raised (the error is printed, not propagated).
        """
        save_path = os.path.normpath(output_path or self.file_path)
        directory, filename = os.path.split(save_path)
        original_cwd = os.getcwd()

        try:
            if not directory:
                return self._geo_drill.Save(save_path)
            drive = os.path.splitdrive(save_path)[0]
            if not drive:
                os.chdir(directory)
                return self._geo_drill.Save(filename)
            drive_root = drive + "\\"
            os.chdir(drive_root)
            return self._geo_drill.Save(os.path.relpath(save_path, drive_root))
        except Exception as e:
            # Deliberate best-effort: report the failure and return False.
            print(f"保存失败: {e}")
            return False
        finally:
            os.chdir(original_cwd)

    def close(self):
        """Drop cached tables and release the native handles."""
        self._invalidate_cache()
        self._manager = None
        self._geo_drill = None

    # ------------------------------------------------------------------
    # Table access (DataFrame-like)
    # ------------------------------------------------------------------
    @property
    def collar(self) -> DmDataTable:
        """Collar table, wrapped on first access and then cached."""
        if self._collar is None:
            cpp_result = self._geo_drill.GetCollarTable()
            self._collar = DmDataTable._from_obj(cpp_result)
        return self._collar

    @property
    def survey(self) -> DmDataTable:
        """Survey table, wrapped on first access and then cached."""
        if self._survey is None:
            cpp_result = self._geo_drill.GetSurveyTable()
            self._survey = DmDataTable._from_obj(cpp_result)
        return self._survey

    @property
    def lithology(self) -> DmDataTable:
        """Lithology table, wrapped on first access and then cached."""
        if self._lithology is None:
            cpp_result = self._geo_drill.GetLithologyTable()
            self._lithology = DmDataTable._from_obj(cpp_result)
        return self._lithology

    def __getitem__(self, table_name: str) -> DmDataTable:
        """Uniform table access: db['collar'] / db['survey'] / db['lithology'].

        Raises:
            KeyError: If ``table_name`` is not one of the three known tables.
        """
        if table_name == "collar":
            return self.collar
        if table_name == "survey":
            return self.survey
        if table_name == "lithology":
            return self.lithology
        raise KeyError(f"未知表: {table_name}。支持的表: 'collar', 'survey', 'lithology'")

    # ------------------------------------------------------------------
    # Basic information (as properties)
    # ------------------------------------------------------------------
    @property
    def drill_count(self) -> int:
        """Number of drill holes reported by the native manager."""
        return self._require_manager().GetDrillCount()

    @property
    def total_length(self) -> float:
        """Total drill-hole length reported by the native manager."""
        return self._require_manager().GetTotalLength()

    @property
    def info(self) -> Dict[str, Any]:
        """Basic database information: drill count, total length, file path."""
        return {
            "drill_count": self.drill_count,
            "total_length": self.total_length,
            "file_path": str(self.file_path),
        }

    # ------------------------------------------------------------------
    # Low-level raw access (for advanced users)
    # ------------------------------------------------------------------
    # NOTE(review): these return the native manager's result as-is, unlike the
    # collar/survey/lithology properties which wrap via DmDataTable._from_obj;
    # confirm the DmDataTable return annotations are accurate.
    def get_base_table(self, table_mark: str) -> DmDataTable:
        """Return the base table identified by ``table_mark``."""
        return self._require_manager().GetBaseTable(table_mark)

    def get_add_table(self, table_mark: str) -> DmDataTable:
        """Return the added table identified by ``table_mark``."""
        return self._require_manager().GetAddedTable(table_mark)

    def get_added_table_count(self) -> int:
        """Return the number of added tables."""
        return self._require_manager().GetAddedTableCount()

    def get_collar_table_def(self) -> DmDataTable:
        """Return the collar table structure definition."""
        return self._require_manager().GetCollarTableDefine()

    def get_survey_table_def(self) -> DmDataTable:
        """Return the survey table structure definition."""
        # NOTE(review): "Survery" is the spelling used by the call in this
        # file; presumably it mirrors the native binding name — confirm.
        return self._require_manager().GetSurveryTableDefine()

    def get_lithology_table_def(self) -> DmDataTable:
        """Return the lithology table structure definition."""
        return self._require_manager().GetLithologyTableDefine()

    # ------------------------------------------------------------------
    # Drill-hole processing
    # ------------------------------------------------------------------
    def process_samples(self, params: SampleLengthCombineParam) -> dict:
        """Run sample-length combination."""
        return DrillFunctionWrapper.sample_length_combine(params)

    def process_steps(self, params: StepCombineParam) -> dict:
        """Run bench (step) combination."""
        return DrillFunctionWrapper.step_combine(params)

    def process_high_grade(self, params: HighGradeProcessParam) -> dict:
        """Run extra-high-grade processing."""
        return DrillFunctionWrapper.extra_high_grade_process(params)

    # ------------------------------------------------------------------
    # Deprecated compatibility methods (to be removed in a future release)
    # ------------------------------------------------------------------

    def _warn_deprecated(self, old: str, new: str):
        """Emit a DeprecationWarning pointing from ``old`` to ``new``.

        ``old`` may be given with or without a trailing ``()``; the message
        always shows exactly one pair of parentheses.
        """
        if old.endswith("()"):
            old = old[:-2]
        warnings.warn(
            f"{old}() 已弃用,请使用 {new}",
            DeprecationWarning,
            # 1 = this helper, 2 = the deprecated method, 3 = its caller.
            stacklevel=3,
        )

    def get_collar_table(self) -> DmDataTable:
        """Deprecated: use ``db.collar``."""
        self._warn_deprecated("get_collar_table", "db.collar")
        return self.collar

    def get_survey_table(self) -> DmDataTable:
        """Deprecated: use ``db.survey``."""
        self._warn_deprecated("get_survey_table", "db.survey")
        return self.survey

    def get_lithology_table(self) -> DmDataTable:
        """Deprecated: use ``db.lithology``."""
        self._warn_deprecated("get_lithology_table", "db.lithology")
        return self.lithology

    def get_info(self) -> Dict[str, Any]:
        """Deprecated: use ``db.info``."""
        self._warn_deprecated("get_info", "db.info")
        return self.info

    def get_drill_count(self) -> int:
        """Deprecated: use ``db.drill_count``."""
        self._warn_deprecated("get_drill_count", "db.drill_count")
        return self.drill_count

    def get_total_length(self) -> float:
        """Deprecated: use ``db.total_length``."""
        self._warn_deprecated("get_total_length", "db.total_length")
        return self.total_length

    def get_collar_table_stats(self) -> dict:
        """Deprecated: build the field/record count dict from ``db.collar``."""
        self._warn_deprecated("get_collar_table_stats", "{'field_count': len(db.collar.columns), 'record_count': len(db.collar)}")
        return {"field_count": len(self.collar.columns), "record_count": len(self.collar)}

    def get_survey_table_stats(self) -> dict:
        """Deprecated: build the field/record count dict from ``db.survey``."""
        self._warn_deprecated("get_survey_table_stats", "{'field_count': len(db.survey.columns), 'record_count': len(db.survey)}")
        return {"field_count": len(self.survey.columns), "record_count": len(self.survey)}

    def get_lithology_table_stats(self) -> dict:
        """Deprecated: build the field/record count dict from ``db.lithology``."""
        self._warn_deprecated("get_lithology_table_stats", "{'field_count': len(db.lithology.columns), 'record_count': len(db.lithology)}")
        return {"field_count": len(self.lithology.columns), "record_count": len(self.lithology)}

    def get_table_stats(self, table_type: str) -> dict:
        """Deprecated: use ``len(db[table_type])`` / ``db[table_type].columns``."""
        self._warn_deprecated("get_table_stats", "len(db[table_type]) / db[table_type].columns")
        table = self[table_type]
        return {"field_count": len(table.columns), "record_count": len(table)}

    def get_collar_field_count(self) -> int:
        """Deprecated: use ``len(db.collar.columns)``."""
        self._warn_deprecated("get_collar_field_count", "len(db.collar.columns)")
        return len(self.collar.columns)

    def get_survey_field_count(self) -> int:
        """Deprecated: use ``len(db.survey.columns)``."""
        self._warn_deprecated("get_survey_field_count", "len(db.survey.columns)")
        return len(self.survey.columns)

    def get_lithology_field_count(self) -> int:
        """Deprecated: use ``len(db.lithology.columns)``."""
        self._warn_deprecated("get_lithology_field_count", "len(db.lithology.columns)")
        return len(self.lithology.columns)

    def get_collar_record_count(self) -> int:
        """Deprecated: use ``len(db.collar)``."""
        self._warn_deprecated("get_collar_record_count", "len(db.collar)")
        return len(self.collar)

    def get_survey_record_count(self) -> int:
        """Deprecated: use ``len(db.survey)``."""
        self._warn_deprecated("get_survey_record_count", "len(db.survey)")
        return len(self.survey)

    def get_lithology_record_count(self) -> int:
        """Deprecated: use ``len(db.lithology)``."""
        self._warn_deprecated("get_lithology_record_count", "len(db.lithology)")
        return len(self.lithology)

    def get_collar_record_data(self, record_index: int) -> dict:
        """Deprecated: use ``db.collar[record_index].to_dict()``."""
        self._warn_deprecated("get_collar_record_data", "db.collar[record_index].to_dict()")
        return self.collar[record_index].to_dict()

    def get_survey_record_data(self, record_index: int) -> dict:
        """Deprecated: use ``db.survey[record_index].to_dict()``."""
        self._warn_deprecated("get_survey_record_data", "db.survey[record_index].to_dict()")
        return self.survey[record_index].to_dict()

    def get_lithology_record_data(self, record_index: int) -> dict:
        """Deprecated: use ``db.lithology[record_index].to_dict()``."""
        self._warn_deprecated("get_lithology_record_data", "db.lithology[record_index].to_dict()")
        return self.lithology[record_index].to_dict()

    # The "filed" spelling below is part of the public method names and is
    # kept for backward compatibility.
    def get_collar_filed_id(self, field_name: str) -> int:
        """Deprecated: use ``db.collar.get_field_id()``."""
        self._warn_deprecated("get_collar_filed_id", "db.collar.get_field_id()")
        return self.collar.get_field_id(field_name)

    def get_survey_filed_id(self, field_name: str) -> int:
        """Deprecated: use ``db.survey.get_field_id()``."""
        self._warn_deprecated("get_survey_filed_id", "db.survey.get_field_id()")
        return self.survey.get_field_id(field_name)

    def get_lithology_filed_id(self, field_name: str) -> int:
        """Deprecated: use ``db.lithology.get_field_id()``."""
        self._warn_deprecated("get_lithology_filed_id", "db.lithology.get_field_id()")
        return self.lithology.get_field_id(field_name)

    def get_collar_filed_name(self, field_index: int) -> str:
        """Deprecated: use ``db.collar.get_field_name()``."""
        self._warn_deprecated("get_collar_filed_name", "db.collar.get_field_name()")
        return self.collar.get_field_name(field_index)

    def get_survey_filed_name(self, field_index: int) -> str:
        """Deprecated: use ``db.survey.get_field_name()``."""
        self._warn_deprecated("get_survey_filed_name", "db.survey.get_field_name()")
        return self.survey.get_field_name(field_index)

    def get_lithology_filed_name(self, field_index: int) -> str:
        """Deprecated: use ``db.lithology.get_field_name()``."""
        self._warn_deprecated("get_lithology_filed_name", "db.lithology.get_field_name()")
        return self.lithology.get_field_name(field_index)

    def get_collar_display_name(self, field_index: int) -> str:
        """Deprecated: use ``db.collar.get_field_display_name()``."""
        self._warn_deprecated("get_collar_display_name", "db.collar.get_field_display_name()")
        return self.collar.get_field_display_name(field_index)

    def get_survey_display_name(self, field_index: int) -> str:
        """Deprecated: use ``db.survey.get_field_display_name()``."""
        self._warn_deprecated("get_survey_display_name", "db.survey.get_field_display_name()")
        return self.survey.get_field_display_name(field_index)

    def get_lithology_display_name(self, field_index: int) -> str:
        """Deprecated: use ``db.lithology.get_field_display_name()``."""
        self._warn_deprecated("get_lithology_display_name", "db.lithology.get_field_display_name()")
        return self.lithology.get_field_display_name(field_index)

    def get_collar_field_type(self, field_index: int) -> int:
        """Deprecated: use ``db.collar.get_field_type()``."""
        self._warn_deprecated("get_collar_field_type", "db.collar.get_field_type()")
        return self.collar.get_field_type(field_index)

    def get_survey_field_type(self, field_index: int) -> int:
        """Deprecated: use ``db.survey.get_field_type()``."""
        self._warn_deprecated("get_survey_field_type", "db.survey.get_field_type()")
        return self.survey.get_field_type(field_index)

    def get_lithology_field_type(self, field_index: int) -> int:
        """Deprecated: use ``db.lithology.get_field_type()``."""
        self._warn_deprecated("get_lithology_field_type", "db.lithology.get_field_type()")
        return self.lithology.get_field_type(field_index)

    def get_collar_from_index(self, field_index: int):
        """Deprecated: use ``db.collar.get_record_from_index()``."""
        self._warn_deprecated("get_collar_from_index", "db.collar.get_record_from_index()")
        return self.collar.get_record_from_index(field_index)

    def get_survey_from_index(self, field_index: int):
        """Deprecated: use ``db.survey.get_record_from_index()``."""
        self._warn_deprecated("get_survey_from_index", "db.survey.get_record_from_index()")
        return self.survey.get_record_from_index(field_index)

    def get_lithology_from_index(self, field_index: int):
        """Deprecated: use ``db.lithology.get_record_from_index()``."""
        self._warn_deprecated("get_lithology_from_index", "db.lithology.get_record_from_index()")
        return self.lithology.get_record_from_index(field_index)

    def get_collar_count_by_condition(self, field_name: str, field_value: str) -> int:
        """Deprecated: use ``db.collar.get_record_count_by_condition()``."""
        self._warn_deprecated("get_collar_count_by_condition", "db.collar.get_record_count_by_condition()")
        return self.collar.get_record_count_by_condition(field_name, field_value)

    def get_survey_count_by_condition(self, field_name: str, field_value: str) -> int:
        """Deprecated: use ``db.survey.get_record_count_by_condition()``."""
        self._warn_deprecated("get_survey_count_by_condition", "db.survey.get_record_count_by_condition()")
        return self.survey.get_record_count_by_condition(field_name, field_value)

    def get_lithology_count_by_condition(self, field_name: str, field_value: str) -> int:
        """Deprecated: use ``db.lithology.get_record_count_by_condition()``."""
        self._warn_deprecated("get_lithology_count_by_condition", "db.lithology.get_record_count_by_condition()")
        return self.lithology.get_record_count_by_condition(field_name, field_value)

    def get_collar_save(self) -> bool | int:
        """Deprecated: use ``db.collar.save()``."""
        self._warn_deprecated("get_collar_save", "db.collar.save()")
        return self.collar.save()

    def get_survey_save(self) -> bool | int:
        """Deprecated: use ``db.survey.save()``."""
        self._warn_deprecated("get_survey_save", "db.survey.save()")
        return self.survey.save()

    def get_lithology_save(self) -> bool | int:
        """Deprecated: use ``db.lithology.save()``."""
        self._warn_deprecated("get_lithology_save", "db.lithology.save()")
        return self.lithology.save()

    def add_collar_count(self):
        """Deprecated: use ``db.collar.add_record()``."""
        self._warn_deprecated("add_collar_count", "db.collar.add_record()")
        return self.collar.add_record()

    def add_survey_count(self):
        """Deprecated: use ``db.survey.add_record()``."""
        self._warn_deprecated("add_survey_count", "db.survey.add_record()")
        return self.survey.add_record()

    def add_lithology_count(self):
        """Deprecated: use ``db.lithology.add_record()``."""
        self._warn_deprecated("add_lithology_count", "db.lithology.add_record()")
        return self.lithology.add_record()

collar property

钻孔孔口表

drill_count property

钻孔数量

info property

数据库基本信息

lithology property

钻孔岩性表

survey property

钻孔测斜表

total_length property

钻孔总长度

__getitem__(table_name)

统一表访问:db['collar'] / db['survey'] / db['lithology']

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
218
219
220
221
222
223
224
225
226
def __getitem__(self, table_name: str) -> DmDataTable:
    """Uniform table access: db['collar'] / db['survey'] / db['lithology'].

    Raises:
        KeyError: If ``table_name`` is not one of the three known tables.
    """
    for known in ("collar", "survey", "lithology"):
        if table_name == known:
            # Resolve the matching property only once a name has matched.
            return getattr(self, known)
    raise KeyError(f"未知表: {table_name}。支持的表: 'collar', 'survey', 'lithology'")

__init__(file_path, base_path=None)

初始化并加载钻孔数据库

Parameters:

Name Type Description Default
file_path str

DMF 文件路径(可以是相对路径)

required
base_path Optional[str]

基础路径,用于相对路径解析

None
Source code in dimine_python_sdk\lib\prospecting\drill_db.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def __init__(self, file_path: str, base_path: Optional[str] = None):
    """
    Resolve the file path and load the drill-hole database.

    Args:
        file_path: Path of the DMF file (may be relative).
        base_path: Optional base directory. ``base_path / file_path`` is
            used when it exists; otherwise ``file_path`` is used as given.
    """
    resolved = file_path
    if base_path:
        candidate = Path(base_path) / file_path
        # Fall back to the path as given when the joined path does not exist.
        resolved = str(candidate if candidate.exists() else Path(file_path))
    self.file_path = resolved

    self._geo_drill = Dm.dmGeoDrillData()
    self._manager = None
    # Table wrappers are created lazily on first access.
    self._collar: Optional[DmDataTable] = None
    self._survey: Optional[DmDataTable] = None
    self._lithology: Optional[DmDataTable] = None

    self._load_file()

close()

释放资源

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
185
186
187
188
189
def close(self):
    """Drop cached tables and release the native handles."""
    self._invalidate_cache()
    # Clear the manager first, then the drill-data object.
    self._manager = self._geo_drill = None

get_add_table(table_mark)

获取附加表

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
263
264
265
266
267
def get_add_table(self, table_mark: str) -> DmDataTable:
    """Return the added table identified by ``table_mark``.

    The manager's result is returned as-is.

    Raises:
        DrillDBError: If no manager is loaded.
    """
    manager = self._manager
    if manager:
        return manager.GetAddedTable(table_mark)
    raise DrillDBError("钻孔数据库未加载")

get_added_table_count()

获取附加表数量

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
269
270
271
def get_added_table_count(self) -> int:
    """Return the number of added tables.

    Raises:
        DrillDBError: If no manager is loaded (for example after ``close()``).
    """
    # Same guard as get_base_table / get_add_table, so an unloaded manager
    # raises DrillDBError rather than AttributeError on None.
    if not self._manager:
        raise DrillDBError("钻孔数据库未加载")
    return self._manager.GetAddedTableCount()

get_base_table(table_mark)

获取基础表

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
257
258
259
260
261
def get_base_table(self, table_mark: str) -> DmDataTable:
    """Return the base table identified by ``table_mark``.

    The manager's result is returned as-is.

    Raises:
        DrillDBError: If no manager is loaded.
    """
    manager = self._manager
    if manager:
        return manager.GetBaseTable(table_mark)
    raise DrillDBError("钻孔数据库未加载")

get_collar_table_def()

获取孔口表结构定义

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
273
274
275
def get_collar_table_def(self) -> DmDataTable:
    """Return the collar table structure definition.

    Raises:
        DrillDBError: If no manager is loaded (for example after ``close()``).
    """
    # Same guard as get_base_table / get_add_table, so an unloaded manager
    # raises DrillDBError rather than AttributeError on None.
    if not self._manager:
        raise DrillDBError("钻孔数据库未加载")
    return self._manager.GetCollarTableDefine()

get_lithology_table_def()

获取岩性表结构定义

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
281
282
283
def get_lithology_table_def(self) -> DmDataTable:
    """Return the lithology table structure definition.

    Raises:
        DrillDBError: If no manager is loaded (for example after ``close()``).
    """
    # Same guard as get_base_table / get_add_table, so an unloaded manager
    # raises DrillDBError rather than AttributeError on None.
    if not self._manager:
        raise DrillDBError("钻孔数据库未加载")
    return self._manager.GetLithologyTableDefine()

get_survey_table_def()

获取测斜表结构定义

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
277
278
279
def get_survey_table_def(self) -> DmDataTable:
    """Return the survey table structure definition.

    Raises:
        DrillDBError: If no manager is loaded (for example after ``close()``).
    """
    # Same guard as get_base_table / get_add_table, so an unloaded manager
    # raises DrillDBError rather than AttributeError on None.
    if not self._manager:
        raise DrillDBError("钻孔数据库未加载")
    # NOTE(review): "Survery" is the spelling used by the original call;
    # presumably it mirrors the native binding name — confirm before renaming.
    return self._manager.GetSurveryTableDefine()

process_high_grade(params)

处理特高品位

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
296
297
298
def process_high_grade(self, params: HighGradeProcessParam) -> dict:
    """Run extra-high-grade processing via DrillFunctionWrapper."""
    outcome = DrillFunctionWrapper.extra_high_grade_process(params)
    return outcome

process_samples(params)

处理样长组合

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
288
289
290
def process_samples(self, params: SampleLengthCombineParam) -> dict:
    """Run sample-length combination via DrillFunctionWrapper."""
    outcome = DrillFunctionWrapper.sample_length_combine(params)
    return outcome

process_steps(params)

处理台阶组合

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
292
293
294
def process_steps(self, params: StepCombineParam) -> dict:
    """Run bench (step) combination via DrillFunctionWrapper."""
    outcome = DrillFunctionWrapper.step_combine(params)
    return outcome

reload()

重新加载文件

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
149
150
151
152
def reload(self):
    """Reload the file and drop the cached table wrappers.

    The cache is cleared even when loading raises, so wrappers created
    before the failed reload are never handed out afterwards. Any exception
    from the load still propagates to the caller.
    """
    try:
        self._load_file()
    finally:
        self._invalidate_cache()

save(output_path=None)

保存钻孔数据库

注意:由于底层 C++ 接口对路径的特殊处理, 此处会临时切换工作目录到盘符根目录。

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def save(self, output_path: Optional[str] = None) -> bool:
    """
    Save the drill-hole database.

    The working directory is changed temporarily (to the drive root when
    the path has a drive, otherwise to the target directory) because of how
    the underlying C++ interface handles paths. The original working
    directory is always restored.

    Args:
        output_path: Destination path; defaults to the loaded file path.

    Returns:
        The result of the native ``Save`` call, or ``False`` when any
        exception is raised (the error is printed, not propagated).
    """
    target = os.path.normpath(output_path or self.file_path)
    folder, leaf = os.path.split(target)
    previous_cwd = os.getcwd()

    try:
        if not folder:
            # Bare file name: save relative to the current directory.
            return self._geo_drill.Save(target)

        drive_part = os.path.splitdrive(target)[0]
        if not drive_part:
            os.chdir(folder)
            return self._geo_drill.Save(leaf)

        # Drive present: work from the drive root with a root-relative path.
        drive_root = drive_part + "\\"
        os.chdir(drive_root)
        return self._geo_drill.Save(os.path.relpath(target, drive_root))
    except Exception as err:
        print(f"保存失败: {err}")
        return False
    finally:
        os.chdir(previous_cwd)

DrillDBProcessError

Bases: DrillDBError

钻孔数据处理失败

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
35
36
37
38
39
class DrillDBProcessError(DrillDBError):
    """Raised when a drill-hole processing routine reports failure.

    Attributes are documented inline: ``response`` holds the optional dict
    passed by the raiser.
    """

    def __init__(self, message: str, response: Optional[dict] = None):
        # Store the response so callers catching the error can inspect it.
        self.response = response
        super().__init__(message)

DrillFunctionWrapper

钻孔相关功能的封装器,提供更简洁的接口

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class DrillFunctionWrapper:
    """
    Thin wrappers around the drill-hole processing functions of ``Dm``.

    Each public method serializes its parameter object to JSON, calls the
    corresponding ``Dm`` function, and decodes the JSON response.
    """

    @staticmethod
    def _invoke(native_call, params, label: str) -> dict:
        """Run one native processing call and decode its JSON response.

        Args:
            native_call: Callable taking the JSON parameter string and
                returning a JSON response string.
            params: Parameter object providing ``model_dump_json()``.
            label: Operation name used as the prefix of the error message.

        Returns:
            The decoded response.

        Raises:
            DrillDBProcessError: If the response's ``state`` is ``"failed"``.
        """
        import json

        response = json.loads(native_call(params.model_dump_json()))
        if response.get("state") == "failed":
            raise DrillDBProcessError(f"{label}失败: {response.get('message')}", response)
        return response

    @staticmethod
    def sample_length_combine(params: SampleLengthCombineParam) -> dict:
        """Run sample-length combination; raise DrillDBProcessError on failure."""
        return DrillFunctionWrapper._invoke(Dm.SampleLengthCombine, params, "样长组合")

    @staticmethod
    def step_combine(params: StepCombineParam) -> dict:
        """Run bench (step) combination; raise DrillDBProcessError on failure."""
        return DrillFunctionWrapper._invoke(Dm.StepCombine, params, "台阶组合")

    @staticmethod
    def extra_high_grade_process(params: HighGradeProcessParam) -> dict:
        """Run extra-high-grade processing; raise DrillDBProcessError on failure."""
        return DrillFunctionWrapper._invoke(Dm.ExtraHighGradeProcess, params, "特高品位处理")

extra_high_grade_process(params) staticmethod

特高品位处理

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
71
72
73
74
75
76
77
78
79
@staticmethod
def extra_high_grade_process(params: HighGradeProcessParam) -> dict:
    """Run extra-high-grade processing and return the decoded response.

    Raises:
        DrillDBProcessError: If the response's ``state`` is ``"failed"``.
    """
    # A regular import statement replaces the __import__("json") hack.
    import json

    json_param = params.model_dump_json()
    response = json.loads(Dm.ExtraHighGradeProcess(json_param))
    if response.get("state") == "failed":
        raise DrillDBProcessError(f"特高品位处理失败: {response.get('message')}", response)
    return response

sample_length_combine(params) staticmethod

样长组合

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
51
52
53
54
55
56
57
58
59
@staticmethod
def sample_length_combine(params: SampleLengthCombineParam) -> dict:
    """Run sample-length combination and return the decoded response.

    Raises:
        DrillDBProcessError: If the response's ``state`` is ``"failed"``.
    """
    # A regular import statement replaces the __import__("json") hack.
    import json

    json_param = params.model_dump_json()
    response = json.loads(Dm.SampleLengthCombine(json_param))
    if response.get("state") == "failed":
        raise DrillDBProcessError(f"样长组合失败: {response.get('message')}", response)
    return response

step_combine(params) staticmethod

台阶组合

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
61
62
63
64
65
66
67
68
69
@staticmethod
def step_combine(params: StepCombineParam) -> dict:
    """Run bench (step) combination and return the decoded response.

    Raises:
        DrillDBProcessError: If the response's ``state`` is ``"failed"``.
    """
    # A regular import statement replaces the __import__("json") hack.
    import json

    json_param = params.model_dump_json()
    response = json.loads(Dm.StepCombine(json_param))
    if response.get("state") == "failed":
        raise DrillDBProcessError(f"台阶组合失败: {response.get('message')}", response)
    return response

drill_conn(file_path, base_path=None)

钻孔数据库连接上下文管理器

Parameters:

Name Type Description Default
file_path str

DMF 文件路径

required
base_path Optional[str]

基础路径,用于相对路径解析

None
Usage

with drill_conn("database.dmf") as db:
    print(db.info)
    print(db.collar[0])
    df = db.collar.to_pandas()

Source code in dimine_python_sdk\lib\prospecting\drill_db.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
@contextmanager
def drill_conn(file_path: str, base_path: Optional[str] = None) -> Generator[DrillDBManager, None, None]:
    """
    Context manager that opens a drill-hole database and closes it on exit.

    Args:
        file_path: Path of the DMF file.
        base_path: Optional base directory. ``base_path / file_path`` is
            used when it exists; otherwise ``file_path`` is used as given.

    Usage:
        with drill_conn("database.dmf") as db:
            print(db.info)
            print(db.collar[0])
            df = db.collar.to_pandas()
    """
    resolved_path = file_path
    if base_path:
        candidate = Path(base_path) / file_path
        # Fall back to the path as given when the joined path does not exist.
        resolved_path = str(candidate if candidate.exists() else Path(file_path))

    manager = DrillDBManager(resolved_path)
    try:
        yield manager
    finally:
        # Always release the native handles, even if the body raised.
        manager.close()