Metadata-Version: 2.4
Name: event-collector
Version: 0.2.2
Summary: A toolkit for creating lightweight event processing pipelines
Author: Kevin Chan
Author-email: Kevin Chan <kc@kchan.io>
License-Expression: MIT
License-File: LICENSE
Classifier: Typing :: Typed
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.14
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Topic :: System :: Networking
Classifier: Development Status :: 3 - Alpha
Requires-Dist: orjson>=3.10.16
Requires-Dist: structlog>=25.2.0
Requires-Python: >=3.13
Project-URL: Homepage, https://codeberg.org/kchan/event-collector
Project-URL: Documentation, https://kchan.codeberg.page/event-collector/
Project-URL: Repository, https://codeberg.org/kchan/event-collector
Project-URL: Issues, https://codeberg.org/kchan/event-collector/issues
Description-Content-Type: text/markdown

![](https://img.shields.io/badge/version-0.2.2-blue) ![](https://img.shields.io/badge/license-MIT-blue)

# event-collector

Event-collector is a toolkit for creating lightweight event processing pipelines.

![](docs/source/images/event-collector.svg)

This library implements an in-memory [competing consumer](https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers) queue using only Python standard library functions and `asyncio` primitives. The *collector queue* runs as `asyncio` background tasks and can autoscale according to load. With collector queues, you can set up data processing workflows quickly without deploying a heavy stack of external services.
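To make the competing consumer idea concrete, here is a minimal sketch that uses only `asyncio.Queue` and background tasks. It is not event-collector's implementation and does no autoscaling; the names `consumer` and `run_demo` are hypothetical and exist only for this illustration.

``` python
import asyncio


async def consumer(name: str, queue: asyncio.Queue, results: list) -> None:
    """Compete with the other consumers for items on the shared queue."""
    while True:
        item = await queue.get()
        try:
            # Simulate a short backend processing delay.
            await asyncio.sleep(0.01)
            results.append((name, item))
        finally:
            # Mark the item as processed so queue.join() can return.
            queue.task_done()


async def run_demo(n_consumers: int = 3, n_events: int = 6) -> list:
    """Start the consumers, publish the events, and wait until all are handled."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, results))
        for i in range(n_consumers)
    ]
    for i in range(n_events):
        queue.put_nowait(f"event {i}")

    # Block until every queued item has been marked done.
    await queue.join()

    # The consumers loop forever, so cancel them once the queue is drained.
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    for name, item in asyncio.run(run_demo()):
        print(name, item)
```

Each event is handled by exactly one consumer, whichever is free first. A collector queue builds on the same pattern and adds collectors, result handling, metrics, and autoscaling on top.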

## Requirements

* Python `>= 3.13`

The only external dependencies are the following:

- `orjson`
- `structlog`

## Documentation

* [event-collector documentation](https://kchan.codeberg.page/event-collector/)

## Quickstart example

To run a collector queue with an event collector, we need to perform the following steps:

1.  create a `Collector` with an event handling function to process events;
2.  configure a `CollectorQueue` to run background consumer tasks that will wait for input events;
3.  run the collector queue and publish events to the queue.

``` python
"""
example.py
"""

import asyncio
import json
import sys

from event_collector import (
    Collector,
    CollectorQueue,
    Context,
    Payload,
    Ok,
    Result,
)


async def event_handler(payload: Payload, ctx: Context) -> Result:
    """
    Extract the input data and return a result dict wrapped in
    an "Ok" Result object. Also simulate a backend processing delay.
    """
    data = payload.value
    result = {
        "data": data,
        "status": "OK",
    }
    await asyncio.sleep(1)
    return Ok(result)


async def main() -> None:
    """
    Create a Collector with the event handler and initialize
    a CollectorQueue with it. Run the queue and send a batch
    of event payloads to the collector.

    Setting "report_interval" to a positive number of seconds
    makes the collector queue write runtime metrics to the
    stdout log output at that interval.
    """
    collector = Collector("demo-collector", event_handler)
    async with CollectorQueue(
        [collector],
        min_consumers=1,
        max_consumers=100,
        report_interval=1.0,
    ) as collector_queue:

        # Send event payloads to the event handler.
        publisher = collector_queue.get_publisher()

        for i in range(3):
            payload = Payload({"payload": f"payload data {i}"})
            event = await publisher.send(payload)

            # The publisher.send() call returns a CollectorEvent
            # object. Awaiting event.wait() suspends until the
            # result is ready and returns a ResultList, from
            # which we take the first result.
            result_list = await event.wait()
            result = result_list.first().unwrap()

            # Print the result in JSON format.
            print(json.dumps(result))


if __name__ == "__main__":
    asyncio.run(main())
    sys.exit(0)
```

The collector queue will emit logs with runtime metrics.

Run the example code using the following command (we'll pipe the output to `jq` to format the log output):

``` bash
python example.py | jq .
```

Here are the printed results, preceded by part of the metrics log output:

``` bash
(.venv) [event-collector]$ python example.py | jq .
...
[snip]
{
  "logger": "CollectorQueue-107974cacc474a3298202c299d1497d8",
  "metrics_timestamp": "2026-05-06T05:57:01.322421+00:00",
  "qsize": 0,
  "busy": 1,
  "consumers": 2,
  "load": 0.5,
  "avg_load": 0.5,
  "service_time": 1.0007,
  "wait_time": 1.0023,
  "arrival_rate": 0.0,
  "departure_rate": 0.0,
  "wip": 0.0,
  "t_delta": 0.0,
  "event_count": 2,
  "event": "collector queue metrics",
  "level": "info",
  "timestamp": "2026-05-06T05:57:01.399931Z"
}
{
  "data": {
    "payload": "payload data 0"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 1"
  },
  "status": "OK"
}
{
  "data": {
    "payload": "payload data 2"
  },
  "status": "OK"
}
```
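Because metrics records and printed results share stdout, a downstream script may want to separate them. The sketch below assumes the raw output (before `jq`) is one JSON object per line and that metrics records can be recognized by the `"event": "collector queue metrics"` field seen in the sample above. The helper name `split_records` is hypothetical and is not part of event-collector.

``` python
import json
from typing import Iterable


def split_records(lines: Iterable[str]) -> tuple[list[dict], list[dict]]:
    """Split JSON lines into (metrics records, all other records)."""
    metrics: list[dict] = []
    other: list[dict] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # Skip anything that is not valid JSON.
            continue
        if not isinstance(record, dict):
            continue
        # Assumption: metrics records carry this "event" value,
        # as in the sample log output above.
        if record.get("event") == "collector queue metrics":
            metrics.append(record)
        else:
            other.append(record)
    return metrics, other


if __name__ == "__main__":
    sample = [
        '{"event": "collector queue metrics", "busy": 1, "consumers": 2, "load": 0.5}',
        '{"data": {"payload": "payload data 0"}, "status": "OK"}',
    ]
    metrics, other = split_records(sample)
    print(len(metrics), "metrics record(s);", len(other), "other record(s)")
```

For example, piping `python example.py` into a script that passes `sys.stdin` to `split_records` would collect the handler results in the second list, along with any non-metrics log records.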

## Contact

- Kevin Chan `<kc@kchan.io>`
