Metadata-Version: 2.4
Name: cybotrade
Version: 2.0.17
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Dist: colorlog>=6.9
Requires-Dist: cybotrade-datasource>=0.1.9
Requires-Dist: polars>=1.31.0
Requires-Dist: requests>=2.32.4
Requires-Dist: eth-hash[pycryptodome]>=0.7.1
Requires-Dist: aion-scheduler>=0.1.0
License-File: LICENSE
Summary: Primitives and utilities for automated trading strategies.
Requires-Python: >=3.12
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM

# Cybotrade

**Primitives and utilities for building automated trading strategies.**

Cybotrade is a Python library for writing live, event-driven crypto trading
strategies. It gives you a single, consistent interface across multiple
exchanges (REST + private WebSocket), a built-in market-data layer, a job
scheduler, and a `BaseStrategy` harness that wires everything together so you
can focus on strategy logic instead of plumbing.

The performance-critical core (HTTP client, WebSocket transport, symbol/topic
parsing) is implemented in Rust via [PyO3](https://pyo3.rs) and shipped as a
compiled extension, so you do not need a Rust toolchain to use it.

---

## Features

- **Unified exchange interface** — one `ExchangeClient` abstraction for REST
  trading (place/cancel orders, positions, balances, order details, orderbook,
  symbol info) across all supported venues.
- **Private WebSocket streams** — authenticated order-update streams with
  automatic heartbeating and reconnection.
- **Market data** — pull historical and streaming data through
  [`cybotrade-datasource`](https://pypi.org/project/cybotrade-datasource/) using
  a simple `Topic` model, delivered as [Polars](https://pola.rs) DataFrames.
- **Strategy harness** — `BaseStrategy` runs your scheduled jobs, exchange
  events, and datasource stream together under one asyncio event loop, with
  graceful `SIGINT`/`SIGTERM` shutdown.
- **Built-in scheduler** — cron/interval/date job scheduling via
  [`aion`](https://pypi.org/project/aion-scheduler/).
- **Typed throughout** — ships with `py.typed` and stub files; rich dataclass
  models (`OrderUpdate`, `Position`, `Balance`, `SymbolInfo`, …) and `Decimal`
  precision for prices and quantities.
- **Logging helpers** — colorized console and rotating-file handlers.

### Supported exchanges

| Exchange | REST client           | Private WebSocket  |
| -------- | --------------------- | ------------------ |
| Bybit    | `BybitLinearClient`   | `BybitPrivateWS`   |
| Binance  | `BinanceLinearClient` | `BinancePrivateWS` |
| KuCoin   | `KucoinLinearClient`  | `KucoinPrivateWS`  |
| EdgeX    | `EdgeXClient`         | `EdgeXPrivateWS`   |

> All exchange clients currently target **linear (USDⓈ-M) perpetual** markets.

---

## Installation

```bash
pip install cybotrade
```

Requires **Python 3.12+**. Pre-built wheels are published for macOS
(universal2), Linux (`x86_64` + `aarch64`), and Windows (`x86_64`), so no
compilation is needed on those platforms.

---

## Quick start

### Placing an order

```python
import asyncio
from decimal import Decimal

from cybotrade import Symbol
from cybotrade.models import OrderSide
from cybotrade.bybit import BybitLinearClient


async def main():
    client = BybitLinearClient(api_key="...", api_secret="...", testnet=True)

    # Inspect the symbol's trading rules
    info = await client.get_symbol_info(Symbol("BTCUSDT"))
    print(info.quantity_precision, info.tick_size)

    # Market buy 0.001 BTC (no `limit` price is passed)
    resp = await client.place_order(
        symbol=Symbol("BTCUSDT"),
        side=OrderSide.BUY,
        quantity=Decimal("0.001"),
    )
    print(resp.order_id)

    # Check the resulting position
    positions = await client.get_positions(Symbol("BTCUSDT"))
    print(positions)


asyncio.run(main())
```

### Writing a strategy

`BaseStrategy` ties together three sources of work — a **scheduler**, an
**exchange event stream**, and an optional **datasource stream** — and drives
them from a single `start()` call. Subclass it and implement `on_init`,
`on_event`, and `on_shutdown`.

```python
import asyncio
from datetime import timedelta

from aion import Trigger
from cybotrade import Symbol, Topic
from cybotrade.io import Event, EventType
from cybotrade.strategy import BaseStrategy
from cybotrade.bybit import BybitLinearClient, BybitPrivateWS


class MyStrategy(BaseStrategy):
    def __init__(self, trader, events):
        self.trader = trader
        self.events = events
        super().__init__(
            datasource_api_key="DATASOURCE_API_KEY",
            datasource_topics=[
                Topic("bybit-linear", "candle", {"symbol": "BTCUSDT", "interval": "1m"}),
            ],
            lookback_size=200,
        )

    def on_init(self):
        # Register a recurring job (runs every 60s)
        asyncio.get_event_loop().create_task(
            self.schedule(self.rebalance, Trigger.Interval(duration=timedelta(minutes=1)))
        )

    async def rebalance(self):
        price = await self.trader.get_current_price(Symbol("BTCUSDT"))
        self.logger.info(f"mid price = {price}")

    async def on_event(self, event: Event):
        if event.event_type == EventType.DatasourceUpdate:
            topic = ...  # identify which subscribed Topic this update belongs to
            ready = self.maintain_datamap(topic, event.data["data"])
            if ready:
                df = self.datamap[topic]  # Polars DataFrame of the last N candles
                ...  # compute signals, place orders via self.trader
        elif event.event_type == EventType.OrderUpdate:
            self.logger.info(f"order update: {event.data}")

    def on_shutdown(self):
        self.logger.info("shutting down cleanly")


async def main():
    trader = BybitLinearClient(api_key="...", api_secret="...")
    events = BybitPrivateWS(api_key="...", api_secret="...", topics=["order"])
    strategy = MyStrategy(trader, events)
    await strategy.start(events)


asyncio.run(main())
```

---

## Core concepts

### `Symbol`

A parsed trading pair. Construct one from a venue symbol string, then split it
into its base and quote assets:

```python
from cybotrade import Symbol

s = Symbol("BTCUSDT")
s.split()  # ("BTC", "USDT")
```
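
A concatenated pair such as `BTCUSDT` has no separator, so splitting it means
recognising the quote asset at the end of the string. The sketch below shows
one way that could work in plain Python. `split_pair` and `KNOWN_QUOTES` are
hypothetical names for illustration only; the real parsing lives in the
compiled core and may follow different rules.

```python
# Hypothetical stand-in, not the cybotrade implementation.
# Assumed quote assets, ordered so longer suffixes are tried first.
KNOWN_QUOTES = ("USDT", "USDC", "USD")


def split_pair(pair: str) -> tuple[str, str]:
    """Split a concatenated pair such as "BTCUSDT" into (base, quote)."""
    for quote in KNOWN_QUOTES:
        # Require a non-empty base so "USDT" alone is rejected.
        if pair.endswith(quote) and len(pair) > len(quote):
            return pair[: -len(quote)], quote
    raise ValueError(f"unrecognised quote asset in {pair!r}")


print(split_pair("BTCUSDT"))  # ('BTC', 'USDT')
```

Trying `USDT` before `USD` matters: with the order reversed, `BTCUSDT` would
not match `USD` at all, but a pair like `BTCUSD` must still resolve to `USD`.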

### `Topic` — market data

A `Topic` identifies a data feed by `provider`, `endpoint`, and query params. It
is the addressing scheme used by `cybotrade-datasource` for both historical
queries and live streams.

```python
from cybotrade import Topic

topic = Topic("bybit-linear", "candle", {"symbol": "BTCUSDT", "interval": "1m"})
topic.endpoint_with_query_params()  # "candle?symbol=BTCUSDT&interval=1m"
topic.interval()                    # timedelta(minutes=1)

# Or parse from a string
Topic.from_str("bybit-linear|candle?symbol=BTCUSDT&interval=1m")
```
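
To make the string form concrete, here is a rough standard-library sketch of
how a `provider|endpoint?query` string breaks down into its three parts.
`parse_topic_string`, `parse_interval`, and the unit table are hypothetical
helpers, not part of cybotrade; `Topic.from_str` may validate or normalise
input differently.

```python
from datetime import timedelta
from urllib.parse import parse_qsl

# Assumed unit suffixes; the intervals the datasource accepts may differ.
_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_topic_string(raw: str) -> tuple[str, str, dict[str, str]]:
    """Split "provider|endpoint?k=v&..." into (provider, endpoint, params)."""
    provider, _, rest = raw.partition("|")
    endpoint, _, query = rest.partition("?")
    return provider, endpoint, dict(parse_qsl(query))


def parse_interval(text: str) -> timedelta:
    """Turn an interval such as "1m" or "4h" into a timedelta."""
    return timedelta(**{_UNITS[text[-1]]: int(text[:-1])})


provider, endpoint, params = parse_topic_string(
    "bybit-linear|candle?symbol=BTCUSDT&interval=1m"
)
print(provider, endpoint, params)
print(parse_interval(params["interval"]))
```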

When `datasource_topics` and `datasource_api_key` are supplied to
`BaseStrategy`, the harness automatically backfills `lookback_size` rows into
`self.datamap[topic]` (a Polars DataFrame) on startup and then streams live
updates as `EventType.DatasourceUpdate` events. `maintain_datamap()` keeps each
topic's rolling window at the configured size.
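
The rolling-window idea can be illustrated without Polars. The sketch below
uses a bounded `deque` in place of a DataFrame. `RollingWindow` is a
hypothetical class, and treating "ready" as "the window is full" is an
assumption about what `maintain_datamap()` returns, not something the library
guarantees.

```python
from collections import deque


class RollingWindow:
    """Keep only the most recent `size` rows (stand-in for one datamap entry)."""

    def __init__(self, size: int) -> None:
        # A deque with maxlen drops the oldest row when a new one is appended.
        self.rows: deque[dict] = deque(maxlen=size)

    def push(self, row: dict) -> bool:
        """Append a row and report whether the window is full (assumed 'ready')."""
        self.rows.append(row)
        return len(self.rows) == self.rows.maxlen


window = RollingWindow(size=3)
flags = [window.push({"close": close}) for close in (100, 101, 102, 103)]
print(flags)
print([row["close"] for row in window.rows])
```

After the fourth push the oldest row (`close=100`) has been evicted, so the
window always reflects the latest three candles.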

### `ExchangeClient`

The REST trading interface implemented by every exchange client:

| Method | Returns |
| ------ | ------- |
| `place_order(symbol, side, quantity, limit=None, ...)` | `OrderResponse` |
| `cancel_order(symbol, order_id=None, client_order_id=None)` | `OrderResponse` |
| `get_positions(symbol=None)` | `list[Position]` |
| `get_wallet_balance(coin=None)` | `Balance` |
| `get_order_details(symbol, order_id=None, client_order_id=None)` | `OrderUpdate \| None` |
| `get_order_details_from_history(...)` | `OrderUpdate \| None` |
| `get_open_orders(symbol=None)` | `list[OrderUpdate]` |
| `get_symbol_info(symbol)` | `SymbolInfo` |
| `get_orderbook_snapshot(symbol)` | `OrderbookSnapshot` |
| `get_current_price(symbol)` | `Decimal` (mid of best bid/ask) |

All prices and quantities are `Decimal` to avoid floating-point drift.
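
The drift in question is easy to reproduce with the standard library alone.
This short sketch compares `float` and `Decimal` arithmetic and then computes
a mid price the way the `get_current_price` row describes it; the bid and ask
values are made up for illustration.

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
print(0.1 + 0.2 == 0.3)  # False

# Decimals built from strings keep the exact decimal value.
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True

# Mid of best bid/ask, using illustrative prices.
best_bid = Decimal("64999.9")
best_ask = Decimal("65000.1")
mid = (best_bid + best_ask) / 2
print(mid)
```

Build `Decimal` values from strings rather than floats: `Decimal(0.1)` carries
the float's binary error with it, while `Decimal("0.1")` does not.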

### Events

`on_event` receives an `Event` whose `event_type` is one of:

`Authenticated`, `Subscribed`, `OrderUpdate`, `DatasourceSubscribed`,
`DatasourceUpdate`, `Error`, `Unknown`.

`event.data` holds the parsed payload; `event.orig` holds the raw message.
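
As handlers grow, a lookup table can be easier to maintain than a long
`if`/`elif` chain inside `on_event`. The sketch below shows that pattern with
stand-in types: the `EventType` enum and `Event` dataclass here are simplified
local definitions that only mirror the names above, and `dispatch` and the
handler functions are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable


class EventType(Enum):  # stand-in, not cybotrade.io.EventType
    OrderUpdate = auto()
    DatasourceUpdate = auto()
    Error = auto()
    Unknown = auto()


@dataclass
class Event:  # stand-in, not cybotrade.io.Event
    event_type: EventType
    data: Any
    orig: str


def handle_order(event: Event) -> str:
    return f"order: {event.data}"


def handle_error(event: Event) -> str:
    return f"error: {event.orig}"


# Map each event type of interest to its handler.
HANDLERS: dict[EventType, Callable[[Event], str]] = {
    EventType.OrderUpdate: handle_order,
    EventType.Error: handle_error,
}


def dispatch(event: Event) -> str:
    """Route an event to its handler, ignoring types with no handler."""
    handler = HANDLERS.get(event.event_type)
    if handler is None:
        return f"ignored: {event.event_type.name}"
    return handler(event)


print(dispatch(Event(EventType.OrderUpdate, {"order_id": "abc"}, "{...}")))
print(dispatch(Event(EventType.Unknown, None, "ping")))
```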

---

## Utilities

```python
from decimal import Decimal
from cybotrade.utils import getenv, truncate_decimal, round_to_tick, extract_precision

getenv("BYBIT_API_KEY")                          # raises if unset
truncate_decimal(Decimal("1.23456"), 3)          # Decimal("1.234")
round_to_tick(Decimal("100.07"), Decimal("0.1")) # Decimal("100.1")
extract_precision(Decimal("0.001"))              # 3
```
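
For readers who want to see the arithmetic behind these helpers, the sketch
below reproduces the three results shown above using only the standard
`decimal` module. The `*_sketch` functions are hypothetical equivalents, and
the half-up rounding mode in `round_to_tick_sketch` is an assumption; the
library's own helpers may treat ties and edge cases differently.

```python
from decimal import ROUND_DOWN, ROUND_HALF_UP, Decimal


def truncate_sketch(value: Decimal, places: int) -> Decimal:
    """Drop digits beyond `places` decimal places without rounding up."""
    return value.quantize(Decimal(1).scaleb(-places), rounding=ROUND_DOWN)


def round_to_tick_sketch(value: Decimal, tick: Decimal) -> Decimal:
    """Snap `value` to the nearest multiple of `tick` (ties round half up)."""
    steps = (value / tick).quantize(Decimal(1), rounding=ROUND_HALF_UP)
    return steps * tick


def precision_sketch(step: Decimal) -> int:
    """Count the decimal places in a step size such as 0.001."""
    return max(0, -step.as_tuple().exponent)


print(truncate_sketch(Decimal("1.23456"), 3))
print(round_to_tick_sketch(Decimal("100.07"), Decimal("0.1")))
print(precision_sketch(Decimal("0.001")))
```

Truncating a quantity (rather than rounding it) keeps an order from exceeding
the size you intended, which is usually the safer direction.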

### Logging

```python
import logging
from cybotrade.logging import setup_logger, make_colorlog_stream_handler

setup_logger(log_level=logging.INFO, handlers=[make_colorlog_stream_handler()])
```

---

## Dependencies

Installed automatically with the package:

- [`cybotrade-datasource`](https://pypi.org/project/cybotrade-datasource/) — market-data access
- [`aion-scheduler`](https://pypi.org/project/aion-scheduler/) — job scheduling
- [`polars`](https://pypi.org/project/polars/) — DataFrame engine
- `requests`, `colorlog`, `eth-hash[pycryptodome]`

---

## License

Copyright © Balaena Quant Sdn Bhd. All rights reserved. This software is
proprietary; see [`LICENSE`](./LICENSE) for terms.

