21 Advanced Lineage Workflows

Learn advanced lineage tracking patterns for complex data pipelines, debugging, and quality assurance.

21.1 Example 1: Multi-Step Transformation Lineage

Track complex transformation pipelines with multiple steps to understand the complete data journey.

import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey'],
    'price': [100, 200, 150],
    'cost': [60, 120, 90],
    'quantity': [2, 1, 3]
})

# Step 1: Calculate profit
result = add.transform(
    '@calc',
    df,
    columns=['price', 'cost'],
    expression='price - cost',
    as_='profit',
    lineage=True
)

# Step 2: Calculate revenue
result = add.transform(
    '@calc',
    result,
    columns=['price', 'quantity'],
    expression='price * quantity',
    as_='revenue',
    lineage=True
)

# Step 3: Filter high-value items
result = add.transform(
    '@filter',
    result,
    columns=['product', 'profit', 'revenue'],
    where='revenue > 200',
    lineage=True
)

# View complete lineage report
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:

═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════

DataFrame: 1 rows × 3 columns
Operations: 3 transformations applied

───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
  Rows: 3 → 3 (no change)
  Columns Added: profit
  
  Parameters:
    columns: ["price","cost"]
    expression: ["price - cost"]
    mode: "@calc"

───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
  Rows: 3 → 3 (no change)
  Columns Added: revenue
  
  Parameters:
    columns: ["price","quantity"]
    expression: ["price * quantity"]
    mode: "@calc"

───────────────────────────────────────────────────────────────
Step 3: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
  Rows: 3 → 1 (2 rows filtered out)
  Columns Added: none
  
  Parameters:
    columns: ["product","profit","revenue"]
    where: "revenue > 200"
    mode: "@filter"

Use Cases:

  • Pipeline Debugging: Identify which step introduced an issue
  • Performance Analysis: See which steps changed row counts
  • Documentation: Auto-generate pipeline documentation
  • Validation: Verify each transformation step

Note: This also works with polars DataFrames.


21.2 Example 2: Sorting and Calculation Lineage

Track sorting operations combined with calculations to understand data ordering and transformations.

import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey'],
    'price': [100, 200, 150],
    'sales': [50, 150, 75]
})

# Step 1: Calculate revenue
result = add.transform(
    '@calc',
    df,
    columns=['price', 'sales'],
    expression='price * sales',
    as_='revenue',
    lineage=True
)

# Step 2: Sort by revenue (descending)
result = add.transform(
    '@sort',
    result,
    columns=['product', 'revenue'],
    by='revenue',
    order='desc',
    lineage=True
)

# View lineage report
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:

═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════

DataFrame: 3 rows × 2 columns
Operations: 2 transformations applied

───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:15:45
───────────────────────────────────────────────────────────────
  Rows: 3 → 3 (no change)
  Columns Added: revenue
  
  Parameters:
    columns: ["price","sales"]
    expression: ["price * sales"]
    mode: "@calc"

───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:15:45
───────────────────────────────────────────────────────────────
  Rows: 3 → 3 (no change)
  Columns Added: none
  
  Parameters:
    columns: ["product","revenue"]
    by: "revenue"
    order: "desc"
    mode: "@sort"

Key Insights:

  • Sorting operations are tracked with their parameters
  • You can see the sort column and order
  • Row count remains the same (sorting doesn’t filter)
  • Useful for understanding data ordering logic

Note: This also works with polars DataFrames.


21.3 Example 3: Lineage for Debugging Data Quality

Use lineage tracking to debug data quality issues and understand data cleaning steps.

import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey', 'Thingamajig'],
    'price': [100, 200, None, 150],
    'quantity': [2, 1, 3, 0]
})

# Step 1: Impute missing values
result = add.transform(
    '@deduce',
    df,
    columns=['price'],
    strategy={'method': 'mean'},
    lineage=True
)

# Step 2: Filter out zero quantity
result = add.transform(
    '@filter',
    result,
    columns=['product', 'price', 'quantity'],
    where='quantity > 0',
    lineage=True
)

# Step 3: Calculate total
result = add.transform(
    '@calc',
    result,
    columns=['price', 'quantity'],
    expression='price * quantity',
    as_='total',
    lineage=True
)

# View lineage report to understand data cleaning steps
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:

═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════

DataFrame: 3 rows × 4 columns
Operations: 3 transformations applied

───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
  Rows: 4 → 4 (no change)
  Columns Added: none
  
  Parameters:
    columns: ["price"]
    strategy: {"method":"mean"}
    mode: "@deduce"

───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
  Rows: 4 → 3 (1 row filtered out)
  Columns Added: none
  
  Parameters:
    columns: ["product","price","quantity"]
    where: "quantity > 0"
    mode: "@filter"

───────────────────────────────────────────────────────────────
Step 3: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
  Rows: 3 → 3 (no change)
  Columns Added: total
  
  Parameters:
    columns: ["price","quantity"]
    expression: ["price * quantity"]
    mode: "@calc"

Debugging Workflow:

  1. Identify the issue: Final data has 3 rows instead of 4
  2. Check lineage: Step 2 filtered out 1 row
  3. Understand why: Filter condition was quantity > 0
  4. Verify imputation: Step 1 handled missing price values
  5. Confirm calculation: Step 3 calculated totals correctly

Benefits:

  • Quick identification of data loss points
  • Clear understanding of data cleaning logic
  • Easy verification of transformation correctness
  • Reproducible debugging process
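The "check lineage" step of the workflow can be automated once the report is available as text. The sketch below is only an illustration: it assumes the report is a plain string in the layout shown in the output above, and find_row_losses is a hypothetical helper, not part of additory.

```python
import re

# Matches the "Rows: 4 → 3 (...)" line as it appears in the reports above.
ROWS_LINE = re.compile(r"Rows:\s*(\d+)\s*→\s*(\d+)")
STEP_LINE = re.compile(r"\s*Step (\d+):")


def find_row_losses(report: str) -> list[tuple[int, int, int]]:
    """Return (step, rows_before, rows_after) for every step that dropped rows."""
    losses = []
    current_step = None
    for line in report.splitlines():
        step_match = STEP_LINE.match(line)
        if step_match:
            current_step = int(step_match.group(1))
            continue
        rows_match = ROWS_LINE.search(line)
        if rows_match and current_step is not None:
            before, after = int(rows_match.group(1)), int(rows_match.group(2))
            if after < before:
                losses.append((current_step, before, after))
    return losses


# Abbreviated copy of the Example 3 report, used as sample input.
sample_report = """\
Step 1: add.transform - 2026-03-11T15:20:18
  Rows: 4 → 4 (no change)
Step 2: add.transform - 2026-03-11T15:20:18
  Rows: 4 → 3 (1 row filtered out)
Step 3: add.transform - 2026-03-11T15:20:18
  Rows: 3 → 3 (no change)
"""

print(find_row_losses(sample_report))  # [(2, 4, 3)]
```

If the report format differs in your version, the two regular expressions are the only parts that should need adjusting.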

Note: This also works with polars DataFrames.


21.4 Real-World Use Cases

21.4.1 1. Audit Trail for Compliance

Track all transformations for regulatory compliance and audit requirements.

# Enable lineage for all operations
df = add.transform('@calc', df, ..., lineage=True)
df = add.transform('@filter', df, ..., lineage=True)
df = add.to(df, bring_from=ref, ..., lineage=True)

# Generate audit report
audit_report = add.scan('@lineage', df)

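# Save audit report for compliance
# (UTF-8 keeps the report's box-drawing characters intact on every platform)
with open('audit_trail.txt', 'w', encoding='utf-8') as f:
    f.write(str(audit_report))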

21.4.2 2. Pipeline Documentation

Auto-generate documentation for data pipelines.

# Build pipeline with lineage
result = build_data_pipeline(raw_data, lineage=True)

# Generate documentation
docs = add.scan('@lineage', result)

# Include in README or documentation
print("## Data Pipeline\n")
print(docs)

21.4.3 3. Debugging Production Issues

When production data looks wrong, lineage helps identify the problem.

# Production pipeline with lineage
result = production_pipeline(data, lineage=True)

# Check lineage when issues arise
lineage = add.scan('@lineage', result)

# Identify problematic step
# Look for unexpected row changes or column additions

21.4.4 4. Collaboration and Code Review

Share transformation history with team members.

# Developer creates pipeline
result = create_analysis(data, lineage=True)

# Share lineage report with reviewer
lineage_report = add.scan('@lineage', result)

# Reviewer can see exact transformation steps
# No need to read through all the code

21.4.5 5. A/B Testing Validation

Verify that A/B test groups are processed identically.

# Process group A
group_a = process_data(data_a, lineage=True)
lineage_a = add.scan('@lineage', group_a)

# Process group B
group_b = process_data(data_b, lineage=True)
lineage_b = add.scan('@lineage', group_b)

# Compare lineage reports to ensure identical processing
# (except for input data differences)
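The comparison itself is left open above. One possible approach, sketched here with the standard library only, is to strip the parts of each report that legitimately differ between groups (timestamps and row counts) and diff what remains. It assumes the reports are strings in the layout shown in the earlier examples; normalize and lineage_diff are hypothetical helper names.

```python
import difflib
import re

TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
# Lines that depend on the input data rather than on the processing steps.
ROW_COUNTS = re.compile(r"^\s*(Rows:|DataFrame:).*$", re.MULTILINE)


def normalize(report: str) -> list[str]:
    """Mask timestamps and drop row-count lines so only the steps remain."""
    text = TIMESTAMP.sub("<time>", report)
    text = ROW_COUNTS.sub("", text)
    return [line.rstrip() for line in text.splitlines() if line.strip()]


def lineage_diff(report_a: str, report_b: str) -> list[str]:
    """Return unified-diff lines; an empty list means identical processing."""
    return list(
        difflib.unified_diff(
            normalize(report_a),
            normalize(report_b),
            fromfile="group_a",
            tofile="group_b",
            lineterm="",
        )
    )


# Sample reports: same steps, different timestamps and row counts.
report_a = """\
Step 1: add.transform - 2026-03-11T15:10:22
  Rows: 3 → 3 (no change)
  Parameters:
    where: "revenue > 200"
"""
report_b = """\
Step 1: add.transform - 2026-03-11T16:42:07
  Rows: 5 → 4 (1 row filtered out)
  Parameters:
    where: "revenue > 200"
"""

print(lineage_diff(report_a, report_b))  # []
```

A non-empty result lists the lines where the two groups were processed differently, for example a changed where condition.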

21.5 Advanced Patterns

21.5.1 Pattern 1: Conditional Lineage

Enable lineage only in development/debugging mode.

import os
import additory as add

# Enable lineage in development
DEBUG_MODE = os.getenv('DEBUG', 'false').lower() == 'true'

result = add.transform(
    '@calc',
    df,
    columns=['price', 'quantity'],
    expression='price * quantity',
    as_='total',
    lineage=DEBUG_MODE  # Only track in debug mode
)

if DEBUG_MODE:
    print(add.scan('@lineage', result))

21.5.2 Pattern 2: Lineage Checkpoints

Save lineage at key pipeline stages.

# Stage 1: Data cleaning
cleaned = clean_data(raw_data, lineage=True)
checkpoint_1 = add.scan('@lineage', cleaned)

# Stage 2: Feature engineering
features = engineer_features(cleaned, lineage=True)
checkpoint_2 = add.scan('@lineage', features)

# Stage 3: Final transformations
final = final_transforms(features, lineage=True)
checkpoint_3 = add.scan('@lineage', final)

# Save all checkpoints
save_checkpoints([checkpoint_1, checkpoint_2, checkpoint_3])
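save_checkpoints is not defined in this guide, so it is presumably a helper you write yourself. A minimal sketch of one way to do it follows, assuming each checkpoint can be converted to text with str(); the directory and file names are arbitrary choices made for illustration.

```python
import tempfile
from pathlib import Path


def save_checkpoints(checkpoints, directory="lineage_checkpoints"):
    """Write each report to <directory>/checkpoint_<n>.txt and return the paths."""
    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for number, report in enumerate(checkpoints, start=1):
        path = out_dir / f"checkpoint_{number}.txt"
        # str() guards against a non-string report object;
        # UTF-8 preserves the box-drawing characters used in the reports.
        path.write_text(str(report), encoding="utf-8")
        paths.append(path)
    return paths


# Usage with placeholder report text, written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    written = save_checkpoints(["stage 1 report", "stage 2 report"], directory=tmp)
    names = [path.name for path in written]
    first_text = written[0].read_text(encoding="utf-8")

print(names)  # ['checkpoint_1.txt', 'checkpoint_2.txt']
```

Because lineage does not survive the session, writing checkpoints to disk like this is what makes them available later.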

21.5.3 Pattern 3: Lineage-Based Testing

Use lineage to verify pipeline behavior.

def test_pipeline_steps():
    result = my_pipeline(test_data, lineage=True)
    lineage = add.scan('@lineage', result)
    
    # Verify expected number of steps
    assert 'Step 1' in lineage
    assert 'Step 2' in lineage
    assert 'Step 3' in lineage
    
    # Verify no unexpected data loss
    # (steps that drop rows are reported as "N rows filtered out")
    assert 'filtered out' not in lineage
    
    # Verify expected columns were added
    assert 'Columns Added: total' in lineage
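Substring checks such as 'Step 1' in lineage also match "Step 10", so a test may be sturdier if it extracts the step numbers and added columns first. The helpers below are a hypothetical sketch that assumes the report is a string in the layout shown in the examples, and that multiple added columns would be comma-separated (the examples only ever show one).

```python
import re


def step_numbers(report: str) -> list[int]:
    """Return the step numbers in the order they appear in the report."""
    return [int(n) for n in re.findall(r"^[ \t]*Step (\d+):", report, flags=re.MULTILINE)]


def columns_added(report: str) -> list[str]:
    """Return every column named on a 'Columns Added:' line, skipping 'none'."""
    added = []
    pattern = r"^[ \t]*Columns Added:[ \t]*(.+)$"
    for value in re.findall(pattern, report, flags=re.MULTILINE):
        if value.strip() != "none":
            # Assumption: several columns would be listed comma-separated.
            added.extend(name.strip() for name in value.split(","))
    return added


# Abbreviated copy of the Example 3 report, used as sample input.
sample_report = """\
Step 1: add.transform - 2026-03-11T15:20:18
  Rows: 4 → 4 (no change)
  Columns Added: none
Step 2: add.transform - 2026-03-11T15:20:18
  Rows: 4 → 3 (1 row filtered out)
  Columns Added: none
Step 3: add.transform - 2026-03-11T15:20:18
  Rows: 3 → 3 (no change)
  Columns Added: total
"""

assert step_numbers(sample_report) == [1, 2, 3]
assert columns_added(sample_report) == ["total"]
```

Asserting on the exact list of steps also catches an extra or missing step, which the substring checks cannot.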

21.6 Performance Considerations

21.6.1 Lineage Overhead

Lineage tracking has minimal performance impact:

  • Memory: ~1-5KB per operation (JSON metadata)
  • Time: <100ms per operation (metadata creation)
  • Computation: No impact on actual data processing
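These figures will vary with hardware and data, so it can be worth measuring the overhead in your own environment. The harness below is a generic sketch using only the standard library. fake_transform is a hypothetical stand-in so the example runs on its own; in practice you would time your real add.transform call with and without lineage=True.

```python
import statistics
import time


def median_seconds(fn, repeats=20):
    """Median wall-clock time of fn() over several runs, in seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


def fake_transform(lineage=False):
    """Hypothetical stand-in for a pipeline step; replace with your own call."""
    data = [price * 2 for price in range(10_000)]
    metadata = {"rows": len(data), "mode": "@calc"} if lineage else None
    return data, metadata


baseline = median_seconds(lambda: fake_transform(lineage=False))
tracked = median_seconds(lambda: fake_transform(lineage=True))
print(f"baseline: {baseline * 1000:.3f} ms, with lineage: {tracked * 1000:.3f} ms")
```

The median is used rather than the mean so that a single slow run does not distort the comparison.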

21.6.2 When to Disable Lineage

Consider disabling lineage in:

  • Production pipelines: Unless required for compliance
  • Large-scale batch processing: When processing millions of rows
  • Performance-critical code: When every millisecond counts
  • Simple one-off scripts: When transformation history isn’t needed

21.6.3 Lineage Best Practices

  1. Enable selectively: Use lineage where it adds value
  2. Clean up metadata: No manual cleanup is needed, because lineage is lost when the session ends (by design); save any report you want to keep
  3. Don’t over-track: Not every operation needs lineage
  4. Use for debugging: Enable when investigating issues
  5. Document with lineage: Include in code reviews and documentation

21.7 Troubleshooting

21.7.1 Issue: “No lineage metadata found”

Cause: The operations were called without lineage=True.

Solution: Add lineage=True to all operations in the pipeline.

21.7.2 Issue: Lineage lost after type conversion

Cause: The as_type parameter was used together with lineage=True.

Solution: Remove as_type parameter or disable lineage.

21.7.3 Issue: Lineage not showing all steps

Cause: Some operations didn’t have lineage=True.

Solution: Ensure all operations in the chain have lineage=True.
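One way to avoid forgetting the flag on a single call is to bind it once with functools.partial and use the bound function throughout the pipeline. The sketch below uses a hypothetical stand-in for add.transform so that it runs on its own; with the real library the equivalent would presumably be functools.partial(add.transform, lineage=True), since lineage is passed as a keyword argument in the examples above.

```python
import functools


def transform(mode, data, lineage=False, **params):
    """Stand-in for add.transform, used only so this sketch is runnable."""
    return {"mode": mode, "lineage": lineage, "params": params}


# Bind lineage=True once; every call through tracked_transform is tracked.
tracked_transform = functools.partial(transform, lineage=True)

result = tracked_transform(
    "@calc",
    [1, 2, 3],
    expression="price * quantity",
    as_="total",
)
print(result["lineage"])  # True

# A caller can still opt out explicitly for a single step.
untracked = tracked_transform("@filter", [1, 2, 3], lineage=False)
print(untracked["lineage"])  # False
```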


21.8 Next Steps

  • Page 1: Lineage tracking basics
  • Troubleshooting Guide: Common errors and solutions
  • API Reference: Complete parameter documentation