Metadata-Version: 2.4
Name: hello-datap-component-base
Version: 0.3.1
Summary: A unified server management framework for data processing component
Author: Data Processing Team
License: MIT
Keywords: data,hello,management,microservice
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: click>=8.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-json-logger>=2.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: aliyun-mns>=1.1.5
Requires-Dist: oss2>=2.18.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"

# 数据处理平台组件基类

统一的数据处理组件开发框架，标准化入参、出参和执行入口。

## 功能特性

- 统一的接口规范（继承 `BaseService`，实现 `process` 方法）
- 配置加载（本地 JSON / HTTP 远程 JSON / base64 编码 URL）
- 动态 pip 包安装（`runtime_env.pip`）
- 结构化日志（自动包含服务名称和版本，兼容 Ray）
- 结果投递（上传 OSS + MNS 通知，自动重试）
- 日志上传 OSS（上传后自动删除本地文件）
- 数据跟踪（`track` 模块，两段式上报到 MNS 顺序队列）
- 工具函数（`tools` 模块，环境变量提取、bag 路径解析）

## 安装

```bash
pip install hello-datap-component-base
```

## 快速上手

详见 **[快速开始](QUICKSTART.md)** | **[用户手册](USER_GUIDE.md)**

```python
from hello_datap_component_base import BaseService

class MyService(BaseService):
    async def process(self, data: dict) -> dict:
        return {"result": data.get("value", 0) * 2}
```

```bash
component_manager start config.json
```

## 配置文件

```json
{
  "name": "my-service",
  "version": "1.0.0",
  "work_flow_id": 123,
  "work_flow_instance_id": 456,
  "task_id": "task-001",
  "runtime_env": {
    "pip": ["requests>=2.25.0"],
    "env_vars": {
      "LOG_LEVEL": "INFO",
      "MNS_ENDPOINT": "https://your-account.mns.cn-shanghai.aliyuncs.com",
      "MNS_ACCESS_KEY_ID": "your-key-id",
      "MNS_ACCESS_KEY_SECRET": "your-key-secret",
      "OSS_ENDPOINT_FOR_LOG": "https://oss-cn-shanghai.aliyuncs.com",
      "OSS_ACCESS_KEY_ID_FOR_LOG": "your-key-id",
      "OSS_ACCESS_KEY_SECRET_FOR_LOG": "your-key-secret",
      "OSS_BUCKET_NAME_FOR_LOG": "your-bucket"
    }
  },
  "params": {
    "field1": "value1"
  }
}
```

| 字段 | 类型 | 必需 | 说明 |
|------|------|:----:|------|
| `name` | string | 是 | 服务名称 |
| `version` | string | 否 | 服务版本 |
| `work_flow_id` | int | 否 | 工作流 ID |
| `work_flow_instance_id` | int | 否 | 工作流实例 ID |
| `task_id` | string | 否 | 任务 ID |
| `runtime_env.pip` | list | 否 | 启动前自动安装的 pip 包 |
| `runtime_env.env_vars` | dict | 否 | 注入到进程的环境变量 |
| `params` | dict | 否 | 传递给 `process` 方法的输入数据 |

## 返回结果格式

```json
{
    "code": 0,
    "message": "success",
    "processing_time": 0.123,
    "data": {
        "work_flow_id": 123,
        "work_flow_instance_id": 456,
        "task_id": "task-001",
        "out_put": { "...用户返回的结果..." }
    }
}
```

异常时 `code=-1`，`message` 为错误信息，`out_put` 为 `null`。

## 结果投递（MNS + OSS）

配置 MNS 和 OSS 环境变量后，框架自动完成：
1. 完整结果 JSON → 上传到 `oss://infra-hads-artifacts/data-process-platform/task-result/{task_id}.json`
2. OSS 地址 → 发送到 MNS 队列

避免 MNS 64KB 消息限制。未配置则静默跳过。

## 数据跟踪（track）

```python
from hello_datap_component_base import track

track.log(data=bag_path)                  # 任务开始
track.log(data=bag_path, status=1)        # 任务成功（status: 1=成功 2=失败）
```

通过 `data + data_type + pipeline_instance_id + node_task_id` 四元组判断插入/更新。

环境变量：`DATA_TRACK_PIPELINE_ID`、`DATA_TRACK_PIPELINE_INSTANCE_ID`、`DATA_TRACK_NODE_ID`、`DATA_TRACK_NODE_TASK_ID`、`DATA_TRACK_COMPONENT_VERSION`、`DATA_TRACK_PRIORITY`、`DATA_TRACK_EXECUTOR`、`DATA_TRACK_RUN_TYPE`、`DATA_TRACK_MNS_QUEUE_NAME`。MNS 认证复用 `MNS_ENDPOINT` 等。

## 工具函数（tools）

```python
from hello_datap_component_base import tools

env_vars = tools.extract_specific_env_vars()           # 提取框架相关环境变量
vehicle  = tools.get_vehicle_id_from_bag(bag_path)     # "1_002"
date     = tools.get_date_from_bag(bag_path)           # "20260119"
package  = tools.get_package_name_from_bag(bag_path)   # "1_002_20260119-151708"
```

## 日志

```python
from hello_datap_component_base import logger

logger.info("处理中...")
logger.error(f"失败: {e}")
```

或在服务类内使用 `self.logger`。日志自动包含服务名称和版本，兼容 Ray。

## 命令行

```bash
component_manager start <config_path>       # 启动服务
component_manager list                       # 列出服务类
component_manager validate <config_path>     # 验证配置
component_manager test <config_path> [data]  # 测试服务
```

也可用 `python -m hello_datap_component_base.cli` 替代 `component_manager`。

## 项目结构

```
hello_datap_component_base/
├── __init__.py        # 模块导出
├── base.py            # BaseService 基类
├── config.py          # 配置管理
├── runner.py          # 服务运行器
├── cli.py             # 命令行工具
├── discover.py        # 服务发现
├── logger.py          # 日志管理
├── mns_client.py      # MNS 队列客户端
├── oss_client.py      # OSS 客户端
├── track.py           # 数据跟踪
├── tools.py           # 工具函数
└── data/
    └── bag_data_service.py  # Bag 数据服务
```

## 许可证

MIT License
