Metadata-Version: 2.4
Name: smartinno
Version: 1.0.5
Summary: Unified Redis Python SDK for the Safari Pro architecture.
Author: Safari Pro Engineering
License: MIT
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: redis>=5.0.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: pulsar-client>=3.0.0
Requires-Dist: temporalio>=1.0.0
Requires-Dist: requests>=2.31.0
Requires-Dist: pydantic>=2.0.0
Dynamic: author
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: license
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# Enterprise Messaging Broker SDK Master API Reference

Welcome to the **Enterprise Messaging Broker SDK Master API Reference**. This guide helps developers, both new and experienced, understand, configure, and operate the core infrastructure of the Safari Pro Super App.

This SDK unifies **Temporal Orchestration**, **Enterprise Custom Redis Connectivity**, **Resilient Messaging (Pulsar/Redis Failover)**, and **Observability Monitoring** into a single, high-performance toolkit with built-in failover capabilities.

---

## 📐 Core SDK Architecture Overview

The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.

```mermaid
graph TD
    User[Application Code] --> tc[TemporalClient]
    User --> rc[RedisClientFactory]
    User --> mc[MessageClient]
    User --> mon[Observability Monitors]
    
    tc -->|Orchestration| TempServer[Temporal Cluster]
    rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
    mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]
```

---

## 🛠️ Complete Enums & Configurations Guide

### 1. Architectural & Routing Enums

#### `smartinno.redis_custom.config.Architecture`
Defines the backend deployment topology for Redis connections.
*   `SENTINEL` ("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.
*   `CLUSTER` ("cluster"): Hash-sharded cluster that distributes keys across 16,384 hash slots (CRC16 of the key, modulo 16384) over multiple master nodes.
*   `HYBRID` ("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.
*   `SMART_ROUTER` ("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.

#### `smartinno.redis_custom.config.UseCase`
Guides the factory to auto-select the optimal Redis architecture for a given workload.
*   `HIGH_CONCURRENCY`: Maps to `CLUSTER` (Ideal for massive horizontal read/write scale).
*   `HEAVY_TRANSACTIONS`: Maps to `SENTINEL` (Ideal for multi-key pipelines and transactions).
*   `MICROSERVICES`: Maps to `HYBRID` (Ideal for polyglot systems and proxy gateways).
*   `AUTO`: Maps to `SMART_ROUTER` (Dynamically selects based on payload structure).
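
The mapping above, combined with the rule that an explicit `architecture` takes precedence over `use_case`, can be sketched as follows. This is a standalone illustration rather than the SDK's factory code: the `UseCase` string values, the `resolve_architecture` helper, and the `SMART_ROUTER` default when neither argument is given are assumptions.

```python
from enum import Enum
from typing import Optional


class Architecture(Enum):
    SENTINEL = "sentinel"
    CLUSTER = "cluster"
    HYBRID = "hybrid"
    SMART_ROUTER = "smart_router"


class UseCase(Enum):
    # Assumption: the reference lists only member names, so these values are placeholders.
    HIGH_CONCURRENCY = "high_concurrency"
    HEAVY_TRANSACTIONS = "heavy_transactions"
    MICROSERVICES = "microservices"
    AUTO = "auto"


# Mapping taken from the UseCase list above.
USE_CASE_TO_ARCHITECTURE = {
    UseCase.HIGH_CONCURRENCY: Architecture.CLUSTER,
    UseCase.HEAVY_TRANSACTIONS: Architecture.SENTINEL,
    UseCase.MICROSERVICES: Architecture.HYBRID,
    UseCase.AUTO: Architecture.SMART_ROUTER,
}


def resolve_architecture(
    architecture: Optional[Architecture] = None,
    use_case: Optional[UseCase] = None,
) -> Architecture:
    """Hypothetical helper: explicit architecture wins, then the use case."""
    if architecture is not None:
        return architecture
    if use_case is not None:
        return USE_CASE_TO_ARCHITECTURE[use_case]
    # Assumed default when the caller specifies nothing.
    return Architecture.SMART_ROUTER
```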

#### `smartinno.temporal.client.Frequency`
Defines scheduling intervals for recurring cron orchestrations.
*   `ONCE` ("once"): Fires exactly once.
*   `REPEATED` ("repeated"): Fires every minute.
*   `HOURLY` ("hourly"): Fires at the start of every hour.
*   `DAILY` ("daily"): Fires at midnight every day.
*   `WEEKLY` ("weekly"): Fires at midnight every Sunday.
*   `MONTHLY` ("monthly"): Fires at midnight on the first day of every month.
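
For readers who think in cron syntax, the intervals above correspond to the standard five-field expressions below. The `CRON_BY_FREQUENCY` table and `to_cron` helper are hypothetical; the SDK may represent these schedules differently, and `ONCE` has no recurring cron equivalent.

```python
from enum import Enum
from typing import Optional


class Frequency(Enum):
    ONCE = "once"
    REPEATED = "repeated"
    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


# Fields: minute hour day-of-month month day-of-week (0 = Sunday).
CRON_BY_FREQUENCY = {
    Frequency.REPEATED: "* * * * *",  # every minute
    Frequency.HOURLY: "0 * * * *",    # start of every hour
    Frequency.DAILY: "0 0 * * *",     # midnight every day
    Frequency.WEEKLY: "0 0 * * 0",    # midnight every Sunday
    Frequency.MONTHLY: "0 0 1 * *",   # midnight on the first of the month
}


def to_cron(frequency: Frequency) -> Optional[str]:
    """Return the equivalent cron expression, or None for ONCE."""
    return CRON_BY_FREQUENCY.get(frequency)
```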

#### `smartinno.redis_custom.smart_router.DataSensitivity`
Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.
*   `CRITICAL` (Matches `passport`, `visa`): Bypasses persistent storage; cached for 60 seconds max.
*   `HIGH` (Matches `password`, `payment`, `card`, `cvv`, `secret`, `ssn`, `token`): Cached for 300 seconds.
*   `MEDIUM` (Matches `email`, `phone`, `user_name`): Standard caching rules apply.
*   `LOW`: Default category for standard operational logs or cache elements.
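
A minimal sketch of how such keyword-based classification could work. It assumes matching is a case-insensitive substring test on payload field names, checked from most to least sensitive; the SDK's actual matching rules are not documented here, and `classify` and `max_ttl_seconds` are hypothetical names.

```python
from enum import Enum
from typing import Any, Mapping, Optional


class DataSensitivity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


# Keyword lists copied from the reference above, most sensitive first.
_KEYWORDS = [
    (DataSensitivity.CRITICAL, ("passport", "visa")),
    (DataSensitivity.HIGH, ("password", "payment", "card", "cvv", "secret", "ssn", "token")),
    (DataSensitivity.MEDIUM, ("email", "phone", "user_name")),
]

# TTL caps stated above; MEDIUM and LOW follow standard rules (None here).
_MAX_TTL = {DataSensitivity.CRITICAL: 60, DataSensitivity.HIGH: 300}


def classify(payload: Mapping[str, Any]) -> DataSensitivity:
    """Return the highest sensitivity level triggered by any field name."""
    names = [str(name).lower() for name in payload]
    for level, keywords in _KEYWORDS:
        if any(keyword in name for name in names for keyword in keywords):
            return level
    return DataSensitivity.LOW


def max_ttl_seconds(level: DataSensitivity) -> Optional[int]:
    """Return the TTL cap in seconds, or None when no cap is documented."""
    return _MAX_TTL.get(level)
```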

---

## 🔑 Redis Key Generation & Data Serialization Envelopes

To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.

### 1. Standardized Key Formatting Pattern
All keys written to Redis are transparently run through the `BaseRedisAdapter._format_key()` handler, which prefixes each key according to a strict namespace convention:

$$\text{redis\_key} = \text{service\_name} : \text{action} : \text{original\_key}$$

*   **Parameters**:
    *   `service_name` (`str`, default `"safari_pro"`): Declared in `RedisConfig`. Prevents cross-app database contamination.
    *   `action` (`str`, default `"data"`): Scopes key usage (e.g., `data`, `cache`, `stream`, `queue`).
*   **Example Generation**:
    *   Original Key: `"booking:8848"`
    *   Generated Key: `safari_pro:data:booking:8848`
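
The convention can be reproduced in a few lines. The standalone `format_key` function below is a hypothetical stand-in for the adapter's private method, shown only to make the pattern concrete.

```python
def format_key(original_key: str, service_name: str = "safari_pro", action: str = "data") -> str:
    """Prefix a key as service_name:action:original_key."""
    return f"{service_name}:{action}:{original_key}"


generated = format_key("booking:8848")
cache_key = format_key("booking:8848", action="cache")
```

With the defaults, `generated` is `safari_pro:data:booking:8848`, matching the example above.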

### 2. Cache Data Envelope Serialization
For caching payloads (strings, hashes, simple JSON targets), values are packed into a typed JSON envelope containing contextual metadata.

```json
{
    "request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
    "event": "update",
    "type": "cache",
    "service": "safari_pro",
    "tenant_name": null,
    "payload": {
        "booking_id": "BK-9989",
        "total_amount": 1500.0
    },
    "timestamp": "2026-06-29T10:24:18.123456"
}
```

*   **Envelope Fields**:
    *   `request_id` (`str`): UUIDv4 tracking identifier.
    *   `event` (`str`): Action trigger description (e.g., `update`, `invalidate`).
    *   `type` (`str`): Target cache category type.
    *   `service` (`str`): Originating application namespace name.
    *   `tenant_name` (`Optional[str]`): Partition indicator for multi-tenancy.
    *   `payload` (`Any`): The actual payload data structure.
    *   `timestamp` (`str`): UTC timestamp of serialization in ISO-8601 formatting.

*   *Note: Read operations automatically deserialize the envelope, returning only the underlying `payload` value.*
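
A rough sketch of the pack and unpack round trip, using only the standard library. The helper names `pack_envelope` and `unpack_envelope` are hypothetical, and the timestamp format is an assumption chosen to match the sample above (UTC without an offset suffix).

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def pack_envelope(
    payload: Any,
    event: str = "update",
    type_: str = "cache",
    service: str = "safari_pro",
    tenant_name: Optional[str] = None,
) -> str:
    """Wrap a payload in the documented envelope fields and serialize to JSON."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,
        "type": type_,
        "service": service,
        "tenant_name": tenant_name,
        "payload": payload,
        # Assumption: naive UTC ISO-8601 string, as in the sample envelope.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    return json.dumps(envelope)


def unpack_envelope(raw: str) -> Any:
    """Mirror the read path: return only the underlying payload."""
    return json.loads(raw)["payload"]


raw = pack_envelope({"booking_id": "BK-9989", "total_amount": 1500.0})
restored = unpack_envelope(raw)
```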

### 3. Messaging Emergency Fallback Envelopes
When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.

#### Redis Emergency Fallback
Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern:
$$\text{key} = \text{emergency-fallback} : \{topic\}$$

*   **Envelope Schema**:
    ```json
    {
        "origin_topic": "booking-requests",
        "failed_at": 1782728658.452,
        "data": { ... }
    }
    ```

#### Pulsar Emergency Fallback
When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:
*   **Target Topic**: `persistent://public/default/emergency-fallback-redis-{topic}`
*   **Envelope Schema**:
    ```json
    {
        "origin": "booking-requests",
        "time": 1782728658.452,
        "data": { ... }
    }
    ```
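
Both fallback envelopes and their destinations can be built with two small helpers. The function names and the optional `now` parameter (included so the output is reproducible) are assumptions; the key pattern, topic pattern, and field names come from the schemas above.

```python
import time
from typing import Any, Dict, Optional, Tuple


def redis_fallback(topic: str, data: Any, now: Optional[float] = None) -> Tuple[str, Dict[str, Any]]:
    """Return the Redis Stream key and envelope for a message stranded by a Pulsar outage."""
    key = f"emergency-fallback:{topic}"
    envelope = {
        "origin_topic": topic,
        "failed_at": time.time() if now is None else now,
        "data": data,
    }
    return key, envelope


def pulsar_fallback(topic: str, data: Any, now: Optional[float] = None) -> Tuple[str, Dict[str, Any]]:
    """Return the Pulsar topic and envelope for a message stranded by a Redis outage."""
    target = f"persistent://public/default/emergency-fallback-redis-{topic}"
    envelope = {
        "origin": topic,
        "time": time.time() if now is None else now,
        "data": data,
    }
    return target, envelope


redis_key, redis_envelope = redis_fallback("booking-requests", {"booking_id": "BK-100"}, now=1782728658.452)
pulsar_topic, pulsar_envelope = pulsar_fallback("booking-requests", {"booking_id": "BK-100"}, now=1782728658.452)
```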

---

## 1. ⏳ Temporal Orchestration SDK

### `TemporalConfig`
A configuration dataclass used to initialize the `TemporalClient`.

| Field Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Endpoint hostname of the Temporal frontend service. |
| `port` | `int` | `7233` | Endpoint port of the Temporal frontend service. |
| `namespace` | `str` | `"default"` | Target namespace partition. |
| `ssl` | `bool` | `False` | Set to `True` to enable secure TLS communication. |
| `client_cert_path` | `Optional[str]` | `None` | Absolute path to the client certificate (.crt/.pem). |
| `client_key_path` | `Optional[str]` | `None` | Absolute path to the client private key (.key). |
| `auto_start_worker`| `bool` | `False` | Automatically boots background worker thread on execution. |
| `queues` | `List[str]` | `["default"]` | List of task queues the background worker monitors. |
| `mock_mode` | `Optional[bool]`| `None` | Explicit override. If omitted, mode is auto-detected from environment variables. |

### `@tc.workflow` (Alias: `@tc.process`)
Marks an async function as a stateful workflow process coordinator.

*   **Parameters**:
    *   `queue` (`str`, default `"default"`): The task queue where this workflow registration and its tasks reside.
*   **Attributes Attached to Wrapper**:
    *   `.delay(*args, **kwargs)`: Triggers the workflow asynchronously, returning a `TaskHandle`.
    *   `.workflow_name`: Registered string identifier (the decorated function's name).
    *   `._queue`: Target queue.

```python
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}
```

### `@tc.task` (Alias: `@tc.step`)
Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.

*   **Parameters**:
    *   `queue` (`str`, default `"default"`): Task queue where this step executes.
    *   `retry_attempts` (`Optional[int]`, default `None`): Maximum times to retry if an exception occurs.
    *   `backoff_seconds` (`Optional[int]`, default `None`): Initial backoff delay (seconds) before retrying.
    *   `fallback` (`Optional[Callable]`, default `None`): Backup function run if all retries fail.
    *   `timeout` (`int`, default `300`): Maximum time allowed for a single run in seconds.
    *   `heartbeat_timeout` (`Optional[int]`, default `None`): Time limit between activity status heartbeats.

```python
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}

@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}
```
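
To make the interaction between `retry_attempts`, `backoff_seconds`, and `fallback` concrete, here is a plain-Python approximation that runs without a Temporal server. It assumes `retry_attempts` counts retries after the first attempt and that the delay doubles on each retry (Temporal's default backoff coefficient is 2.0); the SDK's exact semantics may differ, and `run_with_retries` is a hypothetical helper.

```python
import time
from typing import Any, Callable, Optional


def run_with_retries(
    func: Callable[..., Any],
    *args: Any,
    retry_attempts: int = 3,
    backoff_seconds: float = 5,
    fallback: Optional[Callable[..., Any]] = None,
    sleep: Callable[[float], None] = time.sleep,
    **kwargs: Any,
) -> Any:
    """Call func, retrying on any exception, then hand over to fallback."""
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts + 1):  # first try plus the retries
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc
            if attempt < retry_attempts:
                # Assumed exponential schedule: 5s, 10s, 20s, ...
                sleep(backoff_seconds * 2 ** attempt)
    if fallback is not None:
        # The fallback receives the same arguments as the failed step.
        return fallback(*args, **kwargs)
    assert last_error is not None
    raise last_error
```

Passing a custom `sleep` callable keeps the sketch testable without real delays.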

### `tc.execute_step(step_func, *args, **kwargs)`
Runs an activity step durably within the workflow context.

*   **Parameters**:
    *   `step_func` (`Callable`): The decorated `@tc.step` function to execute.
    *   `*args`: Positional arguments forwarded to `step_func`.
    *   `**kwargs`: Keyword arguments forwarded to `step_func`.
*   **Returns**: `Any` return value from `step_func`.

```python
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)
```

### `tc.execute_local_step(step_func, *args, **kwargs)`
Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.

*   **Parameters**:
    *   `step_func` (`Callable`): Decorated `@tc.step` function.
    *   `*args` / `**kwargs`: Forwarded arguments.
    *   `timeout` (`int`, default `5`): Maximum local timeout in seconds.
    *   `retry_attempts` (`int`, default `3`): Maximum retries.
*   **Returns**: `Any` return value from `step_func`.

```python
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)
```

### `tc.execute_in_parallel(*calls)` (Alias: `fan_out`)
Executes multiple step functions concurrently (Fan-Out/Fan-In).

*   **Parameters**:
    *   `*calls` (`tuple`): Varargs of tuples. Each tuple is formatted as:
        1. `step_func` (`Callable`): The `@tc.step` function to run.
        2. `args_list` (`list`): List of positional arguments.
        3. `kwargs_dict` (`dict`, optional): Dict of keyword arguments.
*   **Returns**: `list` containing results in corresponding order.

```python
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]
```
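
The fan-out/fan-in pattern itself can be illustrated with `asyncio.gather`, which returns results in the order the awaitables were passed regardless of which finishes first. This sketch is not the SDK's implementation and provides none of Temporal's durability; `fan_out`, `search_hotels`, and `search_flights` are stand-ins defined here.

```python
import asyncio
import inspect
from typing import Any, List


async def fan_out(*calls: tuple) -> List[Any]:
    """Run (func, args_list[, kwargs_dict]) tuples concurrently, preserving order."""

    async def run(call: tuple) -> Any:
        func, args, *rest = call
        kwargs = rest[0] if rest else {}
        outcome = func(*args, **kwargs)
        # Support both sync and async step functions.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    return await asyncio.gather(*(run(call) for call in calls))


async def search_hotels(city: str) -> List[str]:
    await asyncio.sleep(0.02)  # slower call still lands first in the results
    return [f"{city} Lodge"]


async def search_flights(airport: str) -> List[str]:
    await asyncio.sleep(0.01)
    return [f"{airport}-NBO"]


hotels, flights = asyncio.run(
    fan_out((search_hotels, ["Arusha"]), (search_flights, ["JRO"]))
)
```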

### `tc.execute_in_series(*calls)`
Executes multiple step functions sequentially.

*   **Parameters**: Same format as `execute_in_parallel`.
*   **Returns**: `list` containing results in order of completion.

```python
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)
```

### `tc.start_process(process_name, queue, *args, **kwargs)`
Launches a workflow process asynchronously and returns a `TaskHandle`.

*   **Parameters**:
    *   `process_name` (`str`): Name of the `@tc.process` workflow.
    *   `queue` (`str`): Target queue.
    *   `*args` / `**kwargs`: Inputs forwarded to the workflow.
*   **Returns**: `TaskHandle` to query or await workflow status and results.

```python
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})
```

### `TaskHandle` Methods
*   `get(timeout: Optional[float] = None) -> Any`: Blocks until completion and returns workflow output.
*   `cancel() -> None`: Sends a cancellation request to the workflow.
*   `then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle`: Chains another process that starts automatically with the current workflow's result.
*   `status` (`str`): Returns `"RUNNING"`, `"COMPLETED"`, or `"CANCELLED"`.

```python
result = handle.get(timeout=60.0)
```

### `tc.set_state(process_id, key, value)`
Signals a running workflow to update its state dict out-of-band.

*   **Parameters**:
    *   `process_id` (`str`): The workflow's ID.
    *   `key` (`str`): State key name.
    *   `value` (`Any`): Value to set.

```python
tc.set_state(process_id, "guide_assigned", "Alex Kibona")
```

### `tc.get_state(process_id, key) -> Any`
Queries a running workflow for a state value.

```python
guide = tc.get_state(process_id, "guide_assigned")
```

### `tc.side_effect(func, *args) -> Any`
Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.

*   **Parameters**:
    *   `func` (`Callable`): A function returning a non-deterministic value.
    *   `*args`: Positional arguments passed to `func`.
*   **Returns**: The result of `func(*args)`.

```python
import uuid
txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")
```

### `tc.continue_process_as_new(process_func, *args, **kwargs)`
Restarts the active workflow with a clean history to prevent unbounded log growth.

```python
tc.continue_process_as_new(safari_booking_workflow, new_payload)
```

### Saga Transaction Coordinator
Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.

```python
from smartinno.temporal.client import Saga

@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
        
    except Exception:
        # Undo completed steps in reverse order, then re-raise with the original traceback.
        await saga.compensate()
        raise
```
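
The LIFO rollback behaviour can be demonstrated with a small stack-based stand-in. `MiniSaga` below is a simplified illustration that mimics the `add_compensation` / `compensate` calls used above; it is not the SDK's `Saga` class and does not execute compensations as durable steps.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


class MiniSaga:
    """Hypothetical in-memory saga: compensations run in reverse registration order."""

    def __init__(self) -> None:
        self._compensations: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def add_compensation(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._compensations.append((func, args, kwargs))

    async def compensate(self) -> None:
        while self._compensations:
            # pop() takes the most recently registered compensation first (LIFO).
            func, args, kwargs = self._compensations.pop()
            outcome = func(*args, **kwargs)
            if inspect.isawaitable(outcome):
                await outcome


undo_log: List[str] = []


async def demo() -> None:
    saga = MiniSaga()
    try:
        saga.add_compensation(undo_log.append, "release_hotel:H1")
        saga.add_compensation(undo_log.append, "cancel_flight:F1")
        raise RuntimeError("tour booking failed")
    except RuntimeError:
        await saga.compensate()


asyncio.run(demo())
```

After the run, `undo_log` holds the flight cancellation before the hotel release, the reverse of the order in which they were registered.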

### Dynamic Queue Routing (`tc.route_step`)
Routes steps dynamically to specific queues at runtime.

*   **Parameters**:
    *   `step_func` (`Callable`): `@tc.step` function to execute.
    *   `router` (`Callable[..., str]`): Function returning the target queue string.
    *   `*args` / `**kwargs`: Arguments passed to `step_func` and `router`.

```python
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"

await tc.route_step(process_payment, select_billing_queue, amount=12500)
```

### Namespace Management APIs
*   `create_namespace(namespace_name: str, retention_days: int = 3) -> bool`
*   `describe_namespace(namespace_name: str) -> dict`
*   `update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool`
*   `delete_namespace(namespace_name: str) -> bool`

```python
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)
```

### Scheduler (`tc.create_schedule`)
*   `schedule_id` (`str`): Unique identifier for the scheduled task.
*   `process_func` (`Callable`): Workflow process to launch.
*   `frequency` (`Frequency`): Pre-defined frequency (`ONCE`, `REPEATED`, `HOURLY`, `DAILY`, `WEEKLY`, `MONTHLY`).
*   `cron_expression` (`Optional[str]`): Raw cron string (e.g., `0 0 * * 1-5`).
*   `count` (`Optional[int]`): Maximum execution runs allowed before termination.
*   `interval_minutes` (`Optional[int]`): Run workflow at specific minute intervals.

```python
await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)
```

---

## 2. 🔌 Custom Redis Connectivity SDK

Provides a unified interface `IRedisClient` with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.

### `RedisConfig`
Configuration settings for Redis instances. Full configuration properties:

| Parameter | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Redis hostname or IP address. |
| `port` | `int` | `6379` | Redis port number. |
| `password` | `Optional[str]` | `None` | Password authentication string. |
| `db` | `int` | `0` | Redis logical database index. |
| `service_name` | `str` | `"safari_pro"` | Namespace prefix applied to keys to prevent collisions. |
| `max_connections` | `int` | `100` | Maximum size of the client connection pool. |
| `socket_timeout` | `float` | `0.5` | Socket read/write timeout in seconds (500ms). |
| `socket_connect_timeout` | `float` | `0.2` | Initial socket connection timeout in seconds (200ms). |
| `socket_keepalive` | `bool` | `True` | Sends TCP keepalive probes to keep connection open. |
| `health_check_interval` | `int` | `30` | Liveness check interval in seconds. |
| `retry_on_timeout` | `bool` | `True` | Retries commands automatically when socket timeouts happen. |
| `retry_backoff` | `bool` | `True` | Enforce exponential backoff on retry operations. |
| `retry_backoff_retries` | `int` | `3` | Maximum retry attempts under backoff (Range: 3-5). |
| `retry_backoff_min` | `float` | `0.2` | Minimum backoff delay in seconds (200ms). |
| `retry_backoff_max` | `float` | `0.5` | Maximum backoff delay in seconds (500ms). |
| `handle_moved_redirects`| `bool` | `True` | Redirects `MOVED` slot errors transparently in Cluster mode. |
| `handle_ask_redirects` | `bool` | `True` | Redirects `ASK` slot migration errors in Cluster mode. |
| `slot_map_caching` | `bool` | `True` | Cache cluster slots locally to avoid routing hops. |
| `slot_refresh_automatic`| `bool` | `True` | Triggers slot map refreshes when node redirections fail. |
| `reinitialize_steps` | `int` | `5` | Redirection failures before performing full slot refreshes. |
| `failover_awareness` | `bool` | `True` | Monitors topology changes during master failover. |
| `read_from_replicas` | `bool` | `True` | Reads from replica nodes to distribute cluster read load. |
| `require_full_coverage` | `bool` | `False` | When `False`, continues serving commands even if some hash slots are uncovered. |
| `ssl` | `bool` | `False` | Enables TLS connection protection. |
| `ssl_cert_reqs` | `str` | `"required"` | Validation requirement for server certificate. |
| `ssl_keyfile` | `Optional[str]` | `None` | Path to client TLS private key file. |
| `ssl_certfile` | `Optional[str]` | `None` | Path to client TLS certificate file. |
| `ssl_ca_certs` | `Optional[str]` | `None` | Path to Certificate Authority bundle file. |
| `startup_nodes` | `Optional[List[dict]]`| `None` | Seed list of Cluster startup nodes: `[{"host": "...", "port": 7001}]`. |
| `sentinel_nodes` | `Optional[List[tuple]]`| `None` | List of Sentinel endpoints: `[("host1", 26379), ("host2", 26379)]`. |
| `sentinel_master_name`| `Optional[str]` | `None` | Master group name monitored by Sentinel. |
| `sentinel_port` | `Optional[int]` | `None` | Custom Sentinel connection port override. |
| `hybrid_port` | `Optional[int]` | `None` | Custom Hybrid proxy connection port override. |
| `cluster_host_mapping` | `Optional[dict]` | `None` | Host translation map for cluster nodes behind NAT or Docker networking. |

### `RedisClientFactory`
The factory class used to resolve and instantiate specific client adapters matching the target architecture.

#### Method Signature:
```python
@staticmethod
def get_client(
    config: RedisConfig = None, 
    architecture: Architecture = None, 
    use_case: UseCase = None,
    connection: Any = None,
    sentinel_connection: Any = None,
    hybrid_connection: Any = None,
    native_connection: Any = None
) -> IRedisClient:
```

*   **Parameters**:
    *   `config` (`RedisConfig`): Custom connection configuration object.
    *   `architecture` (`Architecture`): Explicit architecture choice override (takes precedence).
    *   `use_case` (`UseCase`): Guiding workload type to automatically select target topology.
    *   `connection` / `sentinel_connection` / `hybrid_connection` / `native_connection` (`Any`): Optional pre-established connections to bypass client instantiations.
*   **Adapter Resolution Output**:
    *   `Architecture.SENTINEL` -> Returns `SentinelClientAdapter`
    *   `Architecture.CLUSTER` -> Returns `ClusterClientAdapter`
    *   `Architecture.HYBRID` -> Returns `HybridClientAdapter`
    *   `Architecture.SMART_ROUTER` -> Returns `IntelligentRouterAdapter` (holding nested config states for all adapters)

```python
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory

# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)
```

### Core Operations (`IRedisClient`)

*   `set(key: str, value: Any) -> bool`
*   `get(key: str) -> Any`
*   `mget(keys: List[str]) -> List[Any]`
*   `hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int`
*   `hget(name: str, key: str) -> Any`
*   `lpush(name: str, *values, ttl: int = None) -> int`
*   `rpop(name: str) -> Any`
*   `pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline`

#### Multi-key Transaction Sharding
When using cluster architecture, transactions must route to the same slot using hash tags.

```python
# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()
```
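
For background, Redis Cluster assigns a key to a slot by taking CRC16 (XMODEM variant) of the key modulo 16384, and when the key contains a non-empty `{...}` section only that section is hashed. The sketch below reproduces that rule with the standard library. How the SDK turns `shard_key` into a hash tag is not documented here, so the `{hotel:987}` key layout is an assumption used purely for illustration.

```python
import binascii


def key_hash_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key, honouring hash tags."""
    data = key.encode("utf-8")
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag between the first '{' and the next '}' is used.
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is the CRC16-XMODEM checksum.
    return binascii.crc_hqx(data, 0) % 16384


name_slot = key_hash_slot("{hotel:987}:name")
rooms_slot = key_hash_slot("{hotel:987}:available_rooms")
```

Because both keys share the `{hotel:987}` tag, `name_slot` and `rooms_slot` are equal, which is what allows a multi-key transaction to run on a single node.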

### High Concurrency Redis Streams APIs

*   `xadd(name: str, fields: dict) -> str`: Appends a message to a stream.
*   `xread(streams: dict, count: int = None, block: int = None) -> List`: Reads entries from streams.
*   `xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False)`: Registers consumer groups.
*   `xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List`: Reads streams inside a consumer group.
*   `xack(name: str, groupname: str, *ids)`: Acknowledges message receipt.

```python
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})

# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)
```

---

## 3. 📡 Resilient Messaging Facade (Pulsar / Redis)

The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.

### `PulsarConfig`
Configuration parameters for Apache Pulsar.

| Field Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Pulsar broker host. |
| `port` | `int` | `6650` | Pulsar broker port. |
| `connection_timeout_ms`| `int` | `5000` | Connect timeout (default 5s). |
| `operation_timeout_ms` | `int` | `10000` | Read/write timeout (default 10s). |

### `@msg.on(topic, strategy="load_balanced", retries=2)`
Subscribes a callback to a topic. Supports specific distribution strategies:

*   **Pulsar Subscription Types**:
    *   `broadcast`: Creates an exclusive subscription with a randomized ID so every node receives the message.
    *   `load_balanced`: Creates a shared subscription (`Shared`) to distribute work across available workers.
    *   `sticky`: Creates a key-shared subscription (`Key_Shared`) to route messages with matching partition keys to the same consumer.
    *   `failover`: Creates a failover subscription (`Failover`) with active-standby consumers.
*   **Redis Subsystem Adaptation**:
    *   `broadcast` routes through standard **PubSub**.
    *   `load_balanced` or `sticky` routes through **Redis Streams** consumer groups.
    *   `exclusive` routes through **Redis Lists** (`RPOP`/`LPUSH` queue patterns).

```python
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")
```
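
The strategy lists above can be summarized as a lookup table. The `STRATEGY_ROUTES` table and `describe_route` helper are hypothetical; combinations that the lists above do not describe (`failover` on Redis, `exclusive` on Pulsar) are left as `None` rather than guessed.

```python
from typing import Dict, Optional, Tuple

# strategy -> (Pulsar subscription type, Redis mechanism); None = not documented above.
STRATEGY_ROUTES: Dict[str, Tuple[Optional[str], Optional[str]]] = {
    "broadcast": ("Exclusive (randomized subscription ID)", "PubSub"),
    "load_balanced": ("Shared", "Streams consumer group"),
    "sticky": ("Key_Shared", "Streams consumer group"),
    "failover": ("Failover", None),
    "exclusive": (None, "Lists (LPUSH/RPOP)"),
}


def describe_route(strategy: str, backend: str) -> str:
    """Return the documented delivery mechanism for a strategy on a backend."""
    if strategy not in STRATEGY_ROUTES:
        raise ValueError(f"unknown strategy: {strategy!r}")
    if backend not in ("pulsar", "redis"):
        raise ValueError(f"unknown backend: {backend!r}")
    pulsar_type, redis_mechanism = STRATEGY_ROUTES[strategy]
    route = pulsar_type if backend == "pulsar" else redis_mechanism
    if route is None:
        raise ValueError(f"{strategy!r} is not documented for the {backend} backend")
    return route
```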

### `@msg.on_event(topic, queue="default")`
Wires an event topic to automatically start a Temporal workflow when a message arrives.

```python
# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])
```

### `msg.smart_send(topic, payload, key=None)`
Publishes a message to a topic with automatic database-to-broker fallback. 

*   **Resiliency Workflow**:
    1. Attempts to publish to primary backend (e.g. **Redis Stream**).
    2. If the connection fails or throws errors, it trips the circuit breaker to `OPEN`.
    3. The message is automatically wrapped and published to the **Pulsar emergency fallback topic**: `persistent://public/default/emergency-fallback-redis-{topic}`.
    4. Safe recovery loops buffer stranded payloads until the primary server recovers.

```python
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})
```
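
The first three steps of the resiliency workflow can be approximated with a small circuit breaker. Everything in this sketch is illustrative: the `CircuitBreaker` class, its threshold and reset parameters, the `HALF_OPEN` state, and `send_with_fallback` are assumptions, and the recovery loop from step 4 is not modelled.

```python
import time
from typing import Any, Callable, Dict


class CircuitBreaker:
    """Hypothetical breaker: opens after N consecutive failures, retries after a cooldown."""

    def __init__(self, failure_threshold: int = 1, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.state = "CLOSED"
        self._failures = 0
        self._opened_at = 0.0
        self._clock = clock

    def allow(self) -> bool:
        # After the cooldown, let a single trial request through.
        if self.state == "OPEN" and self._clock() - self._opened_at >= self.reset_after:
            self.state = "HALF_OPEN"
        return self.state != "OPEN"

    def record_success(self) -> None:
        self.state, self._failures = "CLOSED", 0

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
            self.state, self._opened_at = "OPEN", self._clock()


def send_with_fallback(topic: str, payload: Any,
                       primary: Callable[[str, Any], None],
                       fallback: Callable[[str, Dict[str, Any]], None],
                       breaker: CircuitBreaker) -> str:
    """Try the primary backend; on failure or open circuit, publish to the fallback topic."""
    if breaker.allow():
        try:
            primary(topic, payload)
            breaker.record_success()
            return "primary"
        except Exception:
            breaker.record_failure()
    fallback(
        f"persistent://public/default/emergency-fallback-redis-{topic}",
        {"origin": topic, "time": time.time(), "data": payload},
    )
    return "fallback"
```

While the breaker is `OPEN`, subsequent sends skip the primary backend entirely and go straight to the fallback topic, which avoids paying the connection timeout on every message.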

### `msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)`
Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.

*   **Parameters**:
    *   `topic` (`str`): Target channel to monitor.
    *   `callback` (`Callable`): Code executed on payload arrival.
    *   `hint` / `explicit_type` (`Any`): Used to guide target pattern selection (for example, `explicit_type="stream"`).
    *   `block` (`bool`, default `True`): Blocks execution and polls continuously.

```python
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")
```

### `msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)`
Intelligently queries Redis keys, dynamically casting results based on type detection.

*   **Supported Type Resolutions**:
    *   `string` / `cache` -> returned value parsed from JSON.
    *   `hash` -> returns all mapped fields via `hgetall`.
    *   `zset` / `geo` -> returns top score ranges via `zrange`.
    *   `stream` -> returns pending stream entries.

```python
booking_data = msg.smart_get("booking:987", explicit_type="hash")
```

### `msg.flush_emergency_fallbacks(topic, batch_size=100)`
Drains the Redis emergency fallback queues and replays stranded payloads into the primary Pulsar cluster once connectivity is restored.

```python
# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")
```

---

## 4. 📊 Observability & Monitoring SDK

Provides health checks, counters, and statistics for each subsystem through the `MonitoringProvider` protocol.

```python
from typing import Dict, Any, Protocol

class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...
```
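
Because `MonitoringProvider` is a structural protocol, any class with matching methods satisfies it without inheriting from it. The sketch below shows a custom monitor written against that shape; `InMemoryQueueMonitor`, its `max_depth` threshold, and the use of `runtime_checkable` are assumptions added for illustration.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...


class InMemoryQueueMonitor:
    """Hypothetical monitor that tracks a queue depth supplied by the caller."""

    def __init__(self, max_depth: int = 1000) -> None:
        self._max_depth = max_depth
        self._depth = 0

    def record_depth(self, depth: int) -> None:
        self._depth = depth

    def get_real_time_stats(self) -> Dict[str, Any]:
        return {"queue_depth": self._depth, "max_depth": self._max_depth}

    def check_health(self) -> Dict[str, Any]:
        # Healthy while the backlog stays within the configured limit.
        return {"is_healthy": self._depth <= self._max_depth}


queue_monitor = InMemoryQueueMonitor(max_depth=10)
queue_monitor.record_depth(25)
```

A dashboard or health endpoint can then iterate over any mix of monitors and call `check_health()` on each one uniformly.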

### `TemporalMonitor`
Monitors workflow statuses, retry events, and active queues.

```python
from smartinno.monitoring.temporal_monitor import TemporalMonitor

monitor = TemporalMonitor(config=t_config, temporal_client=tc)

# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"])  # {'total': 2, 'completed': 1, 'running': 1, ...}

# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"])  # True / False
```

### `RedisMonitor`
Monitors cluster topologies, slot redirections, and client pools.

```python
from smartinno.monitoring.redis_monitor import RedisMonitor

r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())
```

### `PulsarMonitor`
Checks Pulsar broker liveness and fallback subscriptions.

```python
from smartinno.monitoring.pulsar_monitor import PulsarMonitor

p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())
```

---

## 5. 🎓 Step-by-Step Tutorial for New Developers

Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.

### Step 1: Initialize the Configs
Create configuration settings for Redis (using the **High Concurrency** UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.

```python
from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig

# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)

# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)

# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)
```

### Step 2: Instantiate SDK Clients
Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.

```python
from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient

tc = TemporalClient(config=temporal_cfg)

msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)
```

### Step 3: Define Workflows and Activity Steps
Decorate your operational tasks using the task decorators.

```python
# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}

# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue") # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking
```

### Step 4: Boot Up Workers & Start Routing
Instantiate and run the background execution threads.

```python
import threading
from smartinno.temporal.worker import TemporalWorkerManager

# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()

# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=lambda: msg.start(block=True), daemon=True)
msg_thread.start()

print("All background worker loops are listening...")
```

### Step 5: Trigger Event Execution
Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.

```python
# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})
```

# 🦁 Safari Pro – Enterprise Unified SDK Master API Reference

Welcome to the **Safari Pro Unified SDK Master API Reference**. This guide is designed to help developers—both new and experienced—understand, configure, and operate the core infrastructure of the Safari Pro Super App. 

This SDK unifies **Temporal Orchestration**, **Enterprise Custom Redis Connectivity**, **Resilient Messaging (Pulsar/Redis Failover)**, and **Observability Monitoring** into a single, high-performance toolkit with built-in failover capabilities.

---

## 📐 Core SDK Architecture Overview

The Safari Pro SDK suite provides a high-reliability abstraction layer over databases, message buses, and workflow systems. It simplifies building microservices by exposing unified clients with built-in resiliency patterns.

```mermaid
graph TD
    User[Application Code] --> tc[TemporalClient]
    User --> rc[RedisClientFactory]
    User --> mc[MessageClient]
    User --> mon[Observability Monitors]
    
    tc -->|Orchestration| TempServer[Temporal Cluster]
    rc -->|Key-Value / Streams| RedisCluster[Redis Sentinel / Cluster]
    mc -->|Resilient Messaging| PubSub[Pulsar / Redis PubSub]
```

---

## 🛠️ Complete Enums & Configurations Guide

### 1. Architectural & Routing Enums

#### `smartinno.redis_custom.config.Architecture`
Defines the backend deployment topology for Redis connections.
*   `SENTINEL` ("sentinel"): High-Availability Sentinel setup with active-passive replication and automated master failover.
*   `CLUSTER` ("cluster"): Hash-sharded cluster distributing keys across 16,384 hash slots (CRC16 of the key modulo 16,384) over multiple master nodes.
*   `HYBRID` ("hybrid"): Routes through a Predixy/Envoy proxy gateway to abstract cluster topology away from clients.
*   `SMART_ROUTER` ("smart_router"): Meta-architecture routing commands dynamically to different Redis topologies.

#### `smartinno.redis_custom.config.UseCase`
Guides the factory to auto-select the optimal Redis architecture for a given workload.
*   `HIGH_CONCURRENCY`: Maps to `CLUSTER` (Ideal for massive horizontal read/write scale).
*   `HEAVY_TRANSACTIONS`: Maps to `SENTINEL` (Ideal for multi-key pipelines and transactions).
*   `MICROSERVICES`: Maps to `HYBRID` (Ideal for polyglot systems and proxy gateways).
*   `AUTO`: Maps to `SMART_ROUTER` (Dynamically selects based on payload structure).
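
The use-case mapping above amounts to a lookup table. The sketch below is illustrative only: both enums are re-declared locally so the snippet runs on its own, the `UseCase` string values are assumed (this reference lists only the member names), and `resolve_architecture` is a hypothetical helper rather than a documented SDK function.

```python
from enum import Enum


class Architecture(Enum):
    SENTINEL = "sentinel"
    CLUSTER = "cluster"
    HYBRID = "hybrid"
    SMART_ROUTER = "smart_router"


class UseCase(Enum):
    # String values are assumed for illustration.
    HIGH_CONCURRENCY = "high_concurrency"
    HEAVY_TRANSACTIONS = "heavy_transactions"
    MICROSERVICES = "microservices"
    AUTO = "auto"


# Mapping taken from the list above.
USE_CASE_TO_ARCHITECTURE = {
    UseCase.HIGH_CONCURRENCY: Architecture.CLUSTER,
    UseCase.HEAVY_TRANSACTIONS: Architecture.SENTINEL,
    UseCase.MICROSERVICES: Architecture.HYBRID,
    UseCase.AUTO: Architecture.SMART_ROUTER,
}


def resolve_architecture(use_case: UseCase) -> Architecture:
    """Hypothetical helper: pick the topology for a workload."""
    return USE_CASE_TO_ARCHITECTURE[use_case]


print(resolve_architecture(UseCase.HIGH_CONCURRENCY).value)
```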

#### `smartinno.temporal.client.Frequency`
Defines scheduling intervals for recurring cron orchestrations.
*   `ONCE` ("once"): Fires exactly once.
*   `REPEATED` ("repeated"): Fires every minute.
*   `HOURLY` ("hourly"): Fires at the start of every hour.
*   `DAILY` ("daily"): Fires at midnight every day.
*   `WEEKLY` ("weekly"): Fires at midnight every Sunday.
*   `MONTHLY` ("monthly"): Fires at midnight on the first day of every month.
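
For readers who think in cron terms, the intervals above correspond to standard five-field cron expressions. This is a reading aid under an assumption: the reference does not say whether the SDK builds these strings internally, and `cron_for` is a hypothetical helper.

```python
from typing import Optional

# Standard five-field cron expressions matching the descriptions above.
# "once" has no recurring expression, so it is absent from the table.
FREQUENCY_TO_CRON = {
    "repeated": "* * * * *",  # every minute
    "hourly": "0 * * * *",    # minute 0 of every hour
    "daily": "0 0 * * *",     # midnight every day
    "weekly": "0 0 * * 0",    # midnight every Sunday
    "monthly": "0 0 1 * *",   # midnight on day 1 of every month
}


def cron_for(frequency: str) -> Optional[str]:
    """Hypothetical helper: return the cron string, or None for one-off runs."""
    return FREQUENCY_TO_CRON.get(frequency)


print(cron_for("weekly"))
```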

#### `smartinno.redis_custom.smart_router.DataSensitivity`
Categorizes data payloads based on PII/critical parameters to dictate caching TTL policies.
*   `CRITICAL` (Matches `passport`, `visa`): Bypasses persistent storage; cached for 60 seconds max.
*   `HIGH` (Matches `password`, `payment`, `card`, `cvv`, `secret`, `ssn`, `token`): Cached for 300 seconds.
*   `MEDIUM` (Matches `email`, `phone`, `user_name`): Standard caching rules apply.
*   `LOW`: Default category for standard operational logs or cache elements.
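
A minimal sketch of how such keyword-based classification could work. It assumes matching is a case-insensitive substring test on top-level field names, and that the first matching level wins from most to least sensitive; the real router may inspect nested fields or values. The enum values, `classify`, and `max_ttl` are illustrative names, not documented SDK members.

```python
from enum import Enum
from typing import Any, Dict, Optional


class DataSensitivity(Enum):
    # Values are assumed for illustration.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


# Keyword lists copied from the reference above, most sensitive first.
_RULES = (
    (DataSensitivity.CRITICAL, ("passport", "visa")),
    (DataSensitivity.HIGH, ("password", "payment", "card", "cvv", "secret", "ssn", "token")),
    (DataSensitivity.MEDIUM, ("email", "phone", "user_name")),
)

# Only the two TTL caps stated in the reference; other levels have none here.
_MAX_TTL_SECONDS = {DataSensitivity.CRITICAL: 60, DataSensitivity.HIGH: 300}


def classify(payload: Dict[str, Any]) -> DataSensitivity:
    """Return the most sensitive level whose keyword appears in a field name."""
    names = [str(name).lower() for name in payload]
    for level, keywords in _RULES:
        if any(keyword in name for name in names for keyword in keywords):
            return level
    return DataSensitivity.LOW


def max_ttl(level: DataSensitivity) -> Optional[int]:
    """Return the TTL cap in seconds, or None when no cap is documented."""
    return _MAX_TTL_SECONDS.get(level)


level = classify({"card_number": "4111", "email": "guest@example.com"})
print(level.name, max_ttl(level))
```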

---

## 🔑 Redis Key Generation & Data Serialization Envelopes

To prevent key collision and ensure clean data parsing across heterogeneous systems (e.g. Go, Python, and Node.js microservices), the SDK enforces standardized key structures and JSON packaging envelopes.

### 1. Standardized Key Formatting Pattern
All keys written to Redis are transparently run through the `BaseRedisAdapter._format_key()` handler. This prefixes keys according to a strict namespace convention:

$$\text{redis\_key} = \text{service\_name} : \text{action} : \text{original\_key}$$

*   **Parameters**:
    *   `service_name` (`str`, default `"safari_pro"`): Declared in `RedisConfig`. Prevents cross-app database contamination.
    *   `action` (`str`, default `"data"`): Scopes key usage (e.g., `data`, `cache`, `stream`, `queue`).
*   **Example Generation**:
    *   Original Key: `"booking:8848"`
    *   Generated Key: `safari_pro:data:booking:8848`
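
The pattern can be reproduced in a few lines. This is a sketch of the documented convention only, not the body of `BaseRedisAdapter._format_key()`; the standalone `format_key` function is a hypothetical stand-in, and it does not guess at how the SDK treats keys that are already prefixed.

```python
def format_key(original_key: str, service_name: str = "safari_pro", action: str = "data") -> str:
    """Join the namespace parts with ':' as described above."""
    return f"{service_name}:{action}:{original_key}"


print(format_key("booking:8848"))
print(format_key("booking:8848", action="cache"))
```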

### 2. Cache Data Envelope Serialization
For caching payloads (strings, hashes, simple JSON targets), values are packed into a typed JSON envelope containing contextual metadata.

```json
{
    "request_id": "4020a6e3-fcf3-40e1-b4f6-6cbe702e5b85",
    "event": "update",
    "type": "cache",
    "service": "safari_pro",
    "tenant_name": null,
    "payload": {
        "booking_id": "BK-9989",
        "total_amount": 1500.0
    },
    "timestamp": "2026-06-29T10:24:18.123456"
}
```

*   **Envelope Fields**:
    *   `request_id` (`str`): UUIDv4 tracking identifier.
    *   `event` (`str`): Action trigger description (e.g., `update`, `invalidate`).
    *   `type` (`str`): Target cache category type.
    *   `service` (`str`): Originating application namespace name.
    *   `tenant_name` (`Optional[str]`): Partition indicator for multi-tenancy.
    *   `payload` (`Any`): The actual payload data structure.
    *   `timestamp` (`str`): UTC timestamp of serialization in ISO-8601 formatting.

*   *Note: Read operations automatically deserialize the envelope, returning only the underlying `payload` value.*
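
The wrap-on-write, unwrap-on-read behaviour can be sketched with the standard library. The field names follow the envelope above; `wrap_payload` and `unwrap_payload` are hypothetical names, and the naive-UTC timestamp format is inferred from the sample rather than confirmed by the SDK.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def wrap_payload(
    payload: Any,
    event: str = "update",
    cache_type: str = "cache",
    service: str = "safari_pro",
    tenant_name: Optional[str] = None,
) -> str:
    """Serialize a payload inside the metadata envelope."""
    envelope = {
        "request_id": str(uuid.uuid4()),
        "event": event,
        "type": cache_type,
        "service": service,
        "tenant_name": tenant_name,
        "payload": payload,
        # UTC time without an offset suffix, matching the sample envelope.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    return json.dumps(envelope)


def unwrap_payload(raw: str) -> Any:
    """Return only the inner payload, as read operations are described to do."""
    return json.loads(raw)["payload"]


raw = wrap_payload({"booking_id": "BK-9989", "total_amount": 1500.0})
print(unwrap_payload(raw))
```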

### 3. Messaging Emergency Fallback Envelopes
When circuit breakers trip or connection drops occur, message payloads are wrapped with failover metadata.

#### Redis Emergency Fallback
Stranded messages waiting for Pulsar recovery are written to a Redis Stream with the key pattern:
$$\text{key} = \text{emergency-fallback} : \{topic\}$$

*   **Envelope Schema**:
    ```json
    {
        "origin_topic": "booking-requests",
        "failed_at": 1782728658.452,
        "data": { ... }
    }
    ```

#### Pulsar Emergency Fallback
When Redis is down and fallback is routed directly to Pulsar, messages are packaged with origin metadata:
*   **Target Topic**: `persistent://public/default/emergency-fallback-redis-{topic}`
*   **Envelope Schema**:
    ```json
    {
        "origin": "booking-requests",
        "time": 1782728658.452,
        "data": { ... }
    }
    ```
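
The two fallback envelopes differ only in destination and field names. The sketch below builds both from the schemas above; the helper names are hypothetical, and using `time.time()` for the epoch timestamp is an assumption based on the sample values.

```python
import time
from typing import Any, Dict, Tuple


def redis_fallback(topic: str, data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Stream key and envelope used while Pulsar is unavailable."""
    key = f"emergency-fallback:{topic}"
    return key, {"origin_topic": topic, "failed_at": time.time(), "data": data}


def pulsar_fallback(topic: str, data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Topic and envelope used while Redis is unavailable."""
    target = f"persistent://public/default/emergency-fallback-redis-{topic}"
    return target, {"origin": topic, "time": time.time(), "data": data}


key, envelope = redis_fallback("booking-requests", {"booking_id": "BK-100"})
print(key, sorted(envelope))
```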

---

## ⏳ 1. Temporal Orchestration SDK

### `TemporalConfig`
A configuration dataclass used to initialize the `TemporalClient`.

| Field Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Endpoint hostname of the Temporal frontend service. |
| `port` | `int` | `7233` | Endpoint port of the Temporal frontend service. |
| `namespace` | `str` | `"default"` | Target namespace partition. |
| `ssl` | `bool` | `False` | Set to `True` to enable secure TLS communication. |
| `client_cert_path` | `Optional[str]` | `None` | Absolute path to the client certificate (.crt/.pem). |
| `client_key_path` | `Optional[str]` | `None` | Absolute path to the client private key (.key). |
| `auto_start_worker`| `bool` | `False` | Automatically boots background worker thread on execution. |
| `queues` | `List[str]` | `["default"]` | List of task queues the background worker monitors. |
| `mock_mode` | `Optional[bool]`| `None` | Explicit override. If omitted, mode is auto-detected from environment variables. |

### `@tc.workflow` (Alias: `@tc.process`)
Marks an async function as a stateful workflow process coordinator.

*   **Parameters**:
    *   `queue` (`str`, default `"default"`): The task queue where this workflow registration and its tasks reside.
*   **Attributes Attached to Wrapper**:
    *   `.delay(*args, **kwargs)`: Triggers the workflow asynchronously, returning a `TaskHandle`.
    *   `.workflow_name`: Registered string identifier (the decorated function's name).
    *   `._queue`: Target queue.

```python
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Orchestration logic goes here...
    return {"status": "success"}
```

### `@tc.task` (Alias: `@tc.step`)
Marks a function (sync or async) as an activity step. Activity steps execute real network operations, queries, or API calls.

*   **Parameters**:
    *   `queue` (`str`, default `"default"`): Task queue where this step executes.
    *   `retry_attempts` (`Optional[int]`, default `None`): Maximum times to retry if an exception occurs.
    *   `backoff_seconds` (`Optional[int]`, default `None`): Initial backoff delay (seconds) before retrying.
    *   `fallback` (`Optional[Callable]`, default `None`): Backup function run if all retries fail.
    *   `timeout` (`int`, default `300`): Maximum time allowed for a single run in seconds.
    *   `heartbeat_timeout` (`Optional[int]`, default `None`): Time limit between activity status heartbeats.

```python
def log_failed_payment(booking_id: str, amount: float):
    print(f"Payment failed for {booking_id} after retries.")
    return {"status": "payment_failed_logged"}

@tc.step(
    queue="billing-queue",
    retry_attempts=3,
    backoff_seconds=5,
    fallback=log_failed_payment,
    timeout=60
)
def process_payment(booking_id: str, amount: float):
    # Charge gateway API call...
    return {"txn_id": "TXN-8848"}
```
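
To make the interaction of `retry_attempts`, `backoff_seconds`, and `fallback` concrete, here is a standalone sketch of the same pattern. It is not the SDK's implementation: it assumes one initial call plus up to `retry_attempts` retries, a delay that doubles after each failure, and a fallback that receives the original arguments. `run_with_retries` and its `sleep` parameter are hypothetical.

```python
import time
from typing import Any, Callable, Optional


def run_with_retries(
    func: Callable[..., Any],
    *args: Any,
    retry_attempts: int = 3,
    backoff_seconds: float = 1.0,
    fallback: Optional[Callable[..., Any]] = None,
    sleep: Callable[[float], None] = time.sleep,
    **kwargs: Any,
) -> Any:
    """Call func, retrying on any exception, then fall back or re-raise."""
    delay = backoff_seconds
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc
            if attempt < retry_attempts:
                sleep(delay)   # wait before the next retry
                delay *= 2     # assumed exponential growth
    if fallback is not None:
        return fallback(*args, **kwargs)
    raise last_error
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting, for example `sleep=lambda seconds: None`.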

### `tc.execute_step(step_func, *args, **kwargs)`
Runs an activity step durably within the workflow context.

*   **Parameters**:
    *   `step_func` (`Callable`): The decorated `@tc.step` function to execute.
    *   `*args`: Positional arguments forwarded to `step_func`.
    *   `**kwargs`: Keyword arguments forwarded to `step_func`.
*   **Returns**: `Any` return value from `step_func`.

```python
result = await tc.execute_step(process_payment, "booking-id-123", 450.0)
```

### `tc.execute_local_step(step_func, *args, **kwargs)`
Runs an activity step locally inside the worker process running the workflow. Bypasses the Temporal server queues.

*   **Parameters**:
    *   `step_func` (`Callable`): Decorated `@tc.step` function.
    *   `*args` / `**kwargs`: Forwarded arguments.
    *   `timeout` (`int`, default `5`): Maximum local timeout in seconds.
    *   `retry_attempts` (`int`, default `3`): Maximum retries.
*   **Returns**: `Any` return value from `step_func`.

```python
is_valid = await tc.execute_local_step(check_promo_validity, "SAFARI10", 450.0)
```

### `tc.execute_in_parallel(*calls)` (Alias: `fan_out`)
Executes multiple step functions concurrently (Fan-Out/Fan-In).

*   **Parameters**:
    *   `*calls` (`tuple`): Varargs of tuples. Each tuple is formatted as:
        1. `step_func` (`Callable`): The `@tc.step` function to run.
        2. `args_list` (`list`): List of positional arguments.
        3. `kwargs_dict` (`dict`, optional): Dict of keyword arguments.
*   **Returns**: `list` containing results in corresponding order.

```python
results = await tc.execute_in_parallel(
    (search_hotels, ["Arusha"]),
    (search_flights, ["JRO"])
)
hotels, flights = results[0], results[1]
```
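
The fan-out/fan-in call format can be illustrated with plain `asyncio`. This sketch runs ordinary coroutines with `asyncio.gather`, which returns results in input order regardless of which call finishes first; the real method schedules Temporal activities, so `run_in_parallel` and the two stub search functions are stand-ins.

```python
import asyncio
from typing import Any, List


async def run_in_parallel(*calls: tuple) -> List[Any]:
    """Run (func, args_list[, kwargs_dict]) tuples concurrently."""
    coroutines = []
    for call in calls:
        func, args = call[0], call[1]
        kwargs = call[2] if len(call) > 2 else {}
        coroutines.append(func(*args, **kwargs))
    # gather preserves the order of its inputs in the result list.
    return await asyncio.gather(*coroutines)


async def search_hotels(city: str) -> str:
    await asyncio.sleep(0.02)  # deliberately slower than the flight search
    return f"hotels in {city}"


async def search_flights(airport: str, direct_only: bool = False) -> str:
    return f"flights to {airport} (direct_only={direct_only})"


results = asyncio.run(
    run_in_parallel(
        (search_hotels, ["Arusha"]),
        (search_flights, ["JRO"], {"direct_only": True}),
    )
)
print(results)
```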

### `tc.execute_in_series(*calls)`
Executes multiple step functions sequentially.

*   **Parameters**: Same format as `execute_in_parallel`.
*   **Returns**: `list` containing results in call order (each step completes before the next one starts).

```python
results = await tc.execute_in_series(
    (create_invoice, ["book-99"]),
    (send_receipt, ["book-99"])
)
```

### `tc.start_process(process_name, queue, *args, **kwargs)`
Launches a workflow process asynchronously and returns a `TaskHandle`.

*   **Parameters**:
    *   `process_name` (`str`): Name of the `@tc.process` workflow.
    *   `queue` (`str`): Target queue.
    *   `*args` / `**kwargs`: Inputs forwarded to the workflow.
*   **Returns**: `TaskHandle` to query or await workflow status and results.

```python
handle = tc.start_process("safari_booking_workflow", "booking-queue", {"tour_id": "T1"})
```

### `TaskHandle` Methods
*   `get(timeout: Optional[float] = None) -> Any`: Blocks until completion and returns workflow output.
*   `cancel() -> None`: Sends a cancellation request to the workflow.
*   `then(next_process_func: Callable, queue: Optional[str] = None, *args, **kwargs) -> TaskHandle`: Chains another process that starts automatically with the current workflow's result.
*   `status` (`str`): Returns `"RUNNING"`, `"COMPLETED"`, or `"CANCELLED"`.

```python
result = handle.get(timeout=60.0)
```

### `tc.set_state(process_id, key, value)`
Signals a running workflow to update its state dict out-of-band.

*   **Parameters**:
    *   `process_id` (`str`): The workflow's ID.
    *   `key` (`str`): State key name.
    *   `value` (`Any`): Value to set.

```python
tc.set_state(process_id, "guide_assigned", "Alex Kibona")
```

### `tc.get_state(process_id, key) -> Any`
Queries a running workflow for a state value.

```python
guide = tc.get_state(process_id, "guide_assigned")
```

### `tc.side_effect(func, *args) -> Any`
Safely records non-deterministic operations (UUIDs, timestamps, database lookups) into workflow history.

*   **Parameters**:
    *   `func` (`Callable`): A function returning a non-deterministic value.
    *   `*args`: Positional arguments passed to `func`.
*   **Returns**: The result of `func(*args)`.

```python
import uuid

txn_ref = tc.side_effect(lambda: f"REF-{uuid.uuid4().hex[:8].upper()}")
```

### `tc.continue_process_as_new(process_func, *args, **kwargs)`
Restarts the active workflow with a clean history to prevent unbounded log growth.

```python
tc.continue_process_as_new(safari_booking_workflow, new_payload)
```

### Saga Transaction Coordinator
Rolls back actions in Last-In-First-Out (LIFO) order if an error is encountered.

```python
from smartinno.temporal.client import Saga

@tc.process(queue="booking-queue")
async def booking_saga_workflow(payload: dict):
    saga = Saga(tc)
    try:
        # Step 1: Reserve Hotel
        hotel = await tc.execute_step(reserve_hotel, payload["hotel_id"])
        saga.add_compensation(release_hotel_reservation, hotel["reservation_id"])
        
        # Step 2: Book Tour (might fail)
        await tc.execute_step(book_safari_tour, payload["tour_id"])
        
    except Exception:
        # Undo completed steps in reverse order, then re-raise the original error
        await saga.compensate()
        raise
```
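
The LIFO rollback order is easiest to see in isolation. The class below is a toy model of the pattern, not the SDK's `Saga`: `MiniSaga` is a hypothetical name, it accepts both sync and async compensations, and it stops at the first compensation that raises.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


class MiniSaga:
    """Toy saga: compensations run in reverse order of registration."""

    def __init__(self) -> None:
        self._compensations: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def add_compensation(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._compensations.append((func, args, kwargs))

    async def compensate(self) -> None:
        while self._compensations:
            func, args, kwargs = self._compensations.pop()  # last in, first out
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                await result


async def demo() -> List[str]:
    log: List[str] = []
    saga = MiniSaga()
    try:
        saga.add_compensation(log.append, "release_hotel")
        saga.add_compensation(log.append, "refund_payment")
        raise RuntimeError("tour booking failed")
    except RuntimeError:
        await saga.compensate()
    return log


print(asyncio.run(demo()))
```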

### Dynamic Queue Routing (`tc.route_step`)
Routes steps dynamically to specific queues at runtime.

*   **Parameters**:
    *   `step_func` (`Callable`): `@tc.step` function to execute.
    *   `router` (`Callable[..., str]`): Function returning the target queue string.
    *   `*args` / `**kwargs`: Arguments passed to `step_func` and `router`.

```python
def select_billing_queue(amount: float):
    return "high-value-payments" if amount >= 10000 else "standard-payments"

await tc.route_step(process_payment, select_billing_queue, amount=12500)
```

### Namespace Management APIs
*   `create_namespace(namespace_name: str, retention_days: int = 3) -> bool`
*   `describe_namespace(namespace_name: str) -> dict`
*   `update_namespace(namespace_name: str, retention_days: Optional[int] = None, description: Optional[str] = None, owner_email: Optional[str] = None) -> bool`
*   `delete_namespace(namespace_name: str) -> bool`

```python
tc.create_namespace("tenant-serengeti-lodges", retention_days=5)
```

### Scheduler (`tc.create_schedule`)
Registers a workflow process to run on a schedule.

*   **Parameters**:
    *   `schedule_id` (`str`): Unique identifier for the scheduled task.
    *   `process_func` (`Callable`): Workflow process to launch.
    *   `frequency` (`Frequency`): Pre-defined frequency (`ONCE`, `REPEATED`, `HOURLY`, `DAILY`, `WEEKLY`, `MONTHLY`).
    *   `cron_expression` (`Optional[str]`): Raw cron string (e.g., `0 0 * * 1-5`).
    *   `count` (`Optional[int]`): Maximum execution runs allowed before termination.
    *   `interval_minutes` (`Optional[int]`): Run workflow at specific minute intervals.

```python
from smartinno.temporal.client import Frequency

await tc.create_schedule(
    schedule_id="daily-reconciliation",
    process_func=reconciliation_workflow,
    frequency=Frequency.DAILY
)
```

---

## 🔌 2. Custom Redis Connectivity SDK

Provides a unified interface `IRedisClient` with support for Sentinel, Cluster, Hybrid proxies (e.g., Predixy), and an auto-routing Intelligent Router.

### `RedisConfig`
Configuration settings for Redis instances. Key configuration fields:

| Field Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Redis server host. |
| `port` | `int` | `6379` | Redis server port. |
| `password` | `Optional[str]`| `None` | Authentication password. |
| `db` | `int` | `0` | Database index. |
| `max_connections`| `int` | `100` | Max connection pool size. |
| `socket_timeout`| `float` | `0.5` | Socket timeout for read/write operations, in seconds (0.5 s = 500 ms). |
| `socket_connect_timeout`| `float`| `0.2` | Timeout for establishing a connection, in seconds (0.2 s = 200 ms). |
| `retry_on_timeout`| `bool` | `True` | Automatically retry on socket timeout. |
| `cluster_host_mapping`| `Optional[dict]`| `None` | NAT/Docker cluster host maps. |

### `RedisClientFactory`
Instantiates the correct adapter according to specified architecture or use case.

```python
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.redis_custom.factory import RedisClientFactory

# Retrieve high concurrency cluster client
redis_client = RedisClientFactory.get_client(
    config=RedisConfig(host="localhost", port=7001),
    use_case=UseCase.HIGH_CONCURRENCY
)
```

### Core Operations (`IRedisClient`)

*   `set(key: str, value: Any) -> bool`
*   `get(key: str) -> Any`
*   `mget(keys: List[str]) -> List[Any]`
*   `hset(name: str, key: str = None, value: str = None, mapping: dict = None, ttl: int = None) -> int`
*   `hget(name: str, key: str) -> Any`
*   `lpush(name: str, *values, ttl: int = None) -> int`
*   `rpop(name: str) -> Any`
*   `pipeline(transaction: bool = True, shard_key: str = None) -> Pipeline`

#### Multi-key Transaction Sharding
When using cluster architecture, transactions must route to the same slot using hash tags.

```python
# Shard key guarantees slots match in Redis Cluster
with redis_client.pipeline(transaction=True, shard_key="hotel:987") as pipe:
    pipe.set("hotel:987:name", "Serengeti Lodge")
    pipe.set("hotel:987:available_rooms", 12)
    pipe.execute()
```
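
Why hash tags keep keys together: Redis Cluster assigns a key to slot `CRC16(key) mod 16384`, and when the key contains a non-empty `{...}` section, only the text inside the first such braces is hashed. The sketch below implements that rule from the Redis Cluster specification. How the SDK turns `shard_key` into a hash tag is not documented here, so the `{hotel:987}:...` key shapes are an assumption.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM variant): polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
            crc &= 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Return the Redis Cluster hash slot (0..16383) for a key."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # a closing brace exists and the tag is non-empty
            key = key[start + 1:end]
    return crc16_xmodem(key.encode("utf-8")) % 16384


# Keys sharing the tag land in one slot, so a MULTI/EXEC across them is valid.
print(key_slot("{hotel:987}:name") == key_slot("{hotel:987}:available_rooms"))
```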

### High Concurrency Redis Streams APIs

*   `xadd(name: str, fields: dict) -> str`: Appends a message to a stream.
*   `xread(streams: dict, count: int = None, block: int = None) -> List`: Reads entries from streams.
*   `xgroup_create(name: str, groupname: str, id: str = "$", mkstream: bool = False)`: Registers consumer groups.
*   `xreadgroup(groupname: str, consumername: str, streams: dict, count: int = None, block: int = None) -> List`: Reads streams inside a consumer group.
*   `xack(name: str, groupname: str, *ids)`: Acknowledges message receipt.

```python
# Publish to stream
msg_id = redis_client.xadd("safari-bookings", {"booking_id": "BK-99", "status": "pending"})

# Read group messages
entries = redis_client.xreadgroup("booking-group", "consumer-1", {"safari-bookings": ">"}, count=10)
```

---

## 📡 3. Resilient Messaging Facade (Pulsar / Redis)

The messaging facade provides resilient event pub/sub. It supports dual-active routing, client-side circuit breakers, and automatic database-to-broker failover.

### `PulsarConfig`
Configuration parameters for Apache Pulsar.

| Field Name | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| `host` | `str` | `"localhost"` | Pulsar broker host. |
| `port` | `int` | `6650` | Pulsar broker port. |
| `connection_timeout_ms`| `int` | `5000` | Connect timeout (default 5s). |
| `operation_timeout_ms` | `int` | `10000` | Read/write timeout (default 10s). |

### `@msg.on(topic, strategy="load_balanced", retries=2)`
Subscribes a callback to a topic. Supports specific distribution strategies:

*   **Pulsar Subscription Types**:
    *   `broadcast`: Creates an exclusive subscription with a randomized ID so every node receives the message.
    *   `load_balanced`: Creates a shared subscription (`Shared`) to distribute work across available workers.
    *   `sticky`: Creates a key-shared subscription (`Key_Shared`) to route messages with matching partition keys to the same consumer.
    *   `failover`: Creates a failover subscription (`Failover`) with active-standby consumers.
*   **Redis Subsystem Adaptation**:
    *   `broadcast` routes through standard **PubSub**.
    *   `load_balanced` or `sticky` routes through **Redis Streams** consumer groups.
    *   `exclusive` routes through **Redis Lists** (`RPOP`/`LPUSH` queue patterns).

```python
@msg.on("payment-events", strategy="load_balanced")
def handle_payment(payload):
    print(f"Received payment notification: {payload}")
```

### `@msg.on_event(topic, queue="default")`
Wires an event topic to automatically start a Temporal workflow when a message arrives.

```python
# Event-driven trigger
@msg.on_event("booking-requests", queue="booking-queue")
@tc.process(queue="booking-queue")
async def safari_booking_workflow(payload: dict):
    # Workflow triggers automatically on message
    await tc.execute_step(confirm_booking, payload["booking_id"])
```

### `msg.smart_send(topic, payload, key=None)`
Publishes a message to a topic with automatic database-to-broker fallback. 

*   **Resiliency Workflow**:
    1. Attempts to publish to primary backend (e.g. **Redis Stream**).
    2. If the connection fails or throws errors, it trips the circuit breaker to `OPEN`.
    3. The message is automatically wrapped and published to the **Pulsar emergency fallback topic**: `persistent://public/default/emergency-fallback-redis-{topic}`.
    4. Safe recovery loops buffer stranded payloads until the primary server recovers.

```python
# Resilient send
msg.smart_send("booking-requests", {"booking_id": "BK-100", "user_id": "U5"})
```
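
The resiliency workflow can be modelled without any broker. In this sketch the two backends are plain callables, the breaker opens after a configurable number of failures, and an open breaker sends everything to the fallback topic. `FailoverSender` and `failure_threshold` are hypothetical, and the half-open probing and replay of step 4 are deliberately left out.

```python
import time
from typing import Any, Callable, Dict

Publisher = Callable[[str, Dict[str, Any]], None]


class FailoverSender:
    """Toy circuit breaker: primary first, emergency fallback once it trips."""

    def __init__(self, primary: Publisher, fallback: Publisher, failure_threshold: int = 1) -> None:
        self.primary = primary
        self.fallback = fallback
        self.failure_threshold = failure_threshold
        self.failures = 0
        self.state = "CLOSED"

    def send(self, topic: str, payload: Dict[str, Any]) -> str:
        if self.state == "CLOSED":
            try:
                self.primary(topic, payload)
                self.failures = 0
                return "primary"
            except Exception:
                self.failures += 1
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"  # stop calling the primary
        # Wrap the message with origin metadata for the fallback topic.
        fallback_topic = f"persistent://public/default/emergency-fallback-redis-{topic}"
        self.fallback(fallback_topic, {"origin": topic, "time": time.time(), "data": payload})
        return "fallback"


def broken_redis(topic: str, payload: Dict[str, Any]) -> None:
    raise ConnectionError("redis unavailable")


delivered = []
sender = FailoverSender(broken_redis, lambda topic, message: delivered.append((topic, message)))
print(sender.send("booking-requests", {"booking_id": "BK-100"}), sender.state)
```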

### `msg.smart_listen(topic, callback, hint=None, criteria=None, explicit_type=None, block=True)`
Dynamically listens on a Redis or Pulsar topic. It automatically inspects the channel type (e.g. detecting if a Redis key is a Stream, List, or PubSub channel) and launches the corresponding listener loops.

*   **Parameters**:
    *   `topic` (`str`): Target channel to monitor.
    *   `callback` (`Callable`): Code executed on payload arrival.
    *   `hint` (`Any`) / `explicit_type`: Guide selection of the listener pattern (for example, `explicit_type="stream"`).
    *   `block` (`bool`, default `True`): Blocks execution and polls continuously.

```python
# Dynamically resolves stream/list/pubsub loops
msg.smart_listen("booking-requests", process_incoming_booking, explicit_type="stream")
```

### `msg.smart_get(topic, hint=None, explicit_type=None, criteria=None)`
Intelligently queries Redis keys, dynamically casting results based on type detection.

*   **Supported Type Resolutions**:
    *   `string` / `cache` -> returned value parsed from JSON.
    *   `hash` -> returns all mapped fields via `hgetall`.
    *   `zset` / `geo` -> returns top score ranges via `zrange`.
    *   `stream` -> returns pending stream entries.

```python
booking_data = msg.smart_get("booking:987", explicit_type="hash")
```

### `msg.flush_emergency_fallbacks(topic, batch_size=100)`
Drains the Redis emergency fallback queues and replays stranded payloads back into the primary Pulsar cluster once connectivity is restored.

```python
# Recovers and replays stranded messages
recovered = msg.flush_emergency_fallbacks("booking-requests", batch_size=50)
print(f"Replayed {recovered} stranded messages.")
```

---

## 📊 4. Observability & Monitoring SDK

Provides health checks, counters, and statistics for each underlying subsystem through the `MonitoringProvider` protocol.

```python
from typing import Dict, Any, Protocol

class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...
```
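
Because `MonitoringProvider` is a structural protocol, any class with the two methods satisfies it without inheriting from anything. The sketch below re-declares the protocol so it runs standalone and adds a hypothetical `InMemoryQueueMonitor`; the `is_healthy` key mirrors the one used in the `TemporalMonitor` example, while the other dictionary keys are invented for illustration.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class MonitoringProvider(Protocol):
    def get_real_time_stats(self) -> Dict[str, Any]: ...
    def check_health(self) -> Dict[str, Any]: ...


class InMemoryQueueMonitor:
    """Hypothetical provider that reports the depth of a local queue."""

    def __init__(self, max_depth: int = 1000) -> None:
        self.max_depth = max_depth
        self.depth = 0

    def get_real_time_stats(self) -> Dict[str, Any]:
        return {"depth": self.depth, "max_depth": self.max_depth}

    def check_health(self) -> Dict[str, Any]:
        return {"is_healthy": self.depth < self.max_depth}


def report(provider: MonitoringProvider) -> Dict[str, Any]:
    """Merge stats and health from any conforming provider."""
    return {**provider.get_real_time_stats(), **provider.check_health()}


queue_monitor = InMemoryQueueMonitor(max_depth=10)
queue_monitor.depth = 3
print(report(queue_monitor))
```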

### `TemporalMonitor`
Monitors workflow statuses, retry events, and active queues.

```python
from smartinno.monitoring.temporal_monitor import TemporalMonitor

monitor = TemporalMonitor(config=t_config, temporal_client=tc)

# Get statistics on active workflow runs
stats = monitor.get_real_time_stats(workflow_ids=["wf-1", "wf-2"])
print(stats["summary"])  # {'total': 2, 'completed': 1, 'running': 1, ...}

# Retrieve namespace liveness
health = monitor.check_health()
print(health["is_healthy"])  # True / False
```

### `RedisMonitor`
Monitors cluster topologies, slot redirections, and client pools.

```python
from smartinno.monitoring.redis_monitor import RedisMonitor

r_monitor = RedisMonitor(redis_client)
print(r_monitor.get_real_time_stats())
```

### `PulsarMonitor`
Checks Pulsar broker liveness and fallback subscriptions.

```python
from smartinno.monitoring.pulsar_monitor import PulsarMonitor

p_monitor = PulsarMonitor(pulsar_config)
print(p_monitor.check_health())
```

---

## 🎓 5. Step-by-Step Tutorial for New Developers

Follow this tutorial to build a fully resilient, event-driven Safari Tour booking pipeline from scratch.

### Step 1: Initialize the Configs
Create configuration settings for Redis (using the **High Concurrency** UseCase mapped to Native Cluster), messaging (with the Pulsar configuration), and Temporal orchestration.

```python
from smartinno.temporal.config import TemporalConfig
from smartinno.redis_custom.config import RedisConfig, UseCase
from smartinno.apache_pulsar.config import PulsarConfig

# 1. Config for Redis Shards
redis_cfg = RedisConfig(
    host="127.0.0.1",
    port=7001,
    cluster_host_mapping={
        "redis-node-1": ("127.0.0.1", 7001),
        "redis-node-2": ("127.0.0.1", 7002),
        "redis-node-3": ("127.0.0.1", 7003),
    }
)

# 2. Config for Pulsar Fallback
pulsar_cfg = PulsarConfig(
    host="164.68.120.77",
    port=6650,
    connection_timeout_ms=5000,
    operation_timeout_ms=10000
)

# 3. Config for Temporal Orchestrator
temporal_cfg = TemporalConfig(
    host="127.0.0.1",
    port=7233,
    namespace="tenant-safari-corp"
)
```

### Step 2: Instantiate SDK Clients
Create the client objects. The message client acts as the dispatcher, and the temporal client coordinates stateful workflows.

```python
from smartinno.temporal.client import TemporalClient
from smartinno.messaging.client import MessageClient

tc = TemporalClient(config=temporal_cfg)

msg = MessageClient(
    backend="redis",
    redis_config=redis_cfg,
    pulsar_config=pulsar_cfg,
    use_case=UseCase.HIGH_CONCURRENCY,
    temporal_client=tc
)
```

### Step 3: Define Workflows and Activity Steps
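Register activity steps with `@tc.step` and the orchestrating workflow with `@tc.process`. Stacking `@msg.on_event` on top of the workflow starts it automatically whenever a message arrives on the topic.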

```python
# An activity step executing transactional calculations
@tc.step(queue="safari-queue", retry_attempts=3, backoff_seconds=2)
def create_booking_record(user_id: str, tour_id: str):
    # Perform database insertion or Redis write
    return {"booking_id": "BK-9092", "status": "reserved"}

# The main process orchestrator
@msg.on_event("booking-requests", queue="safari-queue") # Dynamic workflow start decorator
@tc.process(queue="safari-queue")
async def safari_booking_workflow(payload: dict):
    # Execute step durably
    booking = await tc.execute_step(create_booking_record, payload["user_id"], payload["tour_id"])
    print(f"Booking created: {booking}")
    return booking
```

### Step 4: Boot Up Workers & Start Routing
Instantiate and run the background execution threads.

```python
import threading
from smartinno.temporal.worker import TemporalWorkerManager

# 1. Start Temporal Orchestrator worker manager
worker_manager = TemporalWorkerManager(tc)
worker_manager.start()

# 2. Start Message Client routing loop in a separate thread
msg_thread = threading.Thread(target=msg.start, kwargs={"block": True}, daemon=True)
msg_thread.start()

print("All background worker loops are listening...")
```

### Step 5: Trigger Event Execution
Publish an event to the stream topic. The messaging client intercepts it, auto-routes it to the correct Redis structure, starts the dynamic workflow process, and resolves the step activities durably.

```python
# Publish message -> triggers safari_booking_workflow automatically!
msg.smart_send("booking-requests", {"user_id": "user-88", "tour_id": "tour-maasai-mara"})
```
