Metadata-Version: 2.4
Name: datacontext
Version: 0.1.1
Summary: Runtime attribution for data access in Python.
Project-URL: Homepage, https://github.com/data-context-hq/datacontext
Project-URL: Repository, https://github.com/data-context-hq/datacontext
Project-URL: Issues, https://github.com/data-context-hq/datacontext/issues
Project-URL: Changelog, https://github.com/data-context-hq/datacontext/blob/main/CHANGELOG.md
Project-URL: Roadmap, https://github.com/data-context-hq/datacontext/blob/main/ROADMAP.md
Project-URL: Security, https://github.com/data-context-hq/datacontext/security
Author: DataContext contributors
License: Apache-2.0
License-File: LICENSE
Keywords: attribution,database,observability,opentelemetry,tracing
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Requires-Python: >=3.9
Provides-Extra: bigquery
Requires-Dist: google-cloud-bigquery>=3.0; extra == 'bigquery'
Provides-Extra: dagster
Requires-Dist: dagster>=1.8; extra == 'dagster'
Provides-Extra: dbt
Requires-Dist: dbt-core>=1.7; extra == 'dbt'
Provides-Extra: otel
Requires-Dist: opentelemetry-api>=1.20; extra == 'otel'
Provides-Extra: postgres
Requires-Dist: psycopg[binary]>=3; extra == 'postgres'
Provides-Extra: snowflake
Requires-Dist: snowflake-connector-python>=3; extra == 'snowflake'
Provides-Extra: sqlalchemy
Requires-Dist: sqlalchemy>=1.4; extra == 'sqlalchemy'
Description-Content-Type: text/markdown

[![Tests](https://github.com/data-context-hq/datacontext/actions/workflows/tests.yml/badge.svg)](https://github.com/data-context-hq/datacontext/actions/workflows/tests.yml)
[![PyPI](https://img.shields.io/pypi/v/datacontext?cacheSeconds=3600)](https://pypi.org/project/datacontext/)
[![Python](https://img.shields.io/pypi/pyversions/datacontext?cacheSeconds=3600)](https://pypi.org/project/datacontext/)
[![License](https://img.shields.io/pypi/l/datacontext?cacheSeconds=3600)](https://github.com/data-context-hq/datacontext/blob/main/LICENSE)
[![Discussions](https://img.shields.io/badge/GitHub-Discussions-2ea44f)](https://github.com/data-context-hq/datacontext/discussions)
[![Roadmap](https://img.shields.io/badge/Roadmap-DataContext-blue)](https://github.com/data-context-hq/datacontext/blob/main/ROADMAP.md)
# DataContext
#### Runtime attribution for data access in Python

[Why](#why-datacontext) | [How It Works](#how-it-works) | [Quick Start](#quick-start) | [Event Shape](#event-shape) | [Production Behavior](#production-behavior) | [Roadmap](https://github.com/data-context-hq/datacontext/blob/main/ROADMAP.md)

DataContext helps developers answer a simple question:
    
> Which code path, request, job, or agent caused this query?

DataContext gives developers and platform teams more context for understanding data access patterns and improving how production services use databases and data platforms.

DataContext is early and intentionally small. The core event model is designed to stay stable, while integrations and APIs will evolve with real-world usage.

## Install

```bash
pip install datacontext
```

Optional OpenTelemetry support:

```bash
pip install "datacontext[otel]"
```

Optional SQLAlchemy support:

```bash
pip install "datacontext[sqlalchemy]"
```

Optional PostgreSQL support:

```bash
pip install "datacontext[postgres]"
```

Optional BigQuery support:

```bash
pip install "datacontext[bigquery]"
```

Optional Dagster support:

```bash
pip install "datacontext[dagster]"
```

Optional Snowflake support:

```bash
pip install "datacontext[snowflake]"
```

Optional dbt support:

```bash
pip install "datacontext[dbt]"
```

## Quick Start

Configure DataContext at an explicit data-access boundary:

```python
import datacontext

datacontext.configure(
    service_name="checkout-api",
    environment="production",
    instruments=[
        datacontext.instrument_function(
            target="app.db.execute",
            query_arg="query",
            db_system="postgres",
            client="internal-db-wrapper",
        )
    ],
)
```

After configuration, calls to `app.db.execute(...)` emit one completed query event when the function returns or raises.

Wrappers preserve return values and re-raise original exceptions unchanged. If DataContext fails, your application should not.

Emitted event:

```json
{
  "event_name": "datacontext.query",
  "timestamp": "2026-05-15T10:31:04.203Z",
  "started_at": "2026-05-15T10:31:04.182Z",
  "ended_at": "2026-05-15T10:31:04.203Z",
  "service_name": "checkout-api",
  "environment": "production",
  "db_system": "postgres",
  "client": "internal-db-wrapper",
  "query_fingerprint": "sha256:4f5b7f...",
  "query_text": "select * from orders where id = ?",
  "duration_ms": 21.4,
  "callsite": {
    "file": "checkout.py",
    "path": "/app/checkout.py",
    "line": 42,
    "function": "load_cart",
    "stack": "checkout:42 load_cart -> routes:88 post_checkout"
  },
  "status": "ok"
}
```
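The wrapping contract described above (one event per call, return values preserved, exceptions re-raised unchanged) can be illustrated with a standard-library sketch. This is not DataContext's implementation: `attribute_queries`, `events`, and `execute` are hypothetical names used only for illustration, and the event here carries just two fields.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# Hypothetical in-memory stand-in for a sink.
events: List[Dict[str, Any]] = []


def attribute_queries(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a data-access function and record one event per call."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        started = time.perf_counter()
        status = "ok"
        try:
            return func(*args, **kwargs)  # return value passes through untouched
        except BaseException:
            status = "error"
            raise  # re-raise the original exception unchanged
        finally:
            duration_ms = (time.perf_counter() - started) * 1000.0
            try:
                events.append({"status": status, "duration_ms": duration_ms})
            except Exception:
                pass  # a failure while recording must not reach the caller

    return wrapper


@attribute_queries
def execute(query: str) -> str:
    """Hypothetical data-access function standing in for app.db.execute."""
    if "boom" in query:
        raise ValueError("boom")
    return "rows for " + query
```

Calling `execute("select 1")` returns the wrapped function's own result and appends one event with `status` set to `"ok"`; a call that raises appends one event with `status` set to `"error"` and the caller still sees the original `ValueError`.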

## Why DataContext?

Queries often lose their application context by the time they reach logs, traces, or the data platform itself.

That makes it hard to answer:

- Which request, job, or agent triggered this query?
- Which code path caused this unexpected load?
- Which actor, tenant, or session was involved?

DataContext connects query events to runtime context, source callsites, and OpenTelemetry trace context when available.

## How It Works

<p align="center">
  <img src="https://raw.githubusercontent.com/data-context-hq/datacontext/main/assets/datacontext-flow.svg" alt="DataContext query attribution flow" width="680">
</p>

## Supported Today

DataContext currently supports:

- manual query instrumentation with `trace_query(...)` and `capture_query(...)`,
- wrapping explicit data-access functions with `instrument_function(...)`,
- SQLAlchemy engine instrumentation through the optional `sqlalchemy` extra,
- native PostgreSQL connection instrumentation through the optional `postgres` extra,
- native BigQuery client instrumentation through the optional `bigquery` extra,
- Dagster execution context attribution through the optional `dagster` extra,
- dbt execution context attribution through the optional `dbt` extra,
- native Snowflake connector instrumentation through the optional `snowflake` extra,
- JSONL, callback, and OpenTelemetry-oriented sinks,
- correlating query events with runtime context and active OpenTelemetry spans.

Other database drivers are not automatically instrumented yet.

## Planned Integrations

Support for other database clients, ORMs, and data-platform libraries will be prioritized based on real-world usage.

Use [GitHub Discussions](https://github.com/data-context-hq/datacontext/discussions) or [feature requests](https://github.com/data-context-hq/datacontext/issues/new?template=feature_or_integration_request.md) to share the library, data-access pattern, sync/async behavior, and event fields you need.

## Add Runtime Context

DataContext is most useful when queries are connected to runtime context:

```python
from datacontext import context

with context.use(
    operation="checkout",
    actor="user:123",
    request_id="req_abc",
    attributes={"tenant": "acme", "region": "us-east-1"},
):
    run_business_logic()
```

Any query captured inside the context includes that attribution.
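One common way to make context available to everything called inside a `with` block is Python's `contextvars` module. The sketch below shows that general pattern under stated assumptions: `use` and `capture` are hypothetical stand-ins, the merging of nested contexts is an illustrative choice, and nothing here is confirmed to match how `datacontext.context.use(...)` works internally.

```python
import contextvars
from contextlib import contextmanager
from typing import Any, Dict, Iterator

# Holds the attribution fields that are active for the current execution context.
_current = contextvars.ContextVar("attribution", default=None)


@contextmanager
def use(**fields: Any) -> Iterator[Dict[str, Any]]:
    """Activate attribution fields for the duration of a with-block."""
    merged = {**(_current.get() or {}), **fields}  # inner fields override outer ones
    token = _current.set(merged)
    try:
        yield merged
    finally:
        _current.reset(token)  # restore whatever was active before


def capture(query: str) -> Dict[str, Any]:
    """Hypothetical capture step: attach the active fields to a query event."""
    return {"query_text": query, **(_current.get() or {})}
```

Because `contextvars` values follow the current thread or asyncio task, a capture made deep inside `run_business_logic()` can see the fields without them being passed through every function signature.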

## Event Shape

DataContext emits one final event per query, at finish or error time.

Every normal event includes:

- `event_name`, `timestamp`, `started_at`, `ended_at`,
- `service_name`, `environment`, `db_system`, `client`,
- `query_fingerprint`, `duration_ms`, `callsite`, and `status`.

The `timestamp` is the event finish time and matches `ended_at`. By default, events also include sanitized `query_text`; it can be disabled globally or per captured query. Optional fields are only present when DataContext can derive them or when the caller supplies them.

Example `datacontext.query` event:

```json
{
  "event_name": "datacontext.query",
  "timestamp": "2026-05-15T10:31:04.203Z",
  "started_at": "2026-05-15T10:31:04.182Z",
  "ended_at": "2026-05-15T10:31:04.203Z",
  "service_name": "checkout-api",
  "environment": "production",
  "db_system": "postgres",
  "client": "internal-db-wrapper",
  "query_fingerprint": "sha256:4f5b7f...",
  "query_text": "select * from orders where id = ?",
  "duration_ms": 21.4,
  "callsite": {
    "file": "checkout.py",
    "path": "/app/checkout.py",
    "line": 42,
    "function": "load_cart",
    "stack": "checkout:42 load_cart -> routes:88 post_checkout"
  },
  "status": "ok",
  "trace_id": "0af7651916cd43dd8448eb211c80319c",
  "span_id": "b7ad6b7169203331",
  "trace_flags": "01",
  "operation": "checkout",
  "actor": "user:123",
  "request_id": "req_abc",
  "job_id": "job_456",
  "session_id": "sess_789",
  "rows": 12,
  "db_name": "checkout",
  "db_host": "postgres.internal",
  "attributes": {
    "tenant": "acme",
    "region": "us-east-1"
  }
}
```

On errors, DataContext emits `status: "error"` and includes compact error metadata before re-raising the original exception.

```json
{
  "status": "error",
  "error": {
    "type": "ValueError",
    "message": "boom"
  }
}
```
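When consuming events downstream, it can help to check each JSON line against the field list above. The following is a minimal sketch using only the standard library; `REQUIRED_FIELDS` mirrors the fields listed for normal events, and `missing_fields` is a hypothetical helper, not part of DataContext. Minimal fallback events may legitimately lack some of these fields, so treat the result as a report rather than a hard failure.

```python
import json
from typing import List

# Fields the README lists for every normal event.
REQUIRED_FIELDS = (
    "event_name", "timestamp", "started_at", "ended_at",
    "service_name", "environment", "db_system", "client",
    "query_fingerprint", "duration_ms", "callsite", "status",
)


def missing_fields(line: str) -> List[str]:
    """Return the names of expected fields that are absent from one JSON line."""
    event = json.loads(line)
    missing = [name for name in REQUIRED_FIELDS if name not in event]
    # Error events are expected to carry compact error metadata.
    if event.get("status") == "error" and "error" not in event:
        missing.append("error")
    return missing
```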

## Production Behavior

DataContext is designed to sit on production data-access paths without changing application behavior:

- wrappers preserve return values and re-raise original exceptions,
- if capture itself fails, DataContext falls back to emitting a minimal event,
- when a sink fails, the failure is logged and the affected event is dropped,
- sanitized `query_text` is emitted by default, while raw SQL is explicit opt-in,
- OpenTelemetry trace context is used when present, but DataContext does not configure tracing or exporters.

## Schema Philosophy

DataContext uses a small, stable event shape on purpose.

The core schema answers the questions teams usually need first:

- what query shape ran,
- where it came from in code,
- which runtime context caused it,
- which trace or span it belongs to.

The schema is meant to work as JSON logs, warehouse rows, debugging artifacts, or observability events. Team-specific metadata belongs in `attributes`, so teams can extend events without changing the common attribution layer.
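As an example of treating events as plain JSON logs, the sketch below totals `duration_ms` per query fingerprint and callsite from JSON Lines input. It assumes only the field names shown in the event examples; `total_duration_by_callsite` is a hypothetical helper written for illustration.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Key = Tuple[Optional[str], Optional[str], Optional[int]]


def total_duration_by_callsite(lines: Iterable[str]) -> List[Tuple[Key, float]]:
    """Sum duration_ms per (fingerprint, callsite path, callsite line), largest first."""
    totals: Dict[Key, float] = defaultdict(float)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines in the log
        event = json.loads(line)
        callsite = event.get("callsite") or {}
        key = (event.get("query_fingerprint"), callsite.get("path"), callsite.get("line"))
        totals[key] += float(event.get("duration_ms", 0.0))
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)
```

Passing an open file works directly, for example `total_duration_by_callsite(open("datacontext.jsonl"))`, since file objects iterate line by line.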

## Manual Instrumentation

The Quick Start approach is the recommended default: configure DataContext once and wrap your existing data-access function. When that does not fit, you can instrument directly at the call site with the lower-level APIs:

```python
import datacontext

# `query` and `db` come from your own application code.
with datacontext.trace_query(
    db_system="postgres",
    client="internal-db-wrapper",
    query=query,
):
    db.execute(query)
```

Use `capture_query(...)` when timing is already measured by your integration:

```python
datacontext.capture_query(
    db_system="postgres",
    client="internal-db-wrapper",
    query=query,
    started_at=started_at,
    ended_at=ended_at,
    duration_ms=duration_ms,
    status="ok",
    rows=12,
)
```

## SQLAlchemy

SQLAlchemy support is optional and only installed with the `sqlalchemy` extra. Pass an engine to `instrument_sqlalchemy(...)` during configuration:

```python
import datacontext

datacontext.configure(
    service_name="checkout-api",
    environment="production",
    instruments=[
        datacontext.instrument_sqlalchemy(engine),
    ],
)
```

The integration listens to SQLAlchemy engine events and emits one DataContext event for each completed or failed statement. It also supports async engines by registering listeners on the underlying sync engine.

## PostgreSQL

PostgreSQL support is optional and only installed with the `postgres` extra. It instruments a `psycopg` connection by wrapping connection-level `execute(...)` calls and cursors returned by `cursor()`.

```python
import datacontext
import psycopg

conn = psycopg.connect("postgresql://checkout@postgres.internal/checkout")

datacontext.configure(
    service_name="checkout-api",
    environment="production",
)
datacontext.instrument_postgres(conn).apply()

with conn.cursor() as cursor:
    cursor.execute("select * from orders where id = %s", [order_id])
```

The integration emits one DataContext event per completed or failed `execute(...)` or `executemany(...)` call. Events use `db_system: "postgresql"`, `client: "psycopg"`, and include `db_name`, `db_host`, and `rows` when available from the connection or cursor.

## BigQuery

BigQuery support is optional and only installed with the `bigquery` extra. Pass a `google.cloud.bigquery.Client` to `instrument_bigquery(...)` during configuration:

```python
from google.cloud import bigquery
import datacontext

client = bigquery.Client(project="analytics-prod")

datacontext.configure(
    service_name="warehouse-loader",
    environment="production",
    instruments=[
        datacontext.instrument_bigquery(
            client,
            labels={"service": "warehouse-loader"},
            job_id_prefix="warehouse_loader_",
        ),
    ],
)
```

The integration instruments `Client.query_and_wait(...)` and `Client.query(...)`. For `query(...)`, DataContext emits the event when the returned job's `result()` method completes or raises, so the duration follows the waited query rather than only job submission. Captured events use `db_system: "bigquery"`, `client: "google-cloud-bigquery"`, the client project as `db_name`, and BigQuery job metadata under `attributes`.

BigQuery job labels and `job_id_prefix` are opt-in. When configured, labels are injected through `QueryJobConfig`; if the call already passed a `job_config`, DataContext merges labels into it and user-defined labels win on matching keys. `job_id_prefix` is injected for `Client.query(...)` only if the call did not already pass `job_id` or `job_id_prefix`.
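The label precedence rule can be pictured as a plain dictionary merge in which call-level labels override configured ones. This is a sketch of the rule as described, not the integration's code; `merge_labels` is a hypothetical helper.

```python
from typing import Dict, Optional


def merge_labels(
    configured: Dict[str, str],
    call_labels: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Combine configured labels with labels passed on the call; the call wins on conflicts."""
    merged = dict(configured)          # start from the configured labels
    merged.update(call_labels or {})   # user-defined labels override matching keys
    return merged
```

With configured labels `{"service": "warehouse-loader"}` and call labels `{"service": "backfill", "team": "data"}`, the merged result keeps `service` as `"backfill"` and adds `team`.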

## Dagster

Dagster support is optional and only installed with the `dagster` extra. DataContext does not replace Dagster observability, materializations, asset lineage, or run state. Dagster remains the source of truth for orchestration identity; DataContext adds Dagster metadata to query events emitted inside assets and ops.

Use the dependency-free context bridge inside a Dagster asset or op:

```python
from dagster import asset

import datacontext as dc

@asset
def orders(context):
    with dc.use_dagster_context(context):
        run_queries()
```

When Dagster is installed, you can also use the native resource:

```python
from dagster import asset
from datacontext import DataContextResource

@asset
def orders(context, datacontext: DataContextResource):
    with datacontext.use_context(context):
        run_queries()
```

Captured queries include the Dagster run id as `job_id`, the asset key or op name as `operation`, and Dagster details under `attributes` such as `dagster.run_id`, `dagster.job_name`, `dagster.op_name`, `dagster.asset_key`, and `dagster.partition_key`. Dagster run tags are included only when `include_run_tags=True`.

## Snowflake

Snowflake connector support is optional and only installed with the `snowflake` extra. Configure it once before creating or using cursors:

```python
import snowflake.connector

import datacontext

datacontext.configure(
    service_name="analytics-worker",
    environment="production",
    instruments=[
        datacontext.instrument_snowflake(),
    ],
)

conn = snowflake.connector.connect(
    account="acme-prod",
    user="loader",
    password="...",
    warehouse="analytics_wh",
    database="analytics",
    schema="public",
)

cursor = conn.cursor()
cursor.execute("select count(*) from orders")
```

The integration wraps `snowflake-connector-python` cursor `execute`, `executemany`, and `execute_async`. It emits `db_system: "snowflake"`, `client: "snowflake-connector-python"`, `rows` from `cursor.rowcount` when available, and Snowflake metadata under `attributes`, including `snowflake.query_id` from `cursor.sfqid`.

Richer Snowflake cost and performance metrics, such as bytes scanned, partitions scanned, execution time, spill bytes, load percent, and cloud-services credits, come from Snowflake Query History. DataContext does not query Query History inside the synchronous cursor wrapper; join those metrics later by `attributes.snowflake.query_id`.
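The later join can be done wherever both datasets land. Below is a minimal in-memory sketch: it assumes the query ID is stored as a flat `snowflake.query_id` key inside `attributes`, and that Query History rows are dictionaries with `QUERY_ID` and `BYTES_SCANNED` keys. Both shapes and the `join_query_history` helper are assumptions for illustration; adjust them to the data you actually export.

```python
from typing import Any, Dict, Iterable, List


def join_query_history(
    events: Iterable[Dict[str, Any]],
    history_rows: Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Attach bytes scanned from Query History rows to events with a matching query ID."""
    by_id = {row["QUERY_ID"]: row for row in history_rows}
    joined = []
    for event in events:
        query_id = (event.get("attributes") or {}).get("snowflake.query_id")
        row = by_id.get(query_id)
        if row is None:
            continue  # no history row (yet) for this query
        joined.append({
            "query_id": query_id,
            "query_fingerprint": event.get("query_fingerprint"),
            "callsite": event.get("callsite"),
            "bytes_scanned": row.get("BYTES_SCANNED"),
        })
    return joined
```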

## dbt

dbt support is optional and only installed with the `dbt` extra. DataContext does not replace dbt artifacts, exposures, lineage, or run results. dbt remains the source of truth for transformation identity; DataContext adds dbt metadata to query events emitted inside Python models or other dbt-adjacent execution code.

Use the dependency-free context bridge inside a dbt Python model:

```python
import datacontext as dc

def model(dbt, session):
    with dc.use_dbt_context(dbt):
        return run_queries(session)
```

Captured queries include the dbt invocation id as `job_id`, the model unique id or relation as `operation`, and dbt details under `attributes` such as `dbt.invocation_id`, `dbt.node.unique_id`, `dbt.node.name`, `dbt.node.resource_type`, `dbt.node.package_name`, `dbt.this`, and `dbt.target.name`.

## Privacy and Query Text

DataContext emits `query_fingerprint` and sanitized `query_text` by default. Raw query text is not emitted unless you explicitly opt in.

To emit only the fingerprint without sanitized query text, disable query text:

```python
import datacontext

datacontext.configure(
    service_name="checkout-api",
    environment="production",
    include_query_text=False,
)
```

The sanitizer uses the same normalization as fingerprinting: it replaces string and numeric literals with `?`, normalizes whitespace, lowercases SQL, and compacts placeholder `IN (...)` lists.
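The normalization steps listed above can be approximated with a few regular expressions. This is a rough sketch, not DataContext's sanitizer: the patterns, the `in (?)` compaction form, and hashing the sanitized text with SHA-256 are all assumptions chosen to match the examples in this README, and real SQL has edge cases (comments, quoted identifiers, dialect-specific literals) that this does not handle.

```python
import hashlib
import re

_STRING = re.compile(r"'(?:[^']|'')*'")               # single-quoted string literals
_NUMBER = re.compile(r"\b\d+(?:\.\d+)?\b")            # standalone integer or decimal literals
_IN_LIST = re.compile(r"\bin\s*\(\s*\?(?:\s*,\s*\?)*\s*\)")
_SPACE = re.compile(r"\s+")


def sanitize(sql: str) -> str:
    """Replace literals with ?, collapse whitespace, lowercase, and compact IN lists."""
    text = _STRING.sub("?", sql)
    text = _NUMBER.sub("?", text)
    text = _SPACE.sub(" ", text).strip().lower()
    return _IN_LIST.sub("in (?)", text)


def fingerprint(sql: str) -> str:
    """Hash the sanitized text so queries differing only in literals share a fingerprint."""
    digest = hashlib.sha256(sanitize(sql).encode("utf-8")).hexdigest()
    return "sha256:" + digest
```

Under these assumptions, `SELECT * FROM orders WHERE id = 42` and `select * from orders where id = 7` sanitize to the same text and therefore share a fingerprint.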

To include exact raw SQL instead, use the explicit raw-query option:

```python
datacontext.capture_query(
    db_system="postgres",
    client="internal-db-wrapper",
    query=query,
    started_at=started_at,
    ended_at=ended_at,
    duration_ms=duration_ms,
    status="ok",
    include_raw_query_text=True,
)
```

## OpenTelemetry

DataContext uses OpenTelemetry context when it exists. It does not set up tracing, choose exporters, or replace your existing pipeline.

With an active span, DataContext adds `trace_id`, `span_id`, and `trace_flags` to emitted events. It can also attach compact `datacontext.*` attributes to the active span, including query fingerprint, status, duration, operation, and request ID.
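The `trace_id`, `span_id`, and `trace_flags` values in the event example follow the W3C Trace Context hex encoding: 32, 16, and 2 lowercase hex characters. OpenTelemetry's Python API holds the trace and span IDs as integers on the span context, so the conversion looks roughly like the sketch below. `trace_fields` is a hypothetical helper that takes plain integers to stay dependency-free; it is not DataContext's code.

```python
from typing import Dict


def trace_fields(trace_id: int, span_id: int, trace_flags: int) -> Dict[str, str]:
    """Format integer trace identifiers as zero-padded lowercase hex strings."""
    return {
        "trace_id": format(trace_id, "032x"),       # 128-bit ID, 32 hex characters
        "span_id": format(span_id, "016x"),         # 64-bit ID, 16 hex characters
        "trace_flags": format(trace_flags, "02x"),  # 8-bit flags, 2 hex characters
    }
```

For instance, a sampled span with trace ID `0x0af7651916cd43dd8448eb211c80319c` and span ID `0xb7ad6b7169203331` yields the same three strings shown in the event example.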

## Sinks

The default sink writes JSON Lines to stdout. You can send events to a file, a callback, or an OpenTelemetry-oriented sink.

Configure a file sink:

```python
import datacontext
from datacontext.sinks import FileJsonlSink

datacontext.configure(
    service_name="checkout-api",
    environment="production",
    sink=FileJsonlSink("datacontext.jsonl"),
)
```

Configure a callback sink:

```python
import datacontext
from datacontext.sinks import CallbackSink

datacontext.configure(
    service_name="checkout-api",
    environment="production",
    sink=CallbackSink(lambda event: send_to_pipeline(event)),
)
```

When a sink fails, the failure is logged and the event is dropped. Sink failures should not block application work.
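The log-and-drop behavior can be pictured as a guard around the sink call. The sketch below uses only the standard library; `safe_emit` and the logger name are hypothetical, and the sink is modeled as a plain callable rather than one of DataContext's sink classes.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("sink_sketch")  # hypothetical logger name


def safe_emit(sink: Callable[[Dict[str, Any]], None], event: Dict[str, Any]) -> bool:
    """Send one event to a sink; log and drop the event if the sink raises."""
    try:
        sink(event)
        return True
    except Exception:
        # Record the failure with its traceback, then carry on without the event.
        logger.warning("sink failed; dropping event", exc_info=True)
        return False
```

The caller never sees the sink's exception, so a broken pipeline costs lost events rather than failed requests.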

## Community

Use [GitHub Discussions](https://github.com/data-context-hq/datacontext/discussions) for questions, design feedback, and integration ideas.

Use [GitHub Issues](https://github.com/data-context-hq/datacontext/issues) for bugs and focused feature requests.

## License

Apache-2.0
