Metadata-Version: 2.4
Name: OrderPulse
Version: 0.2.28
Summary: High-performance exchange feed parser and orderflow analytics engine with Rust and Python bindings
Requires-Python: >=3.9
Description-Content-Type: text/markdown

# FastReader Orderbook Engine

FastReader is a high-performance Rust + Python library for reading binary market data and building an in-memory limit orderbook from order and trade messages.

The library is written in Rust for speed and exposed to Python using PyO3. Python users can read messages from a binary file, build an orderbook, inspect top bid/ask levels, export snapshot rows, and process very large files without loading the full file into RAM.

---

## Main Features

- Read binary order/trade files from Python.
- Use a RAM-based cache reader for fast repeated analysis.
- Use a streaming loader for very large files.
- Build a bid/ask orderbook from binary messages or Python `list[dict]` test data.
- Support order message types:
  - `N` = New order
  - `M` = Modify order
  - `X` = Delete/cancel order
  - `T` = Trade
- Return top N snapshot levels.
- Return full market depth.
- Return CSV-style snapshot rows.
- Apply message-type filters before building the orderbook.

---

## Architecture

```text
Binary Market Data File
        |
        v
+-------------------------+       +--------------------------+
| MessageCacheReader      |       | StreamingBinaryLoader    |
| Loads full file in RAM  |       | Reads one message at a   |
| Best for repeated use   |       | time from disk           |
+-------------------------+       +--------------------------+
        |                                   |
        +---------------+-------------------+
                        |
                        v
              +--------------------+
              | OrderbookBuilder   |
              | Python-facing API  |
              +--------------------+
                        |
                        v
              +--------------------+
              | OrderBookManager   |
              | Real orderbook     |
              | logic in Rust      |
              +--------------------+
                        |
                        v
              Bid Book / Ask Book
                        |
                        v
              Snapshot / Full Depth / CSV Row
```

---

## Orderbook Logic

The orderbook has two sides:

```text
Buy side  = Bid book
Sell side = Ask book
```

Message handling:

```text
N = Add a new order
M = Modify an existing order
X = Remove an existing order
T = Reduce quantity because a trade happened
```
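The four message rules above can be sketched in pure Python. This is a toy model for illustration, not the Rust `OrderBookManager`: the class name `ToyBook`, its methods, and the choice to treat a modify as a full replacement of price and quantity are assumptions, and the sketch tracks a single token only.

```python
from collections import defaultdict


class ToyBook:
    """Toy single-token book for illustration; not the Rust OrderBookManager."""

    def __init__(self):
        self.orders = {}  # order_id -> (side, price, quantity)
        self.levels = {"B": defaultdict(int), "S": defaultdict(int)}

    def _add(self, order_id, side, price, quantity):
        self.orders[order_id] = (side, price, quantity)
        self.levels[side][price] += quantity

    def _remove(self, order_id):
        side, price, quantity = self.orders.pop(order_id)
        self.levels[side][price] -= quantity
        if self.levels[side][price] <= 0:
            del self.levels[side][price]  # drop empty price levels

    def apply(self, msg):
        kind = msg["msg_type"]
        if kind in ("N", "M"):
            # Assumption: a modify replaces the order's price and quantity.
            if msg["order_id"] in self.orders:
                self._remove(msg["order_id"])
            elif kind == "M":
                return  # a modify for an unknown order is ignored in this toy
            self._add(msg["order_id"], msg["order_type"], msg["price"], msg["quantity"])
        elif kind == "X":
            if msg["order_id"] in self.orders:
                self._remove(msg["order_id"])
        elif kind == "T":
            # Reduce both resting orders by the traded quantity.
            for order_id in (msg["buy_order_id"], msg["sell_order_id"]):
                if order_id in self.orders:
                    side, price, quantity = self.orders[order_id]
                    self._remove(order_id)
                    if quantity > msg["trade_quantity"]:
                        self._add(order_id, side, price, quantity - msg["trade_quantity"])


book = ToyBook()
for message in [
    {"msg_type": "N", "order_id": 1, "token": 777, "order_type": "B", "price": 1000, "quantity": 40},
    {"msg_type": "N", "order_id": 2, "token": 777, "order_type": "S", "price": 1100, "quantity": 20},
    {"msg_type": "T", "buy_order_id": 1, "sell_order_id": 2, "token": 777, "trade_quantity": 10},
]:
    book.apply(message)

print(dict(book.levels["B"]), dict(book.levels["S"]))  # {1000: 30} {1100: 10}
```

The message dictionaries use the same keys as the Python `list[dict]` format documented later in this README.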

Book calculation:

```text
Best Bid = highest buy price
Best Ask = lowest sell price
Spread   = best ask - best bid
Mid      = (best bid + best ask) / 2
```
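As a rough illustration, the formulas above map to a few lines of Python. The function name `top_of_book` and the `price -> quantity` dictionary layout are assumptions made for this sketch. It returns the mid as a float, whereas the library examples in this README show integer mid prices; how the library rounds is not stated here.

```python
def top_of_book(bids, asks):
    """Compute best bid, best ask, spread, and mid from price -> quantity maps.

    Returns None for any value that cannot be computed (empty side).
    """
    best_bid = max(bids) if bids else None  # highest buy price
    best_ask = min(asks) if asks else None  # lowest sell price
    if best_bid is None or best_ask is None:
        return best_bid, best_ask, None, None
    spread = best_ask - best_bid
    mid = (best_bid + best_ask) / 2
    return best_bid, best_ask, spread, mid


result = top_of_book({1000: 40, 990: 50}, {1100: 20, 1110: 30})
print(result)  # (1000, 1100, 100, 1050.0)
```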

---

## Installation

This project is a Rust library exposed to Python. A common build method is `maturin`.

```bash
pip install maturin
maturin develop --release
```

After installation, Python should be able to import:

```python
from fastreader import MessageCacheReader, StreamingBinaryLoader, OrderbookBuilder
```

---

## Python Classes

The module exposes three main classes:

```python
MessageCacheReader
StreamingBinaryLoader
OrderbookBuilder
```

---

# 1. `MessageCacheReader`

`MessageCacheReader` loads the entire binary file into RAM.

Use this when:

- The file fits safely in memory.
- You want fast repeated analysis.
- You need to inspect all decoded messages.

Do not use this for very large files that may exceed available RAM.

---

## `MessageCacheReader()`

Creates an empty cache reader.

```python
from fastreader import MessageCacheReader

reader = MessageCacheReader()
```

Expected behavior:

```text
Creates an empty object.
No file is loaded yet.
```

---

## `load_to_cache(file_path)`

Loads the full binary file into memory and returns the total number of decoded messages.

```python
from fastreader import MessageCacheReader

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"

reader = MessageCacheReader()
count = reader.load_to_cache(file_path)

print("Loaded:", count)
```

Example output:

```text
Loaded: 2500000
```

Purpose:

```text
Read the complete file once and keep all decoded messages in RAM.
```

Common error:

```text
RuntimeError: No such file or directory (os error 2)
```

This means the file path is wrong or the file is not visible from the current Python process.

---

## `get_all_messages()`

Returns all cached messages as readable strings.

```python
messages = reader.get_all_messages()

for msg in messages[:5]:
    print(msg)
```

Example output:

```text
Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100000, LocalTs 100010, OrderId 1001, Token 777, Side 'B', Price 1000, Quantity 40, Missed 0
Trade Message: SeqNo 2, MsgLen 45, MsgType 'T', ExchTs 100020, LocalTs 100030, BuyOrderId 1001, SellOrderId 1002, Token 777, Price 1050, Quantity 10, Missed 0
```

Purpose:

```text
Useful for debugging binary decoding and checking what messages are inside the file.
```

Important:

```text
This returns formatted strings, not raw Rust packet objects.
```
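Because the messages come back as strings, a small parser can turn them into dictionaries for filtering or inspection. The sketch below assumes the exact `Kind: Key value, Key value, ...` layout shown in the example output above; `parse_message` is a hypothetical helper, not part of the library, and would need adjusting if the real format differs.

```python
def parse_message(line):
    """Split 'Kind: Key value, Key value, ...' into a dict.

    Numeric values become int; quoted values such as 'N' lose their quotes.
    """
    kind, _, body = line.partition(": ")
    fields = {"kind": kind}
    for part in body.split(", "):
        key, _, raw = part.partition(" ")
        raw = raw.strip("'")
        fields[key] = int(raw) if raw.lstrip("-").isdigit() else raw
    return fields


sample = (
    "Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100000, "
    "LocalTs 100010, OrderId 1001, Token 777, Side 'B', Price 1000, "
    "Quantity 40, Missed 0"
)
parsed = parse_message(sample)
print(parsed["kind"], parsed["MsgType"], parsed["Token"], parsed["Price"])
# Order Message N 777 1000
```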

---

## `get_cache_summary()`

Returns metadata about the loaded cache.

```python
summary = reader.get_cache_summary()
print(summary)
```

Example output:

```python
{
    'file_source': '/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025',
    'total_messages': 2500000,
    'total_orders': 2100000,
    'total_trades': 400000,
    'memory_usage_bytes': 280000000
}
```

Purpose:

```text
Check file source, total messages, order/trade count, and approximate RAM usage.
```

---

# 2. `StreamingBinaryLoader`

`StreamingBinaryLoader` reads the binary file one message at a time.

Use this when:

- The file is very large.
- You want low memory usage.
- You only need sequential processing.

It keeps the file on disk and reads only the next message when requested.

---

## `StreamingBinaryLoader()`

Creates an empty streaming loader.

```python
from fastreader import StreamingBinaryLoader

loader = StreamingBinaryLoader()
```

Expected behavior:

```text
Creates an empty object.
No file stream is open yet.
```

---

## `open_stream(file_path, count_messages=True)`

Opens the file and prepares it for one-by-one reading.

```python
from fastreader import StreamingBinaryLoader

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"

loader = StreamingBinaryLoader()
count = loader.open_stream(file_path, count_messages=False)

print("Count:", count)
```

Example output when `count_messages=False`:

```text
Count: 0
```

This is expected. It means the loader did not scan the full file for counting.

Example with counting enabled:

```python
count = loader.open_stream(file_path, count_messages=True)
print("Total messages:", count)
```

Example output:

```text
Total messages: 2500000
```

Important:

```text
count_messages=True scans the full file once to count messages.
For huge files, count_messages=False is faster to start.
```

---

## `get_next_message()`

Reads the next message from the open file stream and returns it as a formatted string.

```python
msg = loader.get_next_message()
print(msg)
```

Example output:

```text
Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100000, LocalTs 100010, OrderId 1001, Token 777, Side 'B', Price 1000, Quantity 40, Missed 0
```

Read the first 10 messages:

```python
for _ in range(10):
    msg = loader.get_next_message()
    if msg == "END":
        break
    print(msg)
```

When the file ends, `get_next_message()` returns the string:

```text
END
```

Purpose:

```text
Inspect messages one by one without loading the whole file into RAM.
```
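The `"END"` sentinel can be wrapped in a generator so that calling code uses an ordinary `for` loop. In this sketch `iter_messages` is a hypothetical helper and `FakeLoader` is a stand-in that mimics the documented `get_next_message()` behavior, so the example runs without a binary file; a real `StreamingBinaryLoader` with an open stream could presumably be passed in its place.

```python
def iter_messages(loader, limit=None):
    """Yield formatted messages until the loader returns 'END' or limit is hit."""
    count = 0
    while limit is None or count < limit:
        msg = loader.get_next_message()
        if msg == "END":
            return
        yield msg
        count += 1


class FakeLoader:
    """Hypothetical stand-in for StreamingBinaryLoader (illustration only)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._pos = 0

    def get_next_message(self):
        if self._pos >= len(self._messages):
            return "END"
        msg = self._messages[self._pos]
        self._pos += 1
        return msg


demo = list(iter_messages(FakeLoader(["msg-1", "msg-2", "msg-3"])))
print(demo)  # ['msg-1', 'msg-2', 'msg-3']
```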

---

## `reset_cursor()`

Moves the file pointer back to the beginning.

```python
loader.get_next_message()
loader.get_next_message()

loader.reset_cursor()

print(loader.get_next_message())
```

Purpose:

```text
Use this when you want to reread the same file stream from the start.
```

---

# 3. `OrderbookBuilder`

`OrderbookBuilder` is the Python-facing engine that builds the orderbook.

It receives messages from:

```text
MessageCacheReader
StreamingBinaryLoader
Python list[dict]
```

Internally, it forwards each valid message to the Rust `OrderBookManager`, which maintains the real bid/ask state.

---

## `OrderbookBuilder()`

Creates an empty orderbook engine.

```python
from fastreader import OrderbookBuilder

builder = OrderbookBuilder()
```

Expected behavior:

```text
Creates empty bid/ask books.
No messages are processed yet.
```

---

## `apply_filter(logic_criteria=None)`

Controls which message types should be processed.

```python
builder.apply_filter(["N", "M", "X", "T"])
```

Meaning:

```text
N = process new order messages
M = process modify order messages
X = process cancel/delete order messages
T = process trade messages
```

Examples:

```python
# Process all useful orderbook messages
builder.apply_filter(["N", "M", "X", "T"])

# Process only order-side messages, ignore trades
builder.apply_filter(["N", "M", "X"])

# Process only trades
builder.apply_filter(["T"])

# Remove filter and process everything
builder.apply_filter(None)
```

Purpose:

```text
Useful when you want to test or build the book using only selected message types.
```
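Conceptually, the filter acts like pre-selecting messages by `msg_type` before they reach the book. The pure-Python sketch below shows that idea on `list[dict]` data; `filter_messages` is a hypothetical helper, and whether the builder's returned count includes skipped messages is not stated in this README.

```python
def filter_messages(messages, allowed=None):
    """Keep only messages whose msg_type is in `allowed`; None keeps everything."""
    if allowed is None:
        return list(messages)
    allowed = set(allowed)
    return [m for m in messages if m.get("msg_type") in allowed]


messages = [
    {"msg_type": "N", "order_id": 1, "token": 777, "order_type": "B", "price": 1000, "quantity": 40},
    {"msg_type": "X", "order_id": 1, "token": 777, "order_type": "B", "price": 1000, "quantity": 40},
    {"msg_type": "T", "buy_order_id": 1, "sell_order_id": 2, "token": 777, "trade_quantity": 10},
]

orders_only = filter_messages(messages, ["N", "M", "X"])
print([m["msg_type"] for m in orders_only])  # ['N', 'X']
```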

---

## `build_from_list(source)`

Builds the orderbook from either:

```text
1. MessageCacheReader
2. Python list[dict]
```

---

### Example A: Build from `MessageCacheReader`

```python
from fastreader import MessageCacheReader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"

reader = MessageCacheReader()
reader.load_to_cache(file_path)

builder = OrderbookBuilder()
processed = builder.build_from_list(reader)

print("Processed:", processed)
```

Example output:

```text
Processed: 2500000
```

Purpose:

```text
Use cached RAM messages to build orderbook quickly.
```

---

### Example B: Build from Python test messages

This is a convenient way to test orderbook logic without a binary file.

```python
from fastreader import OrderbookBuilder

messages = [
    {
        "msg_type": "N",
        "order_id": 1,
        "token": 777,
        "order_type": "B",
        "price": 1000,
        "quantity": 40,
    },
    {
        "msg_type": "N",
        "order_id": 2,
        "token": 777,
        "order_type": "S",
        "price": 1100,
        "quantity": 20,
    },
]

builder = OrderbookBuilder()
processed = builder.build_from_list(messages)

print("Processed:", processed)
print(builder.get_snapshot(777, levels=5))
```

Example output:

```text
Processed: 2
{
    'token': 777,
    'found': True,
    'mid_price': 1050,
    'best_bid': (1000, 40),
    'best_ask': (1100, 20),
    'spread': 100,
    'bids': [(1000, 40)],
    'asks': [(1100, 20)]
}
```

---

## Python Dictionary Message Format

### New/Modify/Delete Order Message

Required keys:

```python
{
    "msg_type": "N",        # "N", "M", or "X"
    "order_id": 1,
    "token": 777,
    "order_type": "B",     # "B" = buy, "S" = sell
    "price": 1000,
    "quantity": 40,
}
```

Optional keys:

```python
{
    "exch_ts": 100000,
    "local_ts": 100010,
    "flags": False,
}
```

### Trade Message

Required keys:

```python
{
    "msg_type": "T",
    "buy_order_id": 1,
    "sell_order_id": 2,
    "token": 777,
    "trade_quantity": 10,
}
```

Optional keys:

```python
{
    "exch_ts": 100020,
    "trade_price": 1050,
    "local_ts": 100030,
    "flags": False,
}
```
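Before passing hand-written test data to `build_from_list`, it can help to check each dictionary against the required keys listed above. This is a minimal sketch; `missing_keys` and the two key sets are hypothetical names, and the library's own validation behavior is not described in this README.

```python
ORDER_KEYS = {"msg_type", "order_id", "token", "order_type", "price", "quantity"}
TRADE_KEYS = {"msg_type", "buy_order_id", "sell_order_id", "token", "trade_quantity"}


def missing_keys(msg):
    """Return the sorted list of required keys absent from a message dict."""
    msg_type = msg.get("msg_type")
    if msg_type in ("N", "M", "X"):
        required = ORDER_KEYS
    elif msg_type == "T":
        required = TRADE_KEYS
    else:
        raise ValueError(f"unknown msg_type: {msg_type!r}")
    return sorted(required - set(msg))


incomplete = {"msg_type": "N", "order_id": 1, "token": 777, "order_type": "B", "price": 1000}
print(missing_keys(incomplete))  # ['quantity']
```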

---

## `build_from_source(source, limit=None)`

Builds the orderbook from either:

```text
MessageCacheReader
StreamingBinaryLoader
```

This is the preferred method when you want one common function for both cache and stream sources.

---

### Example A: Build from stream

```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"

token = 777

loader = StreamingBinaryLoader()
loader.open_stream(file_path, count_messages=False)

builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X", "T"])

processed = builder.build_from_source(loader)

print("Processed:", processed)
print(builder.get_snapshot(token, levels=5))
```

Example output:

```text
Processed: 2500000
{'token': 777, 'found': True, 'mid_price': 1050, 'best_bid': (1000, 40), 'best_ask': (1100, 20), 'spread': 100, 'bids': [(1000, 40)], 'asks': [(1100, 20)]}
```

---

### Example B: Build only first N messages

```python
loader = StreamingBinaryLoader()
loader.open_stream(file_path, count_messages=False)

builder = OrderbookBuilder()
processed = builder.build_from_source(loader, limit=10000)

print("Processed:", processed)
```

Example output:

```text
Processed: 10000
```

Purpose:

```text
Useful for testing large files quickly.
```

---

## `get_snapshot(token, levels=None)`

Returns top N bid/ask levels for a token.

If `levels` is omitted (`None`), the top 5 levels are returned.

Example:

```python
snapshot = builder.get_snapshot(777, levels=5)
print(snapshot)
```

Example output:

```python
{
    'token': 777,
    'found': True,
    'mid_price': 1050,
    'best_bid': (1000, 40),
    'best_ask': (1100, 20),
    'spread': 100,
    'bids': [(1000, 40)],
    'asks': [(1100, 20)]
}
```

If the token is not found:

```python
{
    'token': 999999,
    'found': False,
    'mid_price': 0,
    'best_bid': None,
    'best_ask': None,
    'spread': None,
    'bids': [],
    'asks': []
}
```

Field meaning:

| Field | Meaning |
|---|---|
| `token` | Instrument token |
| `found` | Whether orderbook exists for token |
| `mid_price` | Average of best bid and best ask |
| `best_bid` | Highest buy price level |
| `best_ask` | Lowest sell price level |
| `spread` | Best ask minus best bid |
| `bids` | Top N bid levels |
| `asks` | Top N ask levels |
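Since several fields are `None` when no book exists, code that consumes a snapshot should check `found` first. The sketch below works on plain dictionaries shaped like the examples above; `describe_snapshot` is a hypothetical helper, and the handling of a one-sided book (one of `best_bid` or `best_ask` being `None`) is an assumption, since this README does not show that case.

```python
def describe_snapshot(snapshot):
    """Format a snapshot dict as one line, guarding against missing books."""
    token = snapshot["token"]
    if not snapshot["found"]:
        return f"token {token}: no book"
    bid, ask = snapshot["best_bid"], snapshot["best_ask"]
    if bid is None or ask is None:
        # Assumption: a one-sided book reports None for the empty side.
        return f"token {token}: one-sided book"
    return (
        f"token {token}: bid {bid[0]} x {bid[1]}, "
        f"ask {ask[0]} x {ask[1]}, spread {snapshot['spread']}"
    )


found = {
    "token": 777, "found": True, "mid_price": 1050,
    "best_bid": (1000, 40), "best_ask": (1100, 20), "spread": 100,
    "bids": [(1000, 40)], "asks": [(1100, 20)],
}
missing = {
    "token": 999999, "found": False, "mid_price": 0,
    "best_bid": None, "best_ask": None, "spread": None,
    "bids": [], "asks": [],
}

print(describe_snapshot(found))    # token 777: bid 1000 x 40, ask 1100 x 20, spread 100
print(describe_snapshot(missing))  # token 999999: no book
```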

---

## `get_full_depth(token)`

Returns every non-zero bid/ask level for one token.

```python
depth = builder.get_full_depth(777)
print(depth)
```

Example output:

```python
{
    'token': 777,
    'found': True,
    'best_bid': (1000, 40),
    'best_ask': (1100, 20),
    'spread': 100,
    'bids': [(1000, 40), (990, 50), (980, 10)],
    'asks': [(1100, 20), (1110, 30), (1120, 15)]
}
```

Purpose:

```text
Use this when you need complete market depth, not only top 5 levels.
```

---

## `snapshot_header()`

Returns CSV column names for snapshot rows.

```python
header = builder.snapshot_header()
print(header)
```

Example output:

```text
local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0,bid_price_1,bid_qty_1,ask_price_1,ask_qty_1,bid_price_2,bid_qty_2,ask_price_2,ask_qty_2,bid_price_3,bid_qty_3,ask_price_3,ask_qty_3,bid_price_4,bid_qty_4,ask_price_4,ask_qty_4
```

Purpose:

```text
Use this as the first row when writing snapshots to CSV.
```
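The column layout follows a regular pattern: three leading fields, then four columns per level. The sketch below reproduces that pattern in pure Python to make the ordering explicit. `build_header` is a hypothetical helper, and whether `snapshot_header()` itself varies with the number of levels is not stated in this README.

```python
def build_header(levels=5):
    """Rebuild the snapshot CSV header: 3 fixed columns plus 4 per level."""
    columns = ["local_ts", "exch_ts", "mid_price"]
    for i in range(levels):
        columns += [f"bid_price_{i}", f"bid_qty_{i}", f"ask_price_{i}", f"ask_qty_{i}"]
    return ",".join(columns)


header_5 = build_header(5)
print(len(header_5.split(",")))  # 23
print(build_header(1))
# local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0
```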

---

## `get_snapshot_row(token, levels=None)`

Returns one CSV-formatted snapshot row for the token.

```python
row = builder.get_snapshot_row(777, levels=5)
print(row)
```

Example output:

```text
0,0,1050,1000,40,1100,20,990,50,1110,30,980,10,1120,15,0,0,0,0,0,0,0,0
```

Important:

```text
Current implementation sets local_ts and exch_ts to 0 in get_snapshot_row().
```

Purpose:

```text
Use this when exporting orderbook snapshots into CSV format.
```
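To read a row back by column name, the header and row strings can be zipped together. This sketch assumes every value in the row is an integer, as in the example output above; `row_to_dict` is a hypothetical helper, and the one-level header and row used in the demo are shortened for readability.

```python
def row_to_dict(header, row):
    """Pair CSV header names with row values, converting values to int."""
    names = header.split(",")
    values = row.split(",")
    if len(names) != len(values):
        raise ValueError(f"expected {len(names)} fields, got {len(values)}")
    return dict(zip(names, (int(v) for v in values)))


header = "local_ts,exch_ts,mid_price,bid_price_0,bid_qty_0,ask_price_0,ask_qty_0"
row = "0,0,1050,1000,40,1100,20"

record = row_to_dict(header, row)
print(record["bid_price_0"], record["ask_price_0"])  # 1000 1100
```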

---

# Complete Usage Examples

## Example 1: Low-memory build from large binary file

```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"
token = 777

loader = StreamingBinaryLoader()
loader.open_stream(file_path, count_messages=False)

builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X", "T"])

processed = builder.build_from_source(loader)

print("Processed messages:", processed)
print("Snapshot:", builder.get_snapshot(token, levels=5))
print("Full depth:", builder.get_full_depth(token))
```

---

## Example 2: Fast repeated analysis using cache

```python
from fastreader import MessageCacheReader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"
token = 777

reader = MessageCacheReader()
loaded = reader.load_to_cache(file_path)

print("Loaded messages:", loaded)
print("Summary:", reader.get_cache_summary())

builder = OrderbookBuilder()
processed = builder.build_from_list(reader)

print("Processed messages:", processed)
print("Snapshot:", builder.get_snapshot(token, levels=5))
```

---

## Example 3: Unit test using Python messages

```python
from fastreader import OrderbookBuilder

messages = [
    {"msg_type": "N", "order_id": 1, "token": 777, "order_type": "B", "price": 1000, "quantity": 40},
    {"msg_type": "N", "order_id": 2, "token": 777, "order_type": "S", "price": 1100, "quantity": 20},
    {"msg_type": "T", "buy_order_id": 1, "sell_order_id": 2, "token": 777, "trade_price": 1050, "trade_quantity": 10},
]

builder = OrderbookBuilder()
processed = builder.build_from_list(messages)

print("Processed:", processed)
print(builder.get_snapshot(777, levels=5))
```

Expected output:

```text
Processed: 3
{
    'token': 777,
    'found': True,
    'mid_price': 1050,
    'best_bid': (1000, 30),
    'best_ask': (1100, 10),
    'spread': 100,
    'bids': [(1000, 30)],
    'asks': [(1100, 10)]
}
```

Explanation:

```text
Buy order started with 40 quantity.
Sell order started with 20 quantity.
Trade quantity was 10.
After trade:
Buy quantity = 40 - 10 = 30
Sell quantity = 20 - 10 = 10
```

---

## Example 4: Write snapshot to CSV

```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"
token = 777

loader = StreamingBinaryLoader()
loader.open_stream(file_path, count_messages=False)

builder = OrderbookBuilder()
builder.build_from_source(loader)

with open("snapshot.csv", "w") as f:
    f.write(builder.snapshot_header() + "\n")
    f.write(builder.get_snapshot_row(token, levels=5) + "\n")
```

---

# Choosing the Correct Reader

| Use Case | Recommended Class |
|---|---|
| Huge file | `StreamingBinaryLoader` |
| Low RAM usage | `StreamingBinaryLoader` |
| Repeated analysis on same file | `MessageCacheReader` |
| Debug decoded messages | `MessageCacheReader.get_all_messages()` or `StreamingBinaryLoader.get_next_message()` |
| Manual orderbook test | `OrderbookBuilder.build_from_list(list_of_dicts)` |

---

# Common Errors

## `No such file or directory (os error 2)`

Cause:

```text
The file path is wrong, has an extra space, or the file is not accessible.
```

Check path:

```python
from pathlib import Path

file_path = Path("/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025")

print(file_path.exists())
print(file_path.is_file())
print(file_path.resolve())
```

---

## `END` from `get_next_message()`

Cause:

```text
The stream has reached end of file.
```

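This is the normal end-of-file marker, not a failure. To read the file again from the start, reset the cursor: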

```python
loader.reset_cursor()
```

---

## Snapshot returns `found: False`

Cause:

```text
No book was built for that token, or the token value is wrong.
```

Fix (assuming `reader` is a `MessageCacheReader` that has already loaded the file):

```python
print(reader.get_all_messages()[:10])
```

Check actual token values from decoded messages.
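One way to see which tokens actually appear is to count the `Token <n>` field across the formatted strings. The sketch below assumes the string layout shown in the example outputs of this README; `count_tokens` is a hypothetical helper, and the two sample lines stand in for the result of `get_all_messages()`.

```python
import re
from collections import Counter

TOKEN_RE = re.compile(r"\bToken (\d+)")


def count_tokens(lines):
    """Count how many formatted messages mention each token value."""
    counts = Counter()
    for line in lines:
        match = TOKEN_RE.search(line)
        if match:
            counts[int(match.group(1))] += 1
    return counts


lines = [
    "Order Message: SeqNo 1, MsgLen 38, MsgType 'N', ExchTs 100000, LocalTs 100010, "
    "OrderId 1001, Token 777, Side 'B', Price 1000, Quantity 40, Missed 0",
    "Trade Message: SeqNo 2, MsgLen 45, MsgType 'T', ExchTs 100020, LocalTs 100030, "
    "BuyOrderId 1001, SellOrderId 1002, Token 777, Price 1050, Quantity 10, Missed 0",
]

token_counts = count_tokens(lines)
print(token_counts.most_common(3))  # [(777, 2)]
```

If the token passed to `get_snapshot()` does not appear in these counts, the snapshot will report `found: False`.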

---

# Developer Notes

## Internal Message Flow

```text
Binary bytes
   -> OrderPacket / TradePacket
   -> Message::Order / Message::Trade
   -> OrderbookBuilder.process_message()
   -> OrderBookManager.process_order_message()    for Message::Order
      or OrderBookManager.process_trade_message() for Message::Trade
   -> bid_levels / ask_levels updated
   -> snapshot returned to Python
```

## Performance Design

- Rust handles binary parsing and orderbook updates.
- Python only calls high-level APIs.
- Streaming mode keeps memory usage low.
- Cache mode gives faster repeated access but uses more RAM.

---

# Minimal Working Script

```python
from fastreader import StreamingBinaryLoader, OrderbookBuilder

file_path = "/nas/50.30/NSE_CM/Feed_CM_StreamID_2_29_12_2025"
token = 777

loader = StreamingBinaryLoader()
loader.open_stream(file_path, count_messages=False)

builder = OrderbookBuilder()
builder.apply_filter(["N", "M", "X", "T"])

count = builder.build_from_source(loader)

print("Processed:", count)
print(builder.get_snapshot(token, levels=5))
```

---

# Summary

```text
MessageCacheReader    -> Full file in RAM
StreamingBinaryLoader -> One message at a time from disk
OrderbookBuilder      -> Builds bid/ask orderbook
get_snapshot          -> Top N orderbook levels
get_full_depth        -> Complete depth
get_snapshot_row      -> CSV row output
```

FastReader is best used when Python needs to analyze high-volume binary market data while Rust handles performance-critical parsing and orderbook construction.

