Metadata-Version: 2.4
Name: threadmsg
Version: 0.2.6
Summary: Thread safe message queue
Home-page: https://github.com/wheresjames/threadmsg
Author: Robert Umbehant
Author-email: threadmsg@wheresjames.com
License: MIT
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: author
Dynamic: author-email
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license
Dynamic: license-file
Dynamic: summary

# threadmsg

A lightweight Python library that runs an **asyncio event loop in a background thread** and lets you send messages to it safely from any other thread.

It is particularly useful for serializing work onto a single background thread — for example, managing a network connection, a database handle, or any resource that must be touched from one thread at a time.

```
pip install threadmsg
```

&nbsp;

## Table of contents

- [How it works](#how-it-works)
- [Quick start](#quick-start)
- [Examples](#examples)
  - [1 · Simple message passing](#1--simple-message-passing)
  - [2 · Passing parameters to the thread](#2--passing-parameters-to-the-thread)
  - [3 · Function mapping with call()](#3--function-mapping-with-call)
  - [4 · Waiting for a reply (async)](#4--waiting-for-a-reply-async)
  - [5 · Waiting for a reply (sync / no event loop)](#5--waiting-for-a-reply-sync--no-event-loop)
- [Thread function return values](#thread-function-return-values)
- [API reference](#api-reference)
  - [Constructor](#threadmsgf-p-starttrue-deffknone)
  - [Thread control](#thread-control)
  - [Sending messages](#sending-messages)
  - [Reading messages](#reading-messages-inside-the-thread-function)
  - [Function mapping](#function-mapping-inside-the-thread-function)
  - [ThreadMsgReply](#threadmsgreply)
  - [Thread function context](#thread-function-context-ctx)
  - [Error handling](#error-handling)
- [Running the tests](#running-the-tests)
- [Links](#links)
- [Alternatives](#alternatives)
  - [pykka](#pykka)
  - [janus](#janus)
  - [concurrent.futures](#concurrentfutures-stdlib)
  - [asyncio + threading](#asyncio--threading-stdlib)
  - [Summary table](#summary-table)

&nbsp;

---

## <a id="how-it-works"></a>How it works

`ThreadMsg` spins up a background thread that runs its own `asyncio` event loop. You write an `async` function — the *thread function* — that is called repeatedly by the loop. It receives a `ctx` handle back to the `ThreadMsg` object so it can pull messages off the queue, call helpers, and check whether the thread should keep running.

```
  Main thread(s)                 Background thread
  ──────────────                 ─────────────────
  t.addMsg(data)   ──[queue]──►  async def myThread(ctx):
  t.call(...)      ──[queue]──►      msg = ctx.getMsg()
  t.stop()         ──[event]──►      ...
```

Everything in the queue is processed sequentially on the background thread, so you never need locks around the resource you are protecting.

&nbsp;

---

## <a id="quick-start"></a>Quick start

```python
import asyncio
import threadmsg as tm

async def myThread(ctx):
    msg = ctx.getMsgData()
    if msg:
        print('received:', msg)
        return -1   # stop after the first message

t = tm.ThreadMsg(myThread)
t.addMsg('hello')
t.join()
```

&nbsp;

---

## <a id="examples"></a>Examples

### <a id="1--simple-message-passing"></a>1 · Simple message passing

The thread function is called whenever a message arrives (or the wait period expires). Pulling a message off the queue with `getMsgData()` returns the raw data, or `None` if the queue is empty.

```python
import threadmsg as tm

async def msgThread(ctx):
    msg = ctx.getMsgData()
    if msg:
        print('got:', msg)
        return -1   # exit after processing one message

t = tm.ThreadMsg(msgThread)
t.addMsg('Hello, thread!')
t.join()
```

&nbsp;

### <a id="2--passing-parameters-to-the-thread"></a>2 · Passing parameters to the thread

Extra positional parameters passed to `ThreadMsg` are forwarded to the thread function on every call.

```python
import threadmsg as tm

async def counterThread(ctx, limit):
    if not ctx.run:
        print('exiting')
        return

    print('loop', ctx.loops)

    if ctx.loops >= limit - 1:
        return -1   # done

    return 0.1      # call again after 100 ms

t = tm.ThreadMsg(counterThread, (5,))
t.join()
# prints: loop 0 … loop 4, then 'exiting'
```

&nbsp;

### <a id="3--function-mapping-with-call"></a>3 · Function mapping with `call()`

The most common pattern is to subclass `ThreadMsg`, define methods on the class, and use `mapMsgAsync` + `call()` to dispatch work. This keeps all resource access inside the background thread while the public API stays simple.

```python
import asyncio
import threadmsg as tm

class MyWorker(tm.ThreadMsg):

    def __init__(self):
        self.callMap = {
            'add': self.add,
            'greet': self.greet,
        }
        super().__init__(self.msgThread, deffk='_fn')

    @staticmethod
    async def msgThread(ctx):
        while msg := ctx.getMsg():
            await ctx.mapMsgAsync(None, ctx.callMap, msg)
        # returning None → wait until the next message arrives

    def add(self, a, b):
        return a + b

    async def greet(self, name):
        return f'hello, {name}!'


async def main():
    worker = MyWorker()

    # Fire-and-forget with a callback
    def on_result(ctx, params, result, err):
        print('add result:', result)

    worker.call(on_result, 'add', a=3, b=4)

    # Await the reply directly
    reply = worker.call('greet', name='world')
    if await reply.wait(5):
        print(reply.getData())   # hello, world!

    worker.join(True)

asyncio.run(main())
```

&nbsp;

### <a id="4--waiting-for-a-reply-async"></a>4 · Waiting for a reply (async)

`call()` returns a `ThreadMsgReply` when no callback is provided. Inside an async context the running event loop is captured automatically, and `reply.wait(timeout)` suspends the caller until the result is ready.

```python
async def main():
    worker = MyWorker()

    reply = worker.call('add', a=10, b=20)

    if await reply.wait(timeout=5):
        print(reply.getData())    # 30
    else:
        print('timed out')

    worker.join(True)
```

&nbsp;

### <a id="5--waiting-for-a-reply-sync--no-event-loop"></a>5 · Waiting for a reply (sync / no event loop)

Outside an async context there is no event loop to attach to, so use a callback or poll `isData()` / `isError()` directly.

```python
import time
import threadmsg as tm

worker = MyWorker()

# Option A: callback
result = []
done = threading.Event()

def cb(ctx, params, ret, err):
    result.append(ret)
    done.set()

worker.call(cb, 'add', a=7, b=8)
done.wait(timeout=5)
print(result[0])   # 15

# Option B: pass a no-loop ThreadMsgReply and poll isData()
tmr = tm.ThreadMsg.ThreadMsgReply(None)

def deliver(ctx, params, ret, err):
    tmr.setData(ret)

worker.call(deliver, 'add', a=1, b=2)

while not tmr.isData():
    time.sleep(0.01)

print(tmr.data)   # 3

worker.join(True)
```

&nbsp;

---

## <a id="thread-function-return-values"></a>Thread function return values

The value returned by the thread function controls when it is called again.

| Return value | Behaviour |
|---|---|
| `None` (or no return) | Wait indefinitely; wake up when a message arrives or `notify()` is called |
| `0` | Call again immediately |
| `0.5` (any positive number) | Wait that many **seconds**, then call again |
| `-1` (any negative number) | Stop the thread |

After a stop is requested (via a negative return value, or `stop()` / `join(True)` from outside), the thread function is called **one final time** with `ctx.run = False`. Use this to release resources cleanly.

```python
async def managed(ctx):
    if not ctx.run:
        print('cleaning up')
        db.close()
        return

    # normal work …
```

&nbsp;

---

## <a id="api-reference"></a>API reference

### <a id="threadmsgf-p-starttrue-deffknone"></a>`ThreadMsg(f, p=(), start=True, deffk=None)`

Creates a background thread running `f`.

| Parameter | Description |
|---|---|
| `f` | `async def f(ctx, *p)` — the thread function |
| `p` | Tuple of extra arguments forwarded to `f` on every call |
| `start` | Start the thread immediately (default `True`) |
| `deffk` | Default function-key name used by `call()` and the `mapXxx` helpers |

---

### <a id="thread-control"></a>Thread control

| Method | Description |
|---|---|
| `start()` | Start the thread (only needed when `start=False`) |
| `stop()` | Signal the thread to stop; does not block |
| `join(stop=False)` | Wait for the thread to finish; pass `True` to stop it first |
| `notify()` | Wake the thread out of a timed wait without stopping it |

---

### <a id="sending-messages"></a>Sending messages

| Method | Description |
|---|---|
| `addMsg(data, cb=None)` | Push `data` onto the queue; `cb(ctx, data, result, err)` is called with the result if provided |
| `call(*args, **kwargs)` | High-level dispatch — extracts a callback, function name, and params dict from `args` in any order; returns a `ThreadMsgReply` when no callback is given |

`call()` accepts its arguments in any order:

```python
# All of these are equivalent
worker.call(my_cb, 'add', a=1, b=2)
worker.call(my_cb, {'_fn': 'add', 'a': 1}, b=2)
worker.call(my_cb, _fn='add', a=1, b=2)
```

---

### <a id="reading-messages-inside-the-thread-function"></a>Reading messages (inside the thread function)

| Method | Description |
|---|---|
| `getMsg()` | Pop the next message as `{'data': ..., 'cb': ...}`, or `None` |
| `getMsgData()` | Pop the next message and return just the data, or `None` |

---

### <a id="function-mapping-inside-the-thread-function"></a>Function mapping (inside the thread function)

These helpers inspect the function signature and match parameters by name, so you do not have to unpack dicts manually. They also handle invoking the message callback.

| Method | Description |
|---|---|
| `mapCall(f, fm, params, **kw)` | Call `f` (or look it up in `fm`) with matching params |
| `mapCallAsync(f, fm, params, **kw)` | Same as above, but awaits the result if the function is async |
| `mapMsg(f, fm, msg)` | `mapCall` + invokes `msg['cb']` with the result or error |
| `mapMsgAsync(f, fm, msg)` | `mapCallAsync` + invokes `msg['cb']`, awaiting it if async |

`f` can be:
- A callable — called directly.
- A string — used as the key to look up the function name inside `params`.
- `None` — falls back to the `deffk` default key.

---

### <a id="threadmsgreply"></a>`ThreadMsgReply`

Returned by `call()` when no callback is provided.

| Method | Description |
|---|---|
| `await wait(timeout)` | Wait up to `timeout` seconds; returns `True` if data arrived, `False` on timeout |
| `getData()` | Return the result, or `None` if not yet signalled |
| `getError()` | Return the exception, or `None` if not yet signalled |
| `isData()` | `True` if `setData()` has been called (works without an event loop) |
| `isError()` | `True` if `setError()` has been called (works without an event loop) |
| `getParams()` | Return the params dict that was associated with this reply |

> **Note:** `getData()` and `getError()` require an async context (event loop) to confirm completion. In a sync context use `isData()` / `isError()` and access `.data` / `.err` directly.

---

### <a id="thread-function-context-ctx"></a>Thread function context (`ctx`)

Inside the thread function `ctx` is the `ThreadMsg` instance itself, so all of the methods above are available. Two read-only attributes are also useful:

| Attribute | Description |
|---|---|
| `ctx.loops` | Number of times the thread function has completed a normal (non-exit) iteration |
| `ctx.run` | `True` while running; `False` during the final exit call |

---

### <a id="error-handling"></a>Error handling

By default errors raised inside the thread function are printed to stdout. Replace `on_threadmsg_error` to handle them yourself:

```python
t = tm.ThreadMsg(myThread)
t.on_threadmsg_error = lambda e: logging.error('thread error: %s', e)
```

&nbsp;

---

## <a id="running-the-tests"></a>Running the tests

```
pip install pytest pytest-asyncio
pytest
```

&nbsp;

---

## <a id="links"></a>Links

- **PyPI:** https://pypi.org/project/threadmsg/
- **Repository:** https://github.com/wheresjames/threadmsg
- **Issues:** https://github.com/wheresjames/threadmsg/issues
- **License:** MIT

&nbsp;

---

## <a id="alternatives"></a>Alternatives

### <a id="pykka"></a>[pykka](https://pykka.readthedocs.io/)

pykka implements the [actor model](https://en.wikipedia.org/wiki/Actor_model). Each actor is a class whose instances run in their own thread. Callers interact with a transparent proxy object — `proxy.my_method(a, b)` queues the call behind the scenes and returns a `Future` for the result.

**Key differences:**
- Calls are made through a proxy (`actor_ref.proxy().method(...)`) rather than by explicitly pushing messages onto a queue — there is no `getMsg()` equivalent
- `ThreadingActor` (the default) uses a plain thread with no asyncio; `AsyncioActor` (v3+) is async-native but runs inside the *caller's* existing event loop, not a dedicated background thread
- The actor has no mechanism to control its own call cadence — it cannot sleep, retry immediately, or schedule its next run via a return value
- pykka is more mature with broader adoption, more documentation, and a larger community

**Choose pykka if:** your design maps naturally to objects with named methods and you want to call them as if they were ordinary Python calls, just serialized onto a background thread.

**Choose threadmsg if:** you need asyncio available inside the background thread, want direct access to the message queue, or need the worker to control its own polling and scheduling via return values.

---

### <a id="janus"></a>[janus](https://github.com/aio-libs/janus)

janus provides a single queue object with two faces: a `queue.Queue`-compatible synchronous interface and an `asyncio.Queue`-compatible async interface. Any thread can put to one side while a coroutine gets from the other, safely.

**Key differences:**
- janus is a queue primitive only — it provides no thread lifecycle management, no dispatch, no reply mechanism, and no function routing
- You create and manage your own threads, event loops, and any reply/callback logic
- Very small, focused API; the entire library is essentially one class

**Choose janus if:** you are already managing your own threads and asyncio loop and need only a well-tested, minimal queue to pass data between them.

**Choose threadmsg if:** you want a complete solution — thread lifecycle, message dispatch, function routing, and reply objects — rather than a primitive you wire together yourself.

---

### <a id="concurrentfutures-stdlib"></a>[concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) (stdlib)

`concurrent.futures` submits callables to a managed pool of worker threads or processes and returns `Future` objects. The pool controls scheduling; you do not own any individual thread.

**Key differences:**
- Work is distributed across a pool of workers, not serialized onto a single dedicated thread — unsuitable for protecting a resource that must be touched from only one thread
- Workers run synchronous code; there is no asyncio event loop inside worker threads
- No persistent message queue — work is submitted as `executor.submit(fn, *args)` rather than pushed onto a queue
- Ships with Python's standard library; no extra dependencies required

**Choose concurrent.futures if:** you want to run many independent tasks in parallel, do not need a single dedicated thread, and want zero extra dependencies.

**Choose threadmsg if:** you need one background thread that exclusively owns a resource, need asyncio available inside that thread, or need the worker to receive structured messages with associated callbacks.

---

### <a id="asyncio--threading-stdlib"></a>[asyncio + threading](https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading) (stdlib)

Python's stdlib includes the raw primitives to build any thread/async bridge yourself: `asyncio.Queue`, `loop.call_soon_threadsafe()`, `loop.run_coroutine_threadsafe()`, and `asyncio.to_thread()`.

**Key differences:**
- No abstraction — all thread lifecycle management, queue wiring, reply handling, error propagation, and function dispatch must be implemented manually
- `asyncio.Queue` is not safe to put to from a synchronous thread directly; doing so correctly requires `call_soon_threadsafe()`
- Maximum flexibility and zero extra dependencies, at the cost of significant boilerplate

**Choose asyncio + threading if:** you cannot add dependencies, are already deep inside the asyncio ecosystem and want direct control, or have requirements that an abstraction layer would prevent you from meeting.

**Choose threadmsg if:** you want the thread lifecycle, queue, reply objects, and function routing handled for you so you can focus on the work the thread actually performs.

&nbsp;

### <a id="summary-table"></a>Summary table

| | threadmsg | pykka | janus | concurrent.futures | asyncio + threading |
|---|---|---|---|---|---|
| Dedicated single background thread | Yes | Yes | No | No | No |
| asyncio event loop in that thread | Yes | No ¹ | No | No | Yes (manual) |
| Cross-thread message queue | Yes | Yes | Yes | No | No |
| Return value / reply to caller | Yes | Yes | No | Yes | Yes |
| Named function routing | Yes | Yes (proxy) | No | No | No |
| Worker controls its own schedule | Yes | No | No | No | No |
| No extra dependencies | No | No | No | Yes | Yes |
| Relative complexity | Low | Low | Low | Low | Medium |

> ¹ pykka's `ThreadingActor` uses a plain thread with no asyncio. `AsyncioActor` (v3+) is async-native but shares the caller's event loop rather than running in a dedicated background thread.
