Metadata-Version: 2.4
Name: pbz2
Version: 0.1.0
Summary: Stream and parallel-process .bz2 files via pbzip2.
Project-URL: repository, https://github.com/gitronald/pbz2
Author-email: gitronald <gitronald@users.noreply.github.com>
License-Expression: MIT
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: orjson>=3.11.9
Requires-Dist: typer
Description-Content-Type: text/markdown

# pbz2

Stream and parallel-process `.bz2` files via [pbzip2](http://compression.great-site.net/pbzip2/) (parallel bzip2). Falls back to the stdlib `bz2` module when the `pbzip2` binary is unavailable.

## Install

```bash
uv add pbz2
```

Install `pbzip2` for parallel decompression:

```bash
sudo apt install pbzip2     # Debian/Ubuntu
brew install pbzip2         # macOS
```

## Usage

### Iterate

```python
import pbz2

# Parsed JSON objects, one per line, from a JSON Lines .bz2 file
for obj in pbz2.iter_jsonl("data.json.bz2"):
    ...

# Raw UTF-8 lines
for line in pbz2.iter_lines("data.txt.bz2"):
    ...

# Newline-aligned text chunks (useful for batched processing)
for chunk in pbz2.iter_chunks("data.txt.bz2"):
    ...
```
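`iter_chunks` yields text that ends on a newline, so no record is split across two chunks. Below is a minimal stdlib sketch of one way to get that property. It assumes fixed-size binary reads, with the trailing partial line carried into the next read. The function name `newline_chunks` and its `block_size` parameter are hypothetical and not part of pbz2:

```python
import bz2
import io
from collections.abc import Iterator
from typing import BinaryIO


def newline_chunks(stream: BinaryIO, block_size: int = 4 * 1024 * 1024) -> Iterator[str]:
    """Yield UTF-8 text chunks that end on a newline (hypothetical helper, not pbz2's code)."""
    carry = b""  # partial line left over from the previous read
    while True:
        block = stream.read(block_size)
        if not block:
            break
        block = carry + block
        cut = block.rfind(b"\n")
        if cut == -1:
            # No newline yet: keep accumulating until one appears.
            carry = block
            continue
        # Byte 0x0A never occurs inside a multi-byte UTF-8 sequence,
        # so cutting right after it always leaves valid UTF-8 on both sides.
        carry = block[cut + 1:]
        yield block[: cut + 1].decode("utf-8")
    if carry:
        # Last line of a file that lacks a trailing newline.
        yield carry.decode("utf-8")


if __name__ == "__main__":
    # Decompress an in-memory .bz2 payload and read it in deliberately tiny blocks.
    payload = bz2.compress(b'{"id": 1}\n{"id": 2}\n{"id": 3}\n')
    with bz2.open(io.BytesIO(payload), "rb") as f:
        for chunk in newline_chunks(f, block_size=8):
            print(repr(chunk))
```

Splitting on bytes before decoding avoids the cost and the boundary bugs of decoding partial multi-byte characters. A larger `block_size` means fewer, bigger chunks, which is usually what a batched consumer wants.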

### Parallel processing

`process_parallel` streams chunks of newline-terminated records through a worker pool. The worker function receives raw text chunks, so parsing happens in the workers rather than in the main process. `on_result` runs in the main process and is called with each worker's return value as it completes.

```python
import json
import pbz2

def parse_chunk(chunk: str) -> list[dict]:
    return [json.loads(line) for line in chunk.splitlines() if line]

def save(records: list[dict]) -> None:
    ...  # write to db, file, etc.

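# The __main__ guard is required wherever worker processes are spawned
# (the default multiprocessing start method on macOS and Windows).
if __name__ == "__main__":
    pbz2.process_parallel(
        "data.json.bz2",
        worker_fn=parse_chunk,
        on_result=save,
        num_processes=8,
    )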
```
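The API table lists a `max_pending` option for `process_parallel`. One plausible reading, assumed here and not taken from the pbz2 source, is a cap on how many chunks are in flight at once. Such a cap keeps a fast decompressor from queuing the whole file in memory. Below is a minimal stdlib sketch of that bounded-dispatch pattern using `concurrent.futures`. The name `bounded_map` is hypothetical. It accepts any `Executor`, so it runs with a `ThreadPoolExecutor` for a quick check and with a `ProcessPoolExecutor` for CPU-bound parsing:

```python
from collections.abc import Callable, Iterable
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ThreadPoolExecutor, wait
from typing import Any


def bounded_map(
    executor: Executor,
    worker_fn: Callable[..., Any],
    items: Iterable[Any],
    on_result: Callable[[Any], None],
    max_pending: int = 16,
) -> None:
    """Run worker_fn over items with at most max_pending tasks in flight (hypothetical helper)."""
    pending: set[Future] = set()
    for item in items:
        if len(pending) >= max_pending:
            # Block until at least one task finishes; handle its result in this (main) thread.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                on_result(fut.result())  # re-raises any exception from the worker
        pending.add(executor.submit(worker_fn, item))
    # Input exhausted: drain whatever is still running.
    for fut in wait(pending).done:
        on_result(fut.result())


if __name__ == "__main__":
    collected: list[list[str]] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        bounded_map(pool, str.splitlines, ["a\nb\n", "c\n", "d\ne\n"], collected.append, max_pending=2)
    print(collected)
```

Results arrive in completion order, not input order. If the consumer needs the original order, it has to carry an index through `worker_fn` and reorder on the main side.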

### CLI

```bash
pbz2 count data.json.bz2
pbz2 head data.json.bz2 -n 5
```

## API

| Function | Description |
| --- | --- |
| `iter_chunks(path, **opts)` | Yield UTF-8 text chunks ending on a newline boundary. |
| `iter_lines(path, **opts)` | Yield non-empty UTF-8 lines (no trailing newline). |
| `iter_jsonl(path, *, loads=None, **opts)` | Yield parsed JSON objects, one per line (parsed with `orjson` by default; pass a `loads` callable to override). |
| `process_parallel(path, worker_fn, *, on_result=None, worker_args=(), num_processes=None, max_pending=None, ...)` | Run `worker_fn(chunk, *worker_args)` in a process pool, dispatching results to `on_result`. |
| `open_decompress(path, **opts)` | Low-level: open a binary stream of decompressed bytes. |

### Common options

- `num_processors`: pbzip2 worker count (default: cpu_count - 1). This is separate from `process_parallel`'s `num_processes`, which sizes the Python worker pool.
- `bufsize_mb` — OS pipe buffer between pbzip2 and Python (default: 32 MB)
- `stream_buffer_mb` — Python-side read chunk size (default: 4 MB)
