Metadata-Version: 2.4
Name: atlas-asset-websocket-client
Version: 0.1.4
Summary: Async helper for Atlas Command's asset websocket gateway.
Author: ATLAS Team
License-Expression: MIT
Project-URL: Homepage, https://github.com/atlas/command
Project-URL: Repository, https://github.com/atlas/command
Project-URL: Documentation, https://github.com/atlas/command/wiki
Keywords: atlas,command,websocket,asset
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: System :: Monitoring
Classifier: Topic :: Software Development :: Libraries
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: <3.13,>=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pydantic<3,>=1.10
Requires-Dist: websockets>=10
Provides-Extra: dev
Requires-Dist: pytest>=8.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.1.0; extra == "dev"
Requires-Dist: mypy>=1.10.0; extra == "dev"
Dynamic: license-file

# Atlas Asset WebSocket Client

Async helper that speaks the asset WebSocket contract (`/ws/assets`). The package manages the handshake, heartbeat cadence, telemetry helpers, command acknowledgements, and schema validation so asset OS projects stay in sync with Atlas Command without reimplementing the protocol.

## Requirements

- Python 3.9–3.12
- `websockets>=10`
- `pydantic>=1.10,<3`

Install the package from PyPI:

```bash
pip install atlas-asset-websocket-client
```

During development inside this repository, install the package in editable mode:

```bash
pip install -e Atlas_Command/connection_packages/atlas_asset_ws_client
```

## Quickstart

```python
import asyncio
from atlas_asset_ws_client import AssetWebSocketClient

URL = "ws://localhost:8000/ws/assets"
ASSET_ID = "asset-123"

async def connect(url: str):
    import websockets

    return await websockets.connect(url, ping_interval=20)

async def main() -> None:
    client = AssetWebSocketClient(URL, asset_id=ASSET_ID, connect=connect)
    async with client:
        ack = await client.handshake(
            {
                "firmware_version": "1.6.2",
                "capabilities": {"telemetry": True, "commands": True},
            }
        )
        print("Handshake ACK:", ack.payload)
        await client.send_heartbeat()
        await client.send_telemetry_update(
            {"latitude": 40.7128, "longitude": -74.0060, "altitude_m": 120}
        )

asyncio.run(main())
```
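
The Quickstart sends a single heartbeat. A long-running asset would normally repeat it on a schedule. The following is a minimal sketch of such a loop, not part of the package: `heartbeat_loop` and its `interval_s` parameter are hypothetical names, and the interval is passed in by the caller because this README does not specify how the handshake acknowledgement exposes the cadence.

```python
import asyncio


async def heartbeat_loop(client, interval_s: float, stop: asyncio.Event) -> None:
    """Call client.send_heartbeat() every interval_s seconds until stop is set.

    `client` is any object with an async send_heartbeat() method, such as
    AssetWebSocketClient. This helper is illustrative, not a package API.
    """
    while not stop.is_set():
        await client.send_heartbeat()
        try:
            # Sleep for one interval, but wake up early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # Interval elapsed without a stop request; send the next beat.
```

Inside `async with client:`, the loop could be started with `asyncio.create_task(heartbeat_loop(client, interval_s, stop))` and ended by calling `stop.set()` before the connection closes.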

## API Overview

- **Connection**: `connect()`, `close()`, `async with client: ...`
- **Protocol helpers**: `handshake()`, `send_heartbeat()`, `send_telemetry_update()`, `request_command_queue()`, `complete_command()`, `send_settings_update()`, `create_track()`, `update_track()`, `create_geofeature()`, `update_geofeature()`, `delete_geofeature()`
- **Listener helpers**: `atlas_asset_ws_client.listener.listen()` and `atlas_asset_ws_client.listener.process_command_queue()` keep your asset reacting to real-time command feeds without reimplementing the loop.
- **Schema helpers**: `atlas_asset_ws_client.schemas.AssetEnvelope` and `atlas_asset_ws_client.schemas.AssetMeta` expose the same metadata contract used by Atlas Command.
- **Low-level helpers**: `send_json()`, `receive_json()`, `receive_envelope()`

All helpers populate `meta.asset_id`, `meta.stream`, timestamps, and `correlation_id` defaults when omitted.

## Custom Transport or Headers

Provide a custom `connect(url: str)` coroutine if you need TLS, headers, proxies, or token handling. The coroutine must return an object that implements `send`, `recv`, and `close` coroutines.
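
As a rough illustration of that contract, the sketch below builds a `connect(url)` coroutine that attaches a token header and delegates the actual connection to an injected opener. Everything named here (`make_connect`, `opener`, `LoopbackTransport`) is hypothetical, and the `Authorization: Bearer` scheme is an assumption rather than a documented gateway requirement. The in-memory transport exists only to show the `send`/`recv`/`close` shape.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical hook: takes (url, headers) and returns a transport object.
Opener = Callable[[str, Dict[str, str]], Awaitable[Any]]


class LoopbackTransport:
    """In-memory stand-in: every frame passed to send() comes back from recv()."""

    def __init__(self) -> None:
        self._frames: asyncio.Queue = asyncio.Queue()
        self.closed = False

    async def send(self, data: str) -> None:
        if self.closed:
            raise ConnectionError("transport is closed")
        await self._frames.put(data)

    async def recv(self) -> str:
        return await self._frames.get()

    async def close(self) -> None:
        self.closed = True


def make_connect(token: str, opener: Opener) -> Callable[[str], Awaitable[Any]]:
    """Build a connect(url) coroutine that adds an auth header to each attempt."""

    async def connect(url: str) -> Any:
        # Assumption: the gateway accepts a bearer token in this header.
        headers = {"Authorization": f"Bearer {token}"}
        return await opener(url, headers)

    return connect
```

The resulting coroutine can be passed as `connect=` to `AssetWebSocketClient`. With the `websockets` library, the opener would forward the headers to `websockets.connect()`. The keyword for custom headers differs between `websockets` releases (`extra_headers` in the legacy client, `additional_headers` in the newer asyncio client), so check the installed version.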

## Operation Notes

- Always call `handshake()` before sending other frames.
- Honor the heartbeat cadence advertised in the handshake acknowledgement.
- After reconnecting, redo the handshake and call `request_command_queue()` to restore queue state.
- Supply deterministic `correlation_id` values to trace requests across services.
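
The handshake-first and reconnect notes above can be combined into one retry loop. This is a sketch under stated assumptions, not a package API: `run_with_reconnect`, `make_client`, `serve`, and `retry_on` are hypothetical names, `request_command_queue()` is assumed to take no arguments, and the exception types raised on a dropped connection depend on your transport, so they are left as a parameter.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple, Type


async def run_with_reconnect(
    make_client: Callable[[], Any],
    handshake_payload: Dict[str, Any],
    serve: Callable[[Any], Awaitable[None]],
    *,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    max_attempts: int = 5,
    base_delay_s: float = 1.0,
) -> None:
    """Run serve(client), redoing the handshake and queue sync on every attempt."""
    for attempt in range(1, max_attempts + 1):
        client = make_client()  # fresh client per connection attempt
        try:
            async with client:
                await client.handshake(handshake_payload)  # always the first frame
                await client.request_command_queue()  # restore queue state
                await serve(client)
                return  # serve() finished normally; stop reconnecting
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts; surface the last error
            # Exponential backoff: base, 2x base, 4x base, ...
            await asyncio.sleep(base_delay_s * 2 ** (attempt - 1))
```

Creating a new client per attempt keeps each connection's state separate. Whether a single `AssetWebSocketClient` instance can be reused after a disconnect is not stated here, so the sketch avoids relying on it.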

## Schema Exports

The package re-exports the schema models used internally so you can import the envelope definitions without pulling in Atlas Command:

- `atlas_asset_ws_client.schemas.AssetEnvelope`
- `atlas_asset_ws_client.schemas.AssetMeta`
- `atlas_asset_ws_client.schemas.TelemetryPayload`
- `atlas_asset_ws_client.schemas.CommandCompletionPayload`
- `atlas_asset_ws_client.schemas.TrackPayload`
- `atlas_asset_ws_client.schemas.GeofeaturePayload`
- `atlas_asset_ws_client.schemas.SettingsPayload`

Pass instances of these pydantic models to the stream helpers instead of raw dicts. This avoids duplicating the payload definitions that Atlas Command itself consumes.

## Advanced Helpers

- `listen()` yields every frame pushed by the asset gateway. Iterate with `async for envelope in listen(client)` and dispatch only the frames you care about, such as those on the `commands` stream. `receive_envelope()` still enforces the expected type, stream, and asset identifiers.
- `process_command_queue(client, handler)` automatically requests the latest queue, waits for additional `command_queue:*` envelopes, and reruns `handler` until you signal an `asyncio.Event` to stop.

```python
import asyncio

from atlas_asset_ws_client import AssetWebSocketClient
from atlas_asset_ws_client.listener import listen
from atlas_asset_ws_client.schemas import TrackPayload

client = AssetWebSocketClient("ws://localhost:8000/ws/assets", asset_id="asset-123")

async def run() -> None:
    async with client:
        # The handshake must be the first frame on a new connection.
        await client.handshake(payload={"model_id": "DRONE-X"})
        # Pass a typed schema model instead of a raw dict.
        await client.create_track(payload=TrackPayload(name="patrol"))
        # Iterate over every frame the gateway pushes; react only to queue data.
        async for envelope in listen(client):
            if envelope.type == "command_queue:data":
                print("queue", envelope.payload)

asyncio.run(run())
```
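
When several frame types need handling, the `if` chain inside the `async for` loop can be replaced by a lookup table. The following is a small sketch of that pattern. `dispatch` and `Handler` are hypothetical names, and the code relies only on each envelope having the `type` and `payload` attributes used in the example above.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

Handler = Callable[[Any], Awaitable[None]]


async def dispatch(envelopes: AsyncIterator[Any], handlers: Dict[str, Handler]) -> int:
    """Route each envelope's payload to the handler registered for its type.

    Returns the number of envelopes that had a handler; the rest are skipped.
    """
    handled = 0
    async for envelope in envelopes:
        handler = handlers.get(envelope.type)
        if handler is None:
            continue  # no handler registered for this frame type
        await handler(envelope.payload)
        handled += 1
    return handled
```

A call might look like `await dispatch(listen(client), {"command_queue:data": on_queue})`, where `on_queue` is your own coroutine.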

See [`Atlas_Command/docs/WEBSOCKET.md`](../../Atlas_Command/docs/WEBSOCKET.md#asset-gateway-wsassets) and [`Atlas_Command/docs/CLIENT_INTEGRATION.md`](../../Atlas_Command/docs/CLIENT_INTEGRATION.md#asset-example) for protocol references and integration tips.

## Testing

- Run the deterministic suite:

  ```bash
  pip install -r requirements-dev.txt
  pip install -e "Atlas_Command/connection_packages/atlas_asset_ws_client[dev]"
  pytest Atlas_Command/connection_packages/atlas_asset_ws_client/tests
  ```

- The package includes a live integration test that connects to `ws://localhost:8000/ws/assets` (override with `ATLAS_WS_URL`). Supply `ATLAS_WS_TOKEN` and `ATLAS_WS_ASSET` to control authentication and asset identity. The test is skipped if the endpoint is unreachable:

  ```bash
  pytest Atlas_Command/connection_packages/atlas_asset_ws_client/tests/test_client.py -m live
  ```
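
For your own live tests, the environment-variable and skip behavior described above could be approximated as follows. This is a sketch, not the package's test code: `live_settings` and `endpoint_reachable` are hypothetical helpers, and a plain TCP connection check is an assumed stand-in for however the real test detects an unreachable endpoint.

```python
import os
import socket
from urllib.parse import urlsplit

DEFAULT_URL = "ws://localhost:8000/ws/assets"


def live_settings(env=os.environ) -> dict:
    """Collect the live-test settings from environment variables."""
    return {
        "url": env.get("ATLAS_WS_URL", DEFAULT_URL),
        "token": env.get("ATLAS_WS_TOKEN"),  # None when unset
        "asset_id": env.get("ATLAS_WS_ASSET"),  # None when unset
    }


def endpoint_reachable(url: str, timeout_s: float = 1.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parts = urlsplit(url)
    # Fall back to the conventional ports when the URL does not name one.
    port = parts.port or (443 if parts.scheme == "wss" else 80)
    try:
        with socket.create_connection((parts.hostname, port), timeout=timeout_s):
            return True
    except OSError:
        return False
```

A pytest test could then call `pytest.skip()` when `endpoint_reachable(live_settings()["url"])` returns `False`.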
