Metadata-Version: 2.4
Name: rainbow-rb-flow-manager
Version: 0.0.9.dev14
Summary: Rainbow Python Flow Manager
Author-email: Derek <dfd1123@rainbow-robotics.com>
Requires-Python: <3.13,>=3.12
Description-Content-Type: text/markdown
Requires-Dist: rainbow-rb-utils==0.0.9.dev14
Requires-Dist: rainbow-rb-zenoh==0.0.9.dev14
Requires-Dist: rainbow-rb-flat-buffers==0.0.9.dev14
Requires-Dist: rainbow-rb-schemas==0.0.9.dev14
Requires-Dist: rainbow-rb-sdk==0.0.9.dev14
Requires-Dist: rainbow-rb-log==0.0.9.dev14

# rb_flow_manager 사용 설명서

`rb_flow_manager`는 `Step` 트리를 멀티프로세스로 실행하고, 실행 상태/일시정지/재개/중지 제어를 제공하는 워크플로우 엔진입니다.

## 1. 핵심 구성요소

- `Step` 계열 (`step.py`)
  - 기본: `Step`
  - 제어 구조: `FolderStep`, `RepeatStep`, `ConditionStep`, `BreakStep`, `JumpToStep`
  - 고급: `SubTaskStep`, `CallEventStep`, `SyncStep`
- 실행기: `ScriptExecutor` (`executor.py`)
  - 다중 프로세스 실행
  - 상태 조회/제어 API 제공
- 컨트롤러 인터페이스: `BaseController`
  - 실행 이벤트 훅(`on_start`, `on_pause`, `on_error` 등) 구현 가능
- 기본 컨트롤러 구현 예시: `Zenoh_Controller`

## 2. 최소 실행 예제

```python
from rb_flow_manager.step import Step
from rb_flow_manager.executor import ScriptExecutor
from rb_flow_manager.schema import MakeProcessArgs

def hello_step():
    print("hello step")

root = Step(
    step_id="root",
    name="Root",
    children=[
        Step(step_id="s1", name="Hello", func=hello_step),
    ],
)

executor = ScriptExecutor()

args: MakeProcessArgs = {
    "process_id": "script_1",
    "step": root,
    "repeat_count": 1,
    "robot_model": "C500920",
    "category": "manipulate",
    "step_mode": False,
    "min_step_interval": 0.0,
    "is_ui_execution": False,
    "post_tree": None,
    "parent_process_id": None,
    "event_sub_tree_list": [],
}

executor.start(args)
executor.wait_all()
print(executor.get_all_states())
```

## 3. Step 작성 방식

`Step`는 두 방식으로 함수 실행을 지원합니다.

- 직접 함수 연결
  - `func=<callable>`
- 문자열 함수명 방식
  - `func_name="rb_manipulate_sdk.point"`
  - 런타임에 import/호출

또한 `args`를 통해 스텝 인자를 전달하고, `children`으로 트리를 구성합니다.

## 4. 실행 제어 API

`ScriptExecutor` 주요 메서드:

- `start(args: MakeProcessArgs | list[MakeProcessArgs]) -> bool`
- `pause(process_id: str) -> bool`
- `resume(process_id: str) -> bool`
- `stop(process_id: str) -> bool`
- `wait_all(timeout: float | None = None) -> bool`
- `get_all_states() -> dict[str, dict]`

## 5. 상태 모델

`RB_Flow_Manager_ProgramState`:

- `IDLE`
- `RUNNING`
- `PAUSED`
- `WAITING`
- `STOPPED`
- `POST_START`
- `COMPLETED`
- `ERROR`

각 프로세스별 상세 실행 상태는 `ExecutorState` TypedDict 구조로 관리됩니다.

## 6. 컨트롤러 연동

커스텀 컨트롤러를 사용하려면 `BaseController`를 구현한 뒤 `ScriptExecutor(controller=...)`로 전달합니다.

```python
from rb_flow_manager.controller.base_controller import BaseController

class MyController(BaseController):
    def on_init(self, state_dicts): ...
    def on_start(self, task_id): ...
    def on_stop(self, task_id, step_id): ...
    def on_pause(self, task_id, step_id): ...
    def on_wait(self, task_id, step_id): ...
    def on_resume(self, task_id, step_id): ...
    def on_next(self, task_id, step_id): ...
    def on_sub_task_start(self, task_id, sub_task_id, sub_task_type): ...
    def on_sub_task_done(self, task_id, sub_task_id, sub_task_type): ...
    def on_done(self, task_id, step_id): ...
    def on_error(self, task_id, step_id, error): ...
    def on_post_start(self, task_id): ...
    def on_complete(self, task_id): ...
    def on_close(self): ...
    def on_all_complete(self): ...
    def on_all_stop(self): ...
    def on_all_pause(self): ...
    def on_all_wait(self): ...
    def on_all_resume(self): ...
```

## 7. 실무 팁

- `process_id`는 서비스 내에서 유일하게 관리하세요.
- 긴 실행 트리는 `min_step_interval`을 사용해 과도한 호출을 방지하세요.
- 다중 트리 병렬 실행 시 `start([...])` 형태를 사용하면 일괄 시작 제어가 쉽습니다.
