Metadata-Version: 2.3
Name: duowen-workflow
Version: 0.1.0
Summary: 
Author: liurui
Author-email: liurui@asiainfo.com
Requires-Python: >=3.11
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: jinja2 (>=3.1.6)
Requires-Dist: pydantic (>=2.10.6)
Description-Content-Type: text/markdown

# Duowen Workflow Engine
Duowen Workflow Engine 是一个强大的Python工作流引擎，专为构建和执行复杂业务流程而设计。它提供了节点化执行、变量管理、状态跟踪等功能，支持自定义节点扩展，适用于自动化任务、数据处理、决策引擎等多种场景。

## 核心特性

- **可视化工作流**：通过节点连接构建复杂业务流程
- **变量管理系统**：支持多种数据类型和线程安全操作
- **执行跟踪**：完整记录工作流执行过程
- **插件系统**：支持自定义节点扩展
- **表达式引擎**：支持Python表达式和Jinja模板
- **类型安全**：基于Pydantic的强类型验证
- **线程安全**：所有关键操作均使用锁保护

## 安装

```bash
pip install duowen_workflow
```

## 快速开始

### 定义工作流

```python
from duowen_workflow.entities import TapeSchema, BaseStep, VariablePool
from duowen_workflow.engine import EngineFlow

# 1. 定义工作流蓝图
tape = TapeSchema(
    tape_id="demo_workflow",
    tape_name="示例工作流",
    steps=[
        BaseStep(
            stepSeq="1",
            stepLabel="开始",
            stepInst="Start",
            stepConfig={
                "input": {
                    "vars_name": [
                        {"type": "str", "value": "username"}
                    ]
                }
            }
        ),
        BaseStep(
            stepSeq="2",
            stepLabel="打印欢迎",
            stepInst="Print",
            preStepId="1",
            stepConfig={
                "input": {
                    "text": {
                        "type": "jinja",
                        "value": "你好, {{ username }}!"
                    }
                }
            }
        )
    ]
)

# 2. 初始化变量池
variables = VariablePool({"username": "张三"})

# 3. 执行工作流
engine = EngineFlow(tape_schema=tape)
result = engine.run(
    session_id="session_20240617",
    variable_pool=variables
)

# 4. 查看执行轨迹
for trace_item in engine.trace.trace_status.deque:
    print(trace_item.to_dict())
```

## 核心概念

### 工作流蓝图 (TapeSchema)
定义工作流的整体结构：
```python
from duowen_workflow.entities import TapeSchema, BaseStep

tape = TapeSchema(
    tape_id="order_processing",
    tape_name="订单处理工作流",
    steps=[
        BaseStep(
            stepSeq="1",
            stepLabel="开始",
            stepInst="Start"
        ),
        # 更多步骤...
    ]
)
```

### 变量池 (VariablePool)
线程安全的变量存储容器：
```python
from duowen_workflow.entities import VariablePool

# 初始化变量池
variables = VariablePool({
    "user_id": 12345,
    "order_items": [/*...*/]
})

# 添加变量
variables.append_variable("discount_rate", 0.9)

# 获取变量
user_id = variables.get_variable_value("user_id")
```

### 节点系统
工作流的基本执行单元：

#### 内置节点
- **Start**: 工作流起始节点，验证输入变量
- **Print**: 打印变量内容
- **If**: 条件分支节点
- **For**: 循环节点（开发中）
- **CodeExec**: 执行Python代码

#### 使用内置节点
```python
from duowen_workflow.nodes.builtin import Print
from duowen_workflow.entities import JinjaInput

printer = Print(
    input=PrintInput(
        text=JinjaInput(value="用户ID: {{ user_id }}")
    )
)
```

### 跟踪系统
完整记录工作流执行过程：
```python
{
    "tape_id": "order_processing",
    "stepSeq": "2",
    "stepLabel": "验证订单",
    "node_exec_id": "a1b2c3d4e5",
    "type": "in",  # 节点开始执行
    "data": {
        "order_id": "ORD20240617",
        "user_id": 12345
    },
    "date": 1718611200000
}
```

## 自定义节点开发

