Metadata-Version: 2.4
Name: hello-datap-component-base
Version: 0.2.0
Summary: A unified server management framework for data processing component
Author-email: zhaohaidong <zhaohaidong389@hellobike.com>
License: MIT
Project-URL: Homepage, https://gitlab.hellorobotaxi.top/hdata/hello-datap-component-base
Keywords: data,hello,management,microservice
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: click>=8.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-json-logger>=2.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: aliyun-mns>=1.1.5
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"

# 数据处理平台组件基类

统一的数据处理平台组件开发框架，提供标准化的服务管理，统一用户代码的入参和出参以及程序执行入口。

## 功能特性

- ✅ **统一的接口规范**：标准化的入参和出参格式
- ✅ **灵活的执行方式**：可以运行在集群上，也可以独立运行
- ✅ **灵活的配置加载**：支持本地JSON文件和HTTP远程JSON文件
- ✅ **完整的生命周期管理**：预处理、处理、后处理钩子
- ✅ **内置日志系统**：结构化的日志输出，自动包含服务名称和版本信息
- ✅ **服务发现机制**：自动发现和加载服务类
- ✅ **动态依赖安装**：支持在配置文件中指定pip包，服务启动前自动安装
- ✅ **Ray兼容**：日志系统兼容Ray分布式计算框架

## 安装

```bash
# 从源码安装
pip install -e .
```

## 文档导航

- **[用户介入手册](USER_GUIDE.md)** - 详细的用户使用指南，包含安装、开发、配置等完整流程
- **[快速开始指南](QUICKSTART.md)** - 快速上手指南

## 快速开始

### 1. 创建服务类

创建一个继承自 `BaseService` 的服务类：

```python
from hello_datap_component_base import BaseService, ServiceConfig
import asyncio

class MyDataService(BaseService):
    """我的数据处理服务"""
    
    async def process(self, data: dict) -> dict:
        """处理业务逻辑"""
        # 你的业务逻辑 here
        result = {
            "status": "success",
            "processed_data": data
        }
        return result
```

### 2. 创建配置文件

创建配置文件 `config.json`：

```json
{
  "name": "my-service",
  "version": "1.0.0",
  "runtime_env": {
    "pip": ["requests>=2.25.0"],
    "env_vars": {
      "LOG_LEVEL": "INFO"
    }
  },
  "params": {
    "key": "value",
    "field1": "data1"
  }
}
```

**说明**：
- `runtime_env.pip`: 指定需要安装的Python包，服务启动前会自动安装
- `params`: 这些参数会作为输入数据传递给 `process` 方法

### 3. 运行服务

#### 方式一：使用命令行工具

```bash
# 启动服务并执行一次处理（输入数据从配置文件的 params 获取）
component_manager start config.json

# 或使用HTTP远程配置
component_manager start http://example.com/config.json
```

**注意**：服务启动后会执行一次处理，输出结果后自动退出。输入数据来自配置文件的 `params` 字段，整个 `params` 字典会作为输入数据传递给 `process` 方法。

#### 方式二：在代码中直接使用

```python
import asyncio
from hello_datap_component_base import BaseService, ServiceConfig

# 创建配置
config = ServiceConfig(
    name="my-service",
    params={"custom_param": "value"}
)

# 创建服务实例
service = MyDataService(config)

# 处理请求
result = await service.handle_request({"input": "data"})
```

## 配置说明

### 配置文件格式

配置文件支持JSON格式，可以存储在本地文件或通过HTTP URL访问。

#### 本地文件配置

```json
{
  "name": "service-name",
  "version": "1.0.0",
  "work_flow_id": 123,
  "work_flow_instance_id": 456,
  "task_id": "task-12345",
  "runtime_env": {
    "pip": ["emoji==2.15.0", "requests>=2.25.0"],
    "env_vars": {
      "LOG_LEVEL": "INFO",
      "CUSTOM_VAR": "value",
      "MNS_ENDPOINT": "https://123456789.mns.cn-shanghai.aliyuncs.com",
      "MNS_ACCESS_KEY_ID": "your-access-key-id",
      "MNS_ACCESS_KEY_SECRET": "your-access-key-secret"
    }
  },
  "params": {
    "field1": "value1",
    "field2": 100
  }
}
```

