Metadata-Version: 2.4
Name: cawlib
Version: 0.1.2
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Operating System :: POSIX :: Linux
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: System :: Hardware
Classifier: Intended Audience :: Developers
Classifier: Development Status :: 3 - Alpha
Summary: A high-performance Rust-powered Python library for CAN bus communication and FOC motor control in robotics
Keywords: robotics,can-bus,motor-control,foc,socketcan
Author-email: noxrick91 <noxrick91@gmail.com>
License: Proprietary - All rights reserved
Requires-Python: >=3.8
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Bug Tracker, https://github.com/noxrick91/cawlib/issues
Project-URL: Homepage, https://github.com/noxrick91/cawlib
Project-URL: Repository, https://github.com/noxrick91/cawlib

# cawlib

A Rust-backed Python library for CAN bus communication and FOC motor control, exposed via [PyO3](https://pyo3.rs/). Built on the [Tokio](https://tokio.rs/) async runtime for high-precision timers, non-blocking CAN I/O, and lifecycle management.

## Requirements

- **OS**: Linux with SocketCAN (`can` / `vcan` driver loaded)
- **Python**: >= 3.8
- **Hardware**: A CAN-capable network interface (e.g. USB-CAN adapter); the interface name (e.g. `can0`) must already exist on the system

## Installation

```bash
pip install cawlib
```

## Automatic CAN interface setup

When you construct `CanDevice`, `CanDeviceNonBlocking`, or `Motor`, the library checks whether the named interface is **UP** at **1 Mbps**. If not, it runs:

```
ip link set <iface> down
ip link set <iface> type can bitrate 1000000
ip link set <iface> up
```

If that fails without privileges, it retries with `sudo` (you may be prompted). The interface must already exist (driver loaded); the library does not create device nodes.

---

## Quick start

```python
import cawlib

motor = cawlib.Motor("can0", host=0x100)
d1 = motor.add_device(slot_index=0, node_id=0x201, mode="speed")
d2 = motor.add_device(slot_index=1, node_id=0x202, mode="speed")

d1.set_target(10.0)   # 10 rad/s
d2.set_target(-5.0)   # -5 rad/s
motor.broadcast()

@cawlib.timer(100.0)
def loop():
    motor.broadcast()
    fb = d1.get_feedback()
    print(f"pos={fb['position']:.2f} rad  spd={fb['speed']:.2f} rad/s")

loop()
cawlib.block_all(timeout_ms=5000)
```

---

## API reference

### CAN devices

#### `CanDevice(interface: str)`

Blocking CAN device for synchronous use.

```python
can = cawlib.CanDevice("can0")
can.send(0x100, [0x01, 0x02, 0x03])
frame_id, data = can.receive()
```

| Method | Description |
|--------|-------------|
| `send(id: int, data: list[int])` | Send a CAN frame (blocking) |
| `receive() -> tuple[int, bytes]` | Receive a CAN frame (blocking) |
| `interface() -> str` | Interface name |

#### `CanDeviceNonBlocking(interface: str)`

Non-blocking CAN device with background I/O.

```python
can = cawlib.CanDeviceNonBlocking("can0")
can.send(0x100, [0x01, 0x02])
msg = can.try_receive()            # None if no frame available
msg = can.receive_timeout(1000)    # Wait up to 1000 ms
```

| Method | Description |
|--------|-------------|
| `send(id: int, data: list[int])` | Non-blocking send |
| `try_receive() -> tuple[int, bytes] \| None` | Non-blocking receive |
| `receive_timeout(timeout_ms: int) -> tuple[int, bytes] \| None` | Receive with timeout |
| `interface() -> str` | Interface name |

---

### Motor control

#### `Motor(interface: str, host=None, host_id=None)`

Broadcast-style motor controller for up to 4 drives per CAN bus. `host` and `host_id` are equivalent and set the host-side control-frame CAN ID.

```python
motor = cawlib.Motor("can0", host=0x100)
# Equivalent:
# motor = cawlib.Motor("can0", 0x100)
# motor = cawlib.Motor("can0", host_id=0x100)
```

`add_device` automatically: enters the DR configuration session via unicast, applies the FOC mode, then exits the session (no Flash write, no encoder zeroing).

```python
d1 = motor.add_device(slot_index=0, node_id=0x201, mode="current")
d2 = motor.add_device(slot_index=1, node_id=0x202, mode="speed")
d3 = motor.add_device(slot_index=2, node_id=0x203, mode="position")

d1.set_target(1.5)    # 1.5 A
d2.set_target(10.0)   # 10 rad/s
d3.set_target(3.14)   # 3.14 rad
motor.broadcast()

fb = d1.get_feedback()
# {"position": float, "speed": float, "current": float}
```

**`Motor` methods:**

| Method / property | Description |
|-----------------|-------------|
| `add_device(slot_index, node_id, mode) -> Device` | Register a drive; raises `OSError` on send failure |
| `set_target(slot_index: int, value: float)` | Set target for a slot |
| `broadcast()` | Pack and broadcast all targets (8 bytes, 4×i16 LE) |
| `get_feedback(slot_index: int) -> dict \| None` | Feedback dict for a slot, or `None` |
| `host` / `host_id` | Host-side CAN ID for control / DR frames |
| `enter_configure_broadcast()` | DR: broadcast enter configuration session |
| `exit_configure_broadcast()` | DR: broadcast exit configuration session |
| `calibrate_broadcast()` | DR: broadcast encoder electrical zeroing (**blocks ~200 ms after send**) |
| `set_foc_mode_broadcast(mode: str)` | DR: broadcast set FOC mode |

#### `Device`

Returned by `motor.add_device()`; one drive in the broadcast group.

**`Device` methods / properties:**

| Method / property | Description |
|-------------------|-------------|
| `set_target(value: float)` | Set target (units depend on mode; see table below) |
| `get_target() -> float` | Current target |
| `get_feedback() -> dict` | Feedback: `{"position", "speed", "current"}` |
| `slot_index` | Broadcast slot (0–3) |
| `node_id` | Drive node ID (feedback-frame CAN ID) |
| `enter_configure()` | DR: unicast enter configuration session |
| `exit_configure()` | DR: unicast exit configuration session |
| `start_calibration()` | DR: unicast encoder electrical zeroing (**blocks ~200 ms after send**) |
| `set_foc_mode(mode: str)` | DR: unicast set FOC mode |

**Control modes and units:**

| Mode | Unit | Resolution | Range |
|------|------|------------|-------|
| `"current"` | A | 0.01 A | ±327.67 A |
| `"speed"` | rad/s | 0.1 rad/s | ±3276.7 rad/s |
| `"position"` | rad | 0.01 rad | ±327.67 rad |

---

### Timers

#### `@timer(frequency_hz: float)`

Periodic timer on the Tokio runtime; resolution roughly 1–10 ms.

```python
@cawlib.timer(10.0)  # 10 Hz
def update():
    motor.broadcast()

handle = update()
print(handle.actual_frequency)
handle.stop()
```

#### `@precision_timer(frequency_hz: float, realtime: bool = False)`

High-precision timer; two modes:

| Mode | Implementation | Precision | Notes |
|------|----------------|-----------|-------|
| `realtime=False` (default) | Busy-wait loop | ~10–100 μs | Uses CPU |
| `realtime=True` | Dedicated OS thread | ~1–10 μs | CPU-intensive, highest precision |

```python
@cawlib.precision_timer(1000.0)  # 1 kHz
def fast_control():
    motor.broadcast()

handle = fast_control()
```

#### `TimerHandle`

Returned when you call a timer-decorated function.

| Property / method | Description |
|-------------------|-------------|
| `stop()` | Stop the timer |
| `is_running() -> bool` | Whether it is running |
| `frequency` | Target frequency (Hz) |
| `actual_frequency` | Achieved average frequency (Hz) |
| `tick_count` | Number of ticks |
| `elapsed_secs` | Elapsed seconds |
| `precision_mode` | `"normal"`, `"high"`, or `"realtime"` |
| `task_id` | Task ID |

---

### Async tasks

#### `@run_async`

Run a Python function as a background task without blocking the caller.

```python
@cawlib.run_async
def heavy():
    import time
    time.sleep(3)
    return 42

handle = heavy()
handle.wait(timeout_ms=5000)
print(handle.try_result())  # 42
```

#### `AsyncHandle`

| Property / method | Description |
|-------------------|-------------|
| `is_done() -> bool` | Finished (success or failure) |
| `is_completed() -> bool` | Completed successfully |
| `is_cancelled() -> bool` | Cancelled |
| `cancel()` | Request cancellation |
| `try_result() -> Any \| None` | Result if done; otherwise `None` |
| `wait(timeout_ms: int \| None = None)` | Block until done or timeout |
| `task_id` | Task ID |

---

### Task management

#### `get_task_manager() -> TaskManager`

Global task manager for monitoring and coordinated shutdown.

```python
tm = cawlib.get_task_manager()
print(tm.status())
# {"running_timers": 2, "running_tasks": 1, "stopped": 0, "total": 3}

tm.shutdown(timeout_ms=5000)
```

| Property / method | Description |
|-------------------|-------------|
| `running_count` | Running tasks |
| `total_count` | Registered tasks |
| `timer_count` | Running timers |
| `async_task_count` | Running async tasks |
| `stop_all()` | Stop all tasks |
| `wait_all(timeout_ms: int \| None = None)` | Wait for all to finish |
| `shutdown(timeout_ms: int \| None = None)` | Stop all and wait |
| `cleanup()` | Drop finished tasks |
| `status() -> dict` | Status summary |

#### Module-level helpers

| Function | Description |
|----------|-------------|
| `shutdown_all(timeout_ms=None)` | Stop all tasks and wait |
| `block_all(timeout_ms=None)` | Block until SIGINT/SIGTERM, then shut down cleanly |
| `block_until(wait_ms=None, shutdown_timeout_ms=None)` | Block until timeout or signal, then shut down |

---

## Example: dual-motor control

```python
import cawlib

motor = cawlib.Motor("can0", host=0x100)
d1 = motor.add_device(slot_index=0, node_id=0x201, mode="speed")
d2 = motor.add_device(slot_index=1, node_id=0x202, mode="speed")

target = 5.0

@cawlib.precision_timer(500.0)  # 500 Hz control
def control():
    d1.set_target(target)
    d2.set_target(-target)
    motor.broadcast()

@cawlib.timer(2.0)  # 2 Hz monitoring
def monitor():
    f1 = d1.get_feedback()
    f2 = d2.get_feedback()
    print(f"D1  spd={f1['speed']:+.1f} rad/s  cur={f1['current']:.2f} A")
    print(f"D2  spd={f2['speed']:+.1f} rad/s  cur={f2['current']:.2f} A")

control()
monitor()
cawlib.block_all(timeout_ms=3000)
```

---

## License

Proprietary — All rights reserved.