### 创建自定义节点
```python
from duowen_workflow.entities import NodeBase, VariablePool, Trace
from pydantic import BaseModel
import uuid

class ValidatorInput(BaseModel):
    input_param: str

class CustomValidator(NodeBase):
    input: ValidatorInput

    def run(self, variable_pool: VariablePool, stepSeq: str, trace: Trace, **kwargs):
        node_exec_id = f"{stepSeq}_{uuid.uuid4().hex[:8]}"
        
        # 记录节点开始
        trace.add_trace_in(stepSeq=stepSeq, node_exec_id=node_exec_id)
        
        try:
            # 获取输入参数
            param = variable_pool.get_variable_value(self.input.input_param)
            
            # 执行业务逻辑
            result = self.validate(param)
            
            # 存储结果
            variable_pool.append_variable("validation_result", result)
            
            # 记录节点完成
            trace.add_trace_out(stepSeq=stepSeq, node_exec_id=node_exec_id)
            return result
        except Exception as e:
            trace.add_trace_log(stepSeq=stepSeq, node_exec_id=node_exec_id, 
                                error=str(e), level="error")
            raise
    
    def validate(self, data):
        """自定义验证逻辑"""
        return len(data) > 5
```

### 注册自定义节点
1. 设置环境变量指定插件目录：
```bash
export WORKFLOW_NODES_PLUGINS_DIR=/path/to/plugins
```

2. 在插件目录创建节点文件（如`custom_nodes.py`）

## 表达式引擎

### 支持表达式类型

1. **Jinja模板**：
   ```python
   JinjaInput(value="欢迎, {{ user.name }}! 您的余额是{{ user.balance }}")
   ```

2. **Python表达式**：
   ```python
   ExprInput(value="total_price * discount_rate if is_vip else total_price")
   ```

3. **变量引用**：
   ```python
   VarsInput(value="order_items")
   ```

### 使用示例
```python
from duowen_workflow.utils import execute_expression
from duowen_workflow.entities import JinjaInput, ExprInput

# 渲染Jinja模板
welcome_msg = execute_expression(
    JinjaInput(value="欢迎, {{ user.name }}!"),
    variable_pool
)

# 执行Python表达式
total_price = execute_expression(
    ExprInput(value="sum(item['price'] for item in order_items"),
    variable_pool
)
```

## 高级用法

### 变量操作指南

```python
# 添加基本类型变量
variables.append_variable("username", "john_doe")

# 添加列表
variables.append_variable("scores", [85, 92, 78])

# 追加列表元素
variables.append_to_variable("scores", 95)

# 存储Pydantic对象
variables.append_obj_variable("user_profile", user_model)

# 读取Pydantic对象
profile = variables.get_obj_variable_value("user_profile", UserProfile)
```

### 条件分支示例
```python
# If节点配置
stepConfig={
    "input": {
        "cond": [
            {
                "next_id": "4",
                "expr": {
                    "type": "expr",
                    "value": "user['is_vip']"
                }
            },
            {
                "next_id": "5",
                "expr": {
                    "type": "expr",
                    "value": "not user['is_vip']"
                }
            }
        ]
    }
}
```

### 代码执行节点
```python
stepConfig={
    "input": {
        "code_text": {
            "type": "str",
            "value": """
def main(a, b):
    return a + b
"""
        }
    },
    "output": {
        "result1": {"name": "sum_result"}  # 输出变量名
    }
}
```

## 错误处理

工作流引擎定义了以下异常类型：

1. **WorkFlowExecError**：工作流执行异常
2. **VarNameError**：非法变量名异常
3. **ValidationError**：输入参数验证失败

## 最佳实践

1. **命名规范**：
   - 变量名使用蛇形命名法（snake_case）
   - 节点类名使用驼峰命名法（CamelCase）
   - 步骤ID使用简单字符串（"step1", "validate"）

2. **错误处理**：
   - 在节点中使用try-except捕获异常
   - 通过trace.add_trace_log记录错误详情
   - 对关键操作添加重试机制

3. **性能优化**：
   - 避免在循环中频繁访问变量池
   - 对大文件使用流式处理
   - 对数据库查询添加缓存机制