**注意**：
- `runtime_env.pip` 中的包会在服务启动前自动安装
- `params` 中的内容会作为输入数据传递给 `process` 方法
- `work_flow_id`、`work_flow_instance_id`、`task_id` 会包含在返回结果中
- 如果配置了 MNS 相关环境变量，处理结果会自动发送到 MNS 队列

#### HTTP远程配置

配置文件可以通过HTTP URL访问：

```bash
component_manager start http://example.com/configs/my-service.json
```

### 配置字段说明

- **name** (必需): 服务名称
- **version** (可选): 服务版本，会在日志中体现
- **work_flow_id** (可选): 工作流ID，用于结果追踪
- **work_flow_instance_id** (可选): 工作流实例ID，用于结果追踪
- **task_id** (可选): 任务ID，用于结果追踪
- **runtime_env** (可选): 运行时环境配置
  - **pip**: Python包列表，服务启动前会自动安装这些包。例如：`["emoji==2.15.0", "requests>=2.25.0"]`
  - **env_vars**: 环境变量字典
    - **MNS_ENDPOINT**: 阿里云MNS服务端点（可选，用于发送结果到队列）
    - **MNS_ACCESS_KEY_ID**: 阿里云MNS访问密钥ID（可选）
    - **MNS_ACCESS_KEY_SECRET**: 阿里云MNS访问密钥（可选）
    - **MNS_QUEUE_NAME**: MNS队列名称（可选，默认为 aiinfra-data-process-component-result-queue）
- **params** (可选): 服务参数，会作为输入数据传递给 `process` 方法，也可以通过 `self.params` 访问

## API 参考

### BaseService

所有服务类必须继承自 `BaseService`。

#### 必需实现的方法

- `async def process(self, data: Dict[str, Any]) -> Dict[str, Any]`
  - 处理请求的核心业务逻辑

#### 可选重写的方法

- `async def pre_process(self, data: Dict[str, Any]) -> Dict[str, Any]`
  - 预处理钩子，在 `process` 之前调用
  - 默认返回原始数据

- `async def post_process(self, data: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]`
  - 后处理钩子，在 `process` 之后调用
  - 默认返回处理结果

#### 属性

- `self.config`: 服务配置对象 (`ServiceConfig`)
- `self.params`: 服务参数字典
- `self.logger`: 日志器对象

### ServiceConfig

服务配置类：

```python
ServiceConfig(
    name: str,
    version: Optional[str] = None,
    params: Dict[str, Any] = {},
    runtime_env: Optional[Dict[str, Any]] = None,
    work_flow_id: Optional[int] = None,
    work_flow_instance_id: Optional[int] = None,
    task_id: Optional[str] = None
)
```

### 返回结果格式

`handle_request` 方法会自动封装返回结果，格式如下：

**正常情况：**
```json
{
    "code": 0,
    "message": "success",
    "processing_time": 0.123,
    "data": {
        "work_flow_id": 123,
        "work_flow_instance_id": 456,
        "task_id": "task-12345",
        "out_put": {
            // 用户程序 process 方法返回的结果
        }
    }
}
```

**异常情况：**
```json
{
    "code": -1,
    "message": "错误消息",
    "processing_time": 0.045,
    "data": {
        "work_flow_id": 123,
        "work_flow_instance_id": 456,
        "task_id": "task-12345",
        "out_put": null
    }
}
```

**注意**：无论正常还是异常，结果都会自动发送到 MNS 队列（如果配置了 MNS 环境变量）。

### ServiceRunner

服务运行器，用于启动和管理服务：

```python
runner = ServiceRunner(config_path: str, class_name: Optional[str] = None)
runner.run()
```

## 命令行工具

### 使用方式

**方式1：直接使用命令（需要 PATH 配置）**
```bash
component_manager start <config_path>
component_manager list
component_manager validate <config_path>
component_manager test <config_path>
```

**方式2：使用 Python 模块方式（推荐，不依赖 PATH）**
```bash
python -m hello_datap_component_base.cli start <config_path>
python -m hello_datap_component_base.cli list
python -m hello_datap_component_base.cli validate <config_path>
python -m hello_datap_component_base.cli test <config_path>
```

### 启动服务

```bash
component_manager start <config_path> [--class-name <class_name>]
# 或
python -m hello_datap_component_base.cli start <config_path> [--class-name <class_name>]
```

- `config_path`: 配置文件路径（本地文件或HTTP URL）
- `--class-name, -c`: 指定要使用的服务类名（可选）

### 列出服务类

