Metadata-Version: 2.4
Name: shmx
Version: 1.0.1
Summary: High-performance shared-memory IPC for frame streaming
Author: SHMX Contributors
License: MIT
Keywords: ipc,shared-memory,high-performance,streaming,frame
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: C++
Classifier: Operating System :: Microsoft :: Windows
Classifier: Operating System :: POSIX :: Linux
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.7
Description-Content-Type: text/markdown

# SHMX - High-Performance Shared Memory IPC

[![PyPI version](https://badge.fury.io/py/shmx.svg)](https://badge.fury.io/py/shmx)
[![License: MPL 2.0](https://img.shields.io/badge/License-MPL%202.0-brightgreen.svg)](https://opensource.org/licenses/MPL-2.0)

A **lock-free, zero-copy** shared-memory IPC library for high-performance frame streaming between processes. SHMX enables ultra-low latency data exchange with typed schemas, per-frame metadata, and backpressure tolerance.

## 🚀 Features

- **Zero-Copy Data Transfer** - Direct memory access via shared memory with Python `memoryview` support
- **Lock-Free Architecture** - Wait-free reads, minimal contention for writes
- **Typed Schema System** - Self-describing streams with static metadata directory
- **Backpressure Tolerant** - Slow readers drop frames gracefully without blocking the producer
- **Bidirectional Control** - Per-reader control rings for client→server messages
- **Cross-Platform** - Windows and Linux support
- **Pure Python Interface** - No numpy required (but compatible), returns native Python types
- **Introspection Tools** - Built-in inspector for debugging and monitoring

## 📦 Installation

```bash
pip install shmx
```

## 🏃 Quick Start

### Basic Producer-Consumer Example

**Producer (Server):**

```python
import shmx
import time
import struct

# Define stream schema
streams = [
    shmx.create_stream_spec(
        stream_id=1,
        name="frame_id",
        dtype_code=shmx.DT_U64,
        components=1,
        bytes_per_elem=8
    ),
    shmx.create_stream_spec(
        stream_id=2,
        name="timestamp",
        dtype_code=shmx.DT_F64,
        components=1,
        bytes_per_elem=8
    ),
    shmx.create_stream_spec(
        stream_id=3,
        name="data",
        dtype_code=shmx.DT_F32,
        components=1,
        bytes_per_elem=4
    ),
]

# Create server
server = shmx.Server()
if not server.create(
    name="my_stream",
    slots=4,
    reader_slots=16,
    static_bytes_cap=4096,
    frame_bytes_cap=65536,
    control_per_reader=4096,
    streams=streams
):
    print("Failed to create server")
    exit(1)

print(f"Server created: {server.get_header_info()}")

# Publish frames
for i in range(100):
    frame = server.begin_frame()
    
    # Append streams
    server.append_stream(frame, 1, struct.pack('Q', i), 1)
    server.append_stream(frame, 2, struct.pack('d', time.time()), 1)
    server.append_stream(frame, 3, struct.pack('10f', *range(10)), 10)
    
    # Publish
    server.publish_frame(frame, time.time())
    time.sleep(0.01)

server.destroy()
```

**Consumer (Client):**

```python
import shmx
import time

# Open client
client = shmx.Client()
if not client.open("my_stream"):
    print("Failed to open client")
    exit(1)

print(f"Client connected: {client.get_header_info()}")
print(f"Available streams: {client.get_streams_info()}")

# Read frames
for _ in range(100):
    frame = client.get_latest_frame()
    if frame is not None:
        # Access metadata
        metadata = frame['__metadata__']
        print(f"Frame {metadata['frame_id']}: sim_time={metadata['sim_time']:.3f}")
        
        # Access stream data (zero-copy memoryview)
        frame_id_data = frame['frame_id']['data']
        timestamp_data = frame['timestamp']['data']
        data_stream = frame['data']['data']
        
        # Convert to native types if needed
        import struct
        frame_id = struct.unpack('Q', frame_id_data)[0]
        timestamp = struct.unpack('d', timestamp_data)[0]
        print(f"  frame_id={frame_id}, timestamp={timestamp:.3f}")
    
    time.sleep(0.01)

client.close()
```

## 📚 API Documentation

### Server Class

The `Server` class publishes frames to shared memory for multiple clients to consume.

#### `Server()`

Create a new server instance.

#### `create(name, slots=3, reader_slots=16, static_bytes_cap=4096, frame_bytes_cap=65536, control_per_reader=4096, streams=[]) -> bool`

Create and initialize the shared memory region.

**Parameters:**
- `name` (str): Shared memory region name (acts as the channel identifier)
- `slots` (int): Number of frame slots in the ring buffer (default: 3)
- `reader_slots` (int): Maximum number of concurrent readers (default: 16)
- `static_bytes_cap` (int): Capacity for static metadata (default: 4096)
- `frame_bytes_cap` (int): Maximum bytes per frame payload (default: 65536)
- `control_per_reader` (int): Control ring buffer size per reader (default: 4096)
- `streams` (list): List of stream specification dicts (see `create_stream_spec`)

**Returns:** `bool` - True if successful, False otherwise

#### `destroy()`

Destroy and release the shared memory region.

#### `begin_frame() -> frame_handle`

Begin a new frame. Returns an opaque frame handle to be used with `append_stream` and `publish_frame`.

#### `append_stream(frame_handle, stream_id, data, elem_count) -> bool`

Append stream data to the current frame.

**Parameters:**
- `frame_handle`: Frame handle from `begin_frame()`
- `stream_id` (int): Stream ID matching the schema
- `data` (bytes): Raw binary data
- `elem_count` (int): Number of elements in the data

**Returns:** `bool` - True if successful

#### `publish_frame(frame_handle, sim_time) -> bool`

Publish the frame to shared memory, making it available to clients.

**Parameters:**
- `frame_handle`: Frame handle from `begin_frame()`
- `sim_time` (float): Simulation/frame timestamp

**Returns:** `bool` - True if successful

#### `poll_control(max_messages=256) -> list`

Poll control messages from clients.

**Returns:** List of dicts with keys: `reader_id`, `type`, `data` (bytes)

#### `snapshot_readers() -> list`

Get a snapshot of all connected readers.

**Returns:** List of dicts with keys: `reader_id`, `heartbeat`, `last_frame_seen`, `in_use`

#### `reap_stale_readers(now_ticks, timeout_ticks) -> int`

Remove stale readers that haven't sent heartbeats.

**Returns:** Number of readers reaped

#### `get_header_info() -> dict`

Get header/metadata information about the shared memory region.

---

### Client Class

The `Client` class consumes frames from shared memory published by a server.

#### `Client()`

Create a new client instance.

#### `open(name) -> bool`

Open and connect to a shared memory region.

**Parameters:**
- `name` (str): Shared memory region name

**Returns:** `bool` - True if successful

#### `close()`

Close the connection and release resources.

#### `is_open() -> bool`

Check if the client is currently connected.

#### `get_latest_frame() -> dict | None`

Get the most recent frame from the server.

**Returns:**
- `None` if no frame available or validation failed
- `dict` with the following structure:
  ```python
  {
      '__metadata__': {
          'frame_id': int,
          'sim_time': float,
          'payload_bytes': int,
          'tlv_count': int
      },
      'stream_name': {
          'data': memoryview,      # Zero-copy buffer
          'elem_count': int,
          'bytes': int
      },
      # ... additional streams
  }
  ```

**Note:** The `memoryview` objects provide zero-copy access to the shared memory. Data is only valid until the next frame is published. If you need to retain data, convert to bytes or copy it.

#### `get_streams_info() -> list`

Get metadata about all available streams.

**Returns:** List of dicts with keys: `id`, `name`, `dtype`, `dtype_code`, `components`, `layout`, `bytes_per_elem`, and optionally `extra`

#### `get_header_info() -> dict`

Get header/metadata information about the shared memory region.

#### `refresh_static() -> bool`

Refresh the static stream metadata (useful if schema changes).

#### `send_control(type, data) -> bool`

Send a control message to the server.

**Parameters:**
- `type` (int): Message type identifier
- `data` (bytes): Message payload

#### `send_control_empty(type) -> bool`

Send a control message without payload.

---

### Inspector Class

The `Inspector` class provides read-only introspection of shared memory state for debugging and monitoring.

#### `Inspector()`

Create a new inspector instance.

#### `open(name) -> bool`

Open a shared memory region in read-only mode.

#### `close()`

Close the connection.

#### `inspect() -> dict`

Get a comprehensive inspection report.

**Returns:** Dict with keys: `session_id`, `static_gen`, `frame_seq`, `readers_connected`, `streams`, `readers`

#### `get_header_info() -> dict`

Get header information.

#### `get_streams_info() -> list`

Get stream metadata.

#### `get_readers_info() -> list`

Get information about connected readers.

---

### Helper Functions

#### `create_stream_spec(stream_id, name, dtype_code, components, bytes_per_elem, layout_code=None, extra=None) -> dict`

Helper function to create stream specification dictionaries for `Server.create()`.

**Parameters:**
- `stream_id` (int): Unique stream identifier
- `name` (str): Human-readable stream name
- `dtype_code` (int): Data type constant (e.g., `shmx.DT_F32`)
- `components` (int): Number of components per element (1 for scalar)
- `bytes_per_elem` (int): Total bytes per element
- `layout_code` (int, optional): Layout constant (default: `LAYOUT_SOA_SCALAR`)
- `extra` (bytes, optional): Additional metadata

**Returns:** Dict suitable for `Server.create()` streams parameter

#### `dtype_to_string(dtype_code) -> str`

Convert a dtype code to a human-readable string.

#### `layout_to_string(layout_code) -> str`

Convert a layout code to a human-readable string.

---

### Constants

#### Data Types

- `DT_BOOL` - Boolean (1 byte)
- `DT_I8`, `DT_U8` - 8-bit signed/unsigned integer
- `DT_I16`, `DT_U16` - 16-bit signed/unsigned integer
- `DT_I32`, `DT_U32` - 32-bit signed/unsigned integer
- `DT_I64`, `DT_U64` - 64-bit signed/unsigned integer
- `DT_F16` - 16-bit float (half precision)
- `DT_BF16` - 16-bit bfloat
- `DT_F32` - 32-bit float (single precision)
- `DT_F64` - 64-bit float (double precision)

#### Layouts

- `LAYOUT_SOA_SCALAR` - Scalar data (default)
- `LAYOUT_AOS_VECTOR` - Vector data (interleaved)

#### TLV Types

- `TLV_STATIC_DIR` - Static directory entry
- `TLV_FRAME_STREAM` - Frame stream data
- `TLV_CONTROL_USER` - User control message

---

## 📖 Complete Examples

### Example 1: Video Frame Streaming

```python
import shmx
import numpy as np
import time

# Server: Publish video frames
def video_server():
    streams = [
        shmx.create_stream_spec(1, "width", shmx.DT_U32, 1, 4),
        shmx.create_stream_spec(2, "height", shmx.DT_U32, 1, 4),
        shmx.create_stream_spec(3, "pixels", shmx.DT_U8, 1, 1),
    ]
    
    server = shmx.Server()
    server.create("video_stream", slots=4, frame_bytes_cap=1920*1080*3, streams=streams)
    
    width, height = 1920, 1080
    
    for frame_num in range(1000):
        # Generate dummy frame (would be real camera data)
        pixels = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
        
        frame = server.begin_frame()
        server.append_stream(frame, 1, width.to_bytes(4, 'little'), 1)
        server.append_stream(frame, 2, height.to_bytes(4, 'little'), 1)
        server.append_stream(frame, 3, pixels.tobytes(), width * height * 3)
        server.publish_frame(frame, time.time())
        
        time.sleep(1/30)  # 30 FPS
    
    server.destroy()

# Client: Consume video frames
def video_client():
    client = shmx.Client()
    client.open("video_stream")
    
    while True:
        frame = client.get_latest_frame()
        if frame:
            width = int.from_bytes(frame['width']['data'], 'little')
            height = int.from_bytes(frame['height']['data'], 'little')
            
            # Zero-copy access to pixel data
            pixels_view = frame['pixels']['data']
            
            # Convert to numpy for processing (creates copy)
            pixels = np.frombuffer(pixels_view, dtype=np.uint8).reshape((height, width, 3))
            
            print(f"Received frame {frame['__metadata__']['frame_id']}: {width}x{height}")
            
            # Process frame...
        
        time.sleep(1/30)
```

### Example 2: Sensor Data with Control Messages

```python
import shmx
import struct
import time

# Control message types
CTRL_SET_RATE = 0x1001
CTRL_RESET = 0x1002

# Server with control message handling
def sensor_server():
    streams = [
        shmx.create_stream_spec(1, "temperature", shmx.DT_F32, 1, 4),
        shmx.create_stream_spec(2, "pressure", shmx.DT_F32, 1, 4),
        shmx.create_stream_spec(3, "humidity", shmx.DT_F32, 1, 4),
    ]
    
    server = shmx.Server()
    server.create("sensors", streams=streams)
    
    rate = 10.0  # Hz
    
    for i in range(1000):
        # Poll control messages
        msgs = server.poll_control()
        for msg in msgs:
            if msg['type'] == CTRL_SET_RATE:
                new_rate = struct.unpack('f', msg['data'])[0]
                print(f"Reader {msg['reader_id']} set rate to {new_rate} Hz")
                rate = new_rate
            elif msg['type'] == CTRL_RESET:
                print(f"Reader {msg['reader_id']} requested reset")
        
        # Publish sensor data
        frame = server.begin_frame()
        server.append_stream(frame, 1, struct.pack('f', 25.0 + i * 0.1), 1)
        server.append_stream(frame, 2, struct.pack('f', 1013.25 + i * 0.01), 1)
        server.append_stream(frame, 3, struct.pack('f', 45.0 + i * 0.05), 1)
        server.publish_frame(frame, time.time())
        
        # Check reader health
        readers = server.snapshot_readers()
        print(f"Active readers: {len([r for r in readers if r['in_use']])}")
        
        time.sleep(1.0 / rate)
    
    server.destroy()

# Client with control messages
def sensor_client():
    client = shmx.Client()
    client.open("sensors")
    
    print("Streams:", client.get_streams_info())
    
    # Request faster rate
    client.send_control(CTRL_SET_RATE, struct.pack('f', 100.0))
    
    for _ in range(100):
        frame = client.get_latest_frame()
        if frame:
            temp = struct.unpack('f', frame['temperature']['data'])[0]
            pressure = struct.unpack('f', frame['pressure']['data'])[0]
            humidity = struct.unpack('f', frame['humidity']['data'])[0]
            
            print(f"T={temp:.1f}°C P={pressure:.2f}hPa H={humidity:.1f}%")
        
        time.sleep(0.01)
    
    client.close()
```

### Example 3: Multiple Streams with Numpy

```python
import shmx
import numpy as np
import time

def numpy_example():
    # Server
    streams = [
        shmx.create_stream_spec(1, "positions", shmx.DT_F32, 3, 12),
        shmx.create_stream_spec(2, "velocities", shmx.DT_F32, 3, 12),
        shmx.create_stream_spec(3, "ids", shmx.DT_U32, 1, 4),
    ]
    
    server = shmx.Server()
    server.create("particles", frame_bytes_cap=1024*1024, streams=streams)
    
    # Client
    client = shmx.Client()
    client.open("particles")
    
    # Publish
    num_particles = 1000
    for i in range(100):
        positions = np.random.randn(num_particles, 3).astype(np.float32)
        velocities = np.random.randn(num_particles, 3).astype(np.float32)
        ids = np.arange(num_particles, dtype=np.uint32)
        
        frame = server.begin_frame()
        server.append_stream(frame, 1, positions.tobytes(), num_particles)
        server.append_stream(frame, 2, velocities.tobytes(), num_particles)
        server.append_stream(frame, 3, ids.tobytes(), num_particles)
        server.publish_frame(frame, i * 0.01)
        
        # Read back
        frame_data = client.get_latest_frame()
        if frame_data:
            # Zero-copy view into shared memory
            pos_view = np.frombuffer(frame_data['positions']['data'], dtype=np.float32)
            pos_array = pos_view.reshape(-1, 3)
            
            vel_view = np.frombuffer(frame_data['velocities']['data'], dtype=np.float32)
            vel_array = vel_view.reshape(-1, 3)
            
            print(f"Frame {i}: {len(pos_array)} particles")
            print(f"  Position range: [{pos_array.min():.2f}, {pos_array.max():.2f}]")
        
        time.sleep(0.01)
    
    server.destroy()
    client.close()

if __name__ == '__main__':
    numpy_example()
```

### Example 4: Inspector for Debugging

```python
import shmx
import time

def inspect_stream(name):
    """Inspect a running stream without interfering"""
    inspector = shmx.Inspector()
    
    if not inspector.open(name):
        print(f"Failed to open stream '{name}'")
        return
    
    # Get complete report
    report = inspector.inspect()
    
    print(f"\n=== Stream Inspection: {name} ===")
    print(f"Session ID: {report['session_id']}")
    print(f"Frame Sequence: {report['frame_seq']}")
    print(f"Readers Connected: {report['readers_connected']}")
    print(f"Static Generation: {report['static_gen']}")
    
    print("\nStreams:")
    for stream in report['streams']:
        print(f"  [{stream['id']}] {stream['name']}")
        print(f"      Type: {stream['dtype']} x {stream['components']}")
        print(f"      Layout: {stream['layout']}")
        print(f"      Bytes/elem: {stream['bytes_per_elem']}")
    
    print("\nReaders:")
    for reader in report['readers']:
        if reader['in_use']:
            print(f"  Reader {reader['reader_id']}")
            print(f"    Last frame: {reader['last_frame_seen']}")
            print(f"    Heartbeat: {reader['heartbeat']}")
    
    inspector.close()

# Usage
if __name__ == '__main__':
    inspect_stream("my_stream")
```

### Example 5: Multi-Process Communication

```python
import shmx
import multiprocessing
import time
import struct

def producer_process(name):
    """Producer process"""
    streams = [
        shmx.create_stream_spec(1, "counter", shmx.DT_U64, 1, 8),
        shmx.create_stream_spec(2, "value", shmx.DT_F64, 1, 8),
    ]
    
    server = shmx.Server()
    server.create(name, streams=streams)
    print(f"Producer: Started on '{name}'")
    
    for i in range(100):
        frame = server.begin_frame()
        server.append_stream(frame, 1, struct.pack('Q', i), 1)
        server.append_stream(frame, 2, struct.pack('d', i * 3.14), 1)
        server.publish_frame(frame, time.time())
        time.sleep(0.1)
    
    server.destroy()
    print("Producer: Done")

def consumer_process(name, consumer_id):
    """Consumer process"""
    time.sleep(0.5)  # Wait for producer
    
    client = shmx.Client()
    if not client.open(name):
        print(f"Consumer {consumer_id}: Failed to connect")
        return
    
    print(f"Consumer {consumer_id}: Connected to '{name}'")
    
    count = 0
    for _ in range(50):
        frame = client.get_latest_frame()
        if frame:
            counter = struct.unpack('Q', frame['counter']['data'])[0]
            value = struct.unpack('d', frame['value']['data'])[0]
            print(f"Consumer {consumer_id}: counter={counter}, value={value:.2f}")
            count += 1
        time.sleep(0.2)
    
    client.close()
    print(f"Consumer {consumer_id}: Received {count} frames")

if __name__ == '__main__':
    stream_name = "multi_process_test"
    
    # Start producer
    producer = multiprocessing.Process(target=producer_process, args=(stream_name,))
    producer.start()
    
    # Start multiple consumers
    consumers = []
    for i in range(3):
        consumer = multiprocessing.Process(target=consumer_process, args=(stream_name, i))
        consumer.start()
        consumers.append(consumer)
    
    # Wait for completion
    producer.join()
    for consumer in consumers:
        consumer.join()
    
    print("All processes completed")
```

## 🔧 Advanced Usage

### Memory Layout and Performance

SHMX uses a ring buffer architecture with the following characteristics:

- **Slots**: Number of frames buffered (configure with `slots` parameter)
- **Frame Size**: Maximum frame payload size (`frame_bytes_cap`)
- **Zero-Copy**: Client `memoryview` objects point directly to shared memory
- **Lock-Free**: Atomic operations for synchronization

**Performance Tips:**
1. Allocate `frame_bytes_cap` based on your maximum frame size
2. Use `slots=3` or `slots=4` for typical applications
3. `memoryview` data is only valid until the next server frame publication
4. Convert to `bytes()` or `bytearray()` if you need to retain data

### Error Handling

```python
import shmx

# Always check return values
server = shmx.Server()
if not server.create("my_stream", ...):
    print("Failed to create server - check permissions and naming")
    exit(1)

client = shmx.Client()
if not client.open("my_stream"):
    print("Failed to open client - server may not be running")
    exit(1)

# Check for None when reading frames
frame = client.get_latest_frame()
if frame is None:
    print("No frame available or checksum mismatch")
```

### Schema Evolution

```python
# Server can update static metadata
server.write_static_append(b"extra_metadata", len(b"extra_metadata"))

# Clients can refresh
client.refresh_static()
streams = client.get_streams_info()
```

## 🐛 Troubleshooting

### Common Issues

1. **"Failed to create server"**
   - Check if name is already in use
   - Ensure sufficient permissions
   - On Linux: Check `/dev/shm/` permissions

2. **"Failed to open client"**
   - Verify server is running
   - Check that names match exactly
   - Ensure client has read permissions

3. **Frames are None**
   - Server may not be publishing yet
   - Check frame checksums (possible corruption)
   - Verify session IDs match

4. **Memory issues**
   - Increase `frame_bytes_cap` if frames are too large
   - Check total shared memory usage
   - On Linux: `df -h /dev/shm`

## 📊 Performance Characteristics

- **Latency**: < 1 microsecond for local reads (CPU cache hit)
- **Throughput**: Limited by memory bandwidth (10+ GB/s typical)
- **Scalability**: Supports 16+ concurrent readers (configurable)
- **Overhead**: Minimal - atomic operations only

## 🔗 Related Projects

- Main repository: [github.com/HinaPE/shared-mem-ipc](https://github.com/HinaPE/shared-mem-ipc)
- C++ header-only library included

## 📄 License

Mozilla Public License Version 2.0

## 🤝 Contributing

Contributions are welcome! Please visit the main repository for guidelines.