```bash
component_manager list [--json]
# 或
python -m hello_datap_component_base.cli list [--json]
```

列出所有可用的服务类。

### 验证配置

```bash
component_manager validate <config_path>
# 或
python -m hello_datap_component_base.cli validate <config_path>
```

验证配置文件的有效性（支持本地文件或HTTP URL）。

### 测试服务

```bash
component_manager test <config_path> [data] [--file <file>]
# 或
python -m hello_datap_component_base.cli test <config_path> [data] [--file <file>]
```

测试服务功能。

## 使用示例

查看 `example_service.py` 了解完整的使用示例。

运行示例：

```bash
# 直接运行示例代码
python example_service.py

# 或使用配置文件启动（方式1：直接命令）
component_manager start example_config.json

# 或使用配置文件启动（方式2：Python 模块方式，推荐）
python -m hello_datap_component_base.cli start example_config.json
```

## 项目结构

```
hello-datap-component-base/
├── hello_datap_component_base/
│   ├── __init__.py          # 模块导出
│   ├── base.py              # 基础服务类
│   ├── config.py            # 配置管理
│   ├── runner.py            # 服务运行器
│   ├── cli.py               # 命令行工具
│   ├── discover.py          # 服务发现
│   ├── logger.py            # 日志管理
│   └── mns_client.py        # MNS 队列客户端
├── example_service.py        # 使用示例
├── example_config.json      # 示例配置
├── README.md                # 本文档
├── USER_GUIDE.md            # 用户介入手册
├── QUICKSTART.md            # 快速开始指南
└── pyproject.toml           # 项目配置
```

## 开发指南

### 创建自定义服务

1. 继承 `BaseService` 类
2. 实现 `process` 方法
3. （可选）重写 `pre_process`、`post_process` 方法

### 日志使用

基类提供了两种使用日志的方式：

#### 方式1：使用全局 logger（推荐）

直接导入全局 logger，使用简单方便：

```python
from hello_datap_component_base import logger

# 在服务初始化后，全局 logger 会自动包含服务名称和版本信息
logger.info("处理请求", extra={"data": data})
logger.error("处理失败", extra={"error": str(e)})
```

#### 方式2：使用服务实例的 logger

在服务类内部，可以使用 `self.logger`：

```python
class YourService(BaseService):
    async def process(self, data: dict) -> dict:
        self.logger.info("处理请求", extra={"data": data})
        return {"result": "success"}
```

**注意**：
- 所有日志都会自动包含服务名称和版本信息
- 日志系统兼容Ray分布式计算框架，在Ray环境中日志会输出到stdout/stderr，Ray会自动收集
- 在非Ray环境中，日志会同时输出到控制台和文件
- 推荐使用全局 logger，可以在服务类外部（如工具函数、辅助模块）使用

### 环境变量

通过配置文件中的 `runtime_env.env_vars` 设置环境变量，这些变量会在服务初始化时自动设置。

### 动态安装依赖包

通过配置文件中的 `runtime_env.pip` 指定需要安装的Python包，服务启动前会自动安装：

```json
{
  "runtime_env": {
    "pip": ["emoji==2.15.0", "requests>=2.25.0"]
  }
}
```

**注意**：如果服务类文件在导入时需要这些包（如 `import emoji`），建议将导入改为可选，避免 `list` 命令失败。

### MNS 队列集成

如果配置了 MNS 相关环境变量，处理结果会自动发送到阿里云 MNS 队列：

**配置方式：**
在配置文件的 `runtime_env.env_vars` 中设置：
```json
{
  "runtime_env": {
    "env_vars": {
      "MNS_ENDPOINT": "https://123456789.mns.cn-shanghai.aliyuncs.com",
      "MNS_ACCESS_KEY_ID": "your-access-key-id",
      "MNS_ACCESS_KEY_SECRET": "your-access-key-secret",
      "MNS_QUEUE_NAME": "aiinfra-data-process-component-result-queue"
    }
  }
}
```

**功能特性：**
- ✅ 自动发送处理结果到队列（正常和异常情况）
- ✅ 结果格式统一封装（包含 work_flow_id、work_flow_instance_id、task_id）
- ✅ 如果 MNS 未配置或发送失败，不会影响主流程，只记录警告日志
- ✅ **自动重试机制**：网络异常时自动重试，确保消息发送成功
  - 默认重试 3 次，使用指数退避策略（1秒、2秒、4秒）
  - 仅对网络异常进行重试（如连接重置、超时等）
  - 可通过环境变量自定义重试参数

## 常见问题

**Q: 如何支持HTTP远程配置？**

A: 支持三种方式：

**方式1：直接使用 URL（简单 URL）**
```bash
component_manager start http://example.com/config.json
```

**方式2：使用 base64 编码的 URL（推荐，避免 shell 解析问题）**
```bash
# 先对 URL 进行 base64 编码
python3 -c "import base64; print(base64.b64encode('https://example.com/config.json?key=value&token=abc'.encode()).decode())"

# 使用编码后的字符串（不需要引号，避免 shell 解析问题）
component_manager start aHR0cHM6Ly9leGFtcGxlLmNvbS9jb25maWcuanNvbj9rZXk9dmFsdWUm dG9rZW49YWJj
```

**方式3：使用引号包裹 URL（包含特殊字符时）**
```bash
# URL 包含特殊字符（必须用引号）
component_manager start "https://example.com/config.json?key=value&token=abc"
# 或使用单引号
component_manager start 'https://example.com/config.json?key=value&token=abc'
```

**推荐使用 base64 编码方式**，因为：
- 避免 shell 解析问题（不需要引号）
- 避免特殊字符转义
- 更安全可靠

**Q: HTTPS 远程配置报 SSL 证书验证失败？**

A: 如果使用的是内部服务（如 OSS、内网服务），SSL 证书可能无法验证。可以通过环境变量跳过 SSL 验证：

```bash
# 临时设置（仅当前会话有效）
export SKIP_SSL_VERIFY=true
component_manager start 'https://example.com/config.json'

# 或使用 base64 编码的 URL（推荐，避免引号问题）
export SKIP_SSL_VERIFY=true
component_manager start aHR0cHM6Ly9leGFtcGxlLmNvbS9jb25maWcuanNvbg==

# 或使用 Python 模块方式
SKIP_SSL_VERIFY=true python -m hello_datap_component_base.cli start 'https://example.com/config.json'
```

**注意**：
- 仅在内部服务或测试环境使用 `SKIP_SSL_VERIFY=true`
- 生产环境建议安装正确的 CA 证书
- 如果安全要求不高，可以使用 HTTP 代替 HTTPS
- 使用 base64 编码的 URL 可以避免引号和特殊字符问题

**Q: 服务可以独立运行吗？**

A: 是的，服务可以独立运行，不依赖任何集群环境。可以直接在代码中创建服务实例并调用。

**Q: 如何处理错误？**

A: 在 `process` 方法中抛出异常，框架会自动记录错误日志。你也可以在 `pre_process` 或 `post_process` 中进行错误处理。

**Q: 输入数据从哪里来？**

A: 输入数据来自配置文件的 `params` 字段，整个 `params` 字典会作为输入数据传递给 `process` 方法。示例：
```json
{
  "params": {
    "field1": "value1",
    "field2": 100
  }
}
```

**Q: 如何动态安装Python包？**

A: 在配置文件的 `runtime_env.pip` 中指定需要安装的包列表，服务启动前会自动安装。示例：
```json
{
  "runtime_env": {
    "pip": ["emoji==2.15.0", "requests>=2.25.0"]
  }
}
```

**Q: 如何配置 MNS 队列发送结果？**

A: 在配置文件的 `runtime_env.env_vars` 中设置 MNS 相关环境变量：
```json
{
  "runtime_env": {
    "env_vars": {
      "MNS_ENDPOINT": "https://123456789.mns.cn-shanghai.aliyuncs.com",
      "MNS_ACCESS_KEY_ID": "your-access-key-id",
      "MNS_ACCESS_KEY_SECRET": "your-access-key-secret",
      "MNS_QUEUE_NAME": "aiinfra-data-process-component-result-queue"
    }
  }
}
```

配置后，处理结果会自动发送到队列。如果未配置或发送失败，不会影响主流程。

**Q: 返回结果格式是什么？**

A: `handle_request` 方法会自动封装返回结果：
- 正常情况：`code=0, message="success", data.out_put=用户程序返回结果`
- 异常情况：`code=-1, message=错误消息, data.out_put=null`
- 所有结果都包含 `work_flow_id`、`work_flow_instance_id`、`task_id`

## 许可证

MIT License

## 作者

zhaohaidong (zhaohaidong389@hellobike.com)

