21 Advanced Lineage Workflows
Learn advanced lineage tracking patterns for complex data pipelines, debugging, and quality assurance.
21.1 Example 1: Multi-Step Transformation Lineage
Track complex transformation pipelines with multiple steps to understand the complete data journey.
import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey'],
    'price': [100, 200, 150],
    'cost': [60, 120, 90],
    'quantity': [2, 1, 3]
})

# Step 1: Calculate profit
result = add.transform(
    '@calc',
    df,
    columns=['price', 'cost'],
    expression='price - cost',
    as_='profit',
    lineage=True
)

# Step 2: Calculate revenue
result = add.transform(
    '@calc',
    result,
    columns=['price', 'quantity'],
    expression='price * quantity',
    as_='revenue',
    lineage=True
)

# Step 3: Filter high-value items
result = add.transform(
    '@filter',
    result,
    columns=['product', 'profit', 'revenue'],
    where='revenue > 200',
    lineage=True
)

# View complete lineage report
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:
═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════
DataFrame: 1 rows × 3 columns
Operations: 3 transformations applied
───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
Rows: 3 → 3 (no change)
Columns Added: profit
Parameters:
columns: ["price","cost"]
expression: ["price - cost"]
mode: "@calc"
───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
Rows: 3 → 3 (no change)
Columns Added: revenue
Parameters:
columns: ["price","quantity"]
expression: ["price * quantity"]
mode: "@calc"
───────────────────────────────────────────────────────────────
Step 3: add.transform - 2026-03-11T15:10:22
───────────────────────────────────────────────────────────────
Rows: 3 → 1 (2 rows filtered out)
Columns Added: none
Parameters:
columns: ["product","profit","revenue"]
where: "revenue > 200"
mode: "@filter"
Use Cases:
- Pipeline Debugging: Identify which step introduced an issue
- Performance Analysis: See which steps changed row counts
- Documentation: Auto-generate pipeline documentation
- Validation: Verify each transformation step
Note: This also works with polars DataFrames.
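The row-count lines in a report like the one above can also be checked programmatically. The following is a minimal sketch using only the standard library. It assumes the value returned by `add.scan('@lineage', ...)` is, or can be converted with `str()` to, text in the layout shown above. The helper names `step_row_counts` and `steps_that_dropped_rows` and the abbreviated sample string are illustrative and not part of additory.

```python
import re

# Abbreviated sample in the layout shown above (assumption: the real report
# is plain text with the same "Step N:" and "Rows: a → b" lines).
SAMPLE_REPORT = """\
Step 1: add.transform - 2026-03-11T15:10:22
Rows: 3 → 3 (no change)
Columns Added: profit
Step 2: add.transform - 2026-03-11T15:10:22
Rows: 3 → 3 (no change)
Columns Added: revenue
Step 3: add.transform - 2026-03-11T15:10:22
Rows: 3 → 1 (2 rows filtered out)
Columns Added: none
"""

STEP_RE = re.compile(r"^\s*Step (\d+):", re.MULTILINE)
ROWS_RE = re.compile(r"^\s*Rows: (\d+) → (\d+)", re.MULTILINE)


def step_row_counts(report: str) -> list[tuple[int, int, int]]:
    """Return (step, rows_before, rows_after) for each step in the report.

    Assumes every step header is followed by exactly one "Rows:" line.
    """
    steps = [int(m.group(1)) for m in STEP_RE.finditer(report)]
    rows = [(int(m.group(1)), int(m.group(2))) for m in ROWS_RE.finditer(report)]
    return [(step, before, after) for step, (before, after) in zip(steps, rows)]


def steps_that_dropped_rows(report: str) -> list[int]:
    """Return the step numbers whose output has fewer rows than their input."""
    return [step for step, before, after in step_row_counts(report) if after < before]
```

For the sample text, `step_row_counts(SAMPLE_REPORT)` gives `[(1, 3, 3), (2, 3, 3), (3, 3, 1)]` and `steps_that_dropped_rows(SAMPLE_REPORT)` gives `[3]`, which points at the filter step.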
21.2 Example 2: Sorting and Calculation Lineage
Track sorting operations combined with calculations to understand data ordering and transformations.
import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey'],
    'price': [100, 200, 150],
    'sales': [50, 150, 75]
})

# Step 1: Calculate revenue
result = add.transform(
    '@calc',
    df,
    columns=['price', 'sales'],
    expression='price * sales',
    as_='revenue',
    lineage=True
)

# Step 2: Sort by revenue (descending)
result = add.transform(
    '@sort',
    result,
    columns=['product', 'revenue'],
    by='revenue',
    order='desc',
    lineage=True
)

# View lineage report
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:
═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════
DataFrame: 3 rows × 2 columns
Operations: 2 transformations applied
───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:15:45
───────────────────────────────────────────────────────────────
Rows: 3 → 3 (no change)
Columns Added: revenue
Parameters:
columns: ["price","sales"]
expression: ["price * sales"]
mode: "@calc"
───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:15:45
───────────────────────────────────────────────────────────────
Rows: 3 → 3 (no change)
Columns Added: none
Parameters:
columns: ["product","revenue"]
by: "revenue"
order: "desc"
mode: "@sort"
Key Insights:
- Sorting operations are tracked with their parameters
- You can see the sort column and order
- Row count remains the same (sorting doesn’t filter)
- Useful for understanding data ordering logic
Note: This also works with polars DataFrames.
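Because the parameter values in the report above look like JSON literals, they can be read back into Python objects. The sketch below is one possible way to do that with the standard library, assuming the report is plain text in the layout shown and that each parameter sits on its own `key: value` line. The function name `step_parameters` and the abbreviated sample string are hypothetical.

```python
import json
import re

# Abbreviated sample in the layout shown above.
SAMPLE_REPORT = """\
Step 1: add.transform - 2026-03-11T15:15:45
Rows: 3 → 3 (no change)
Columns Added: revenue
Parameters:
columns: ["price","sales"]
expression: ["price * sales"]
mode: "@calc"
───────────────
Step 2: add.transform - 2026-03-11T15:15:45
Rows: 3 → 3 (no change)
Columns Added: none
Parameters:
columns: ["product","revenue"]
by: "revenue"
order: "desc"
mode: "@sort"
"""

STEP_RE = re.compile(r"^Step (\d+):")
PARAM_RE = re.compile(r"^(\w+): (.+)$")


def step_parameters(report: str) -> dict[int, dict[str, object]]:
    """Map each step number to its parameters, decoding JSON-like values."""
    params: dict[int, dict[str, object]] = {}
    current = None      # step number currently being read
    in_params = False   # True while inside a "Parameters:" block
    for raw in report.splitlines():
        line = raw.strip()
        header = STEP_RE.match(line)
        if header:
            current = int(header.group(1))
            params[current] = {}
            in_params = False
        elif line == "Parameters:":
            in_params = current is not None
        elif in_params:
            entry = PARAM_RE.match(line)
            if entry is None:
                in_params = False  # a separator or blank line ends the block
                continue
            key, value = entry.groups()
            try:
                params[current][key] = json.loads(value)
            except json.JSONDecodeError:
                params[current][key] = value  # keep the raw text if it is not JSON
    return params
```

With the sample text, `step_parameters(SAMPLE_REPORT)[2]` contains `by` equal to `"revenue"` and `order` equal to `"desc"`, so a script can confirm the sort column and direction without reading the pipeline code.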
21.3 Example 3: Lineage for Debugging Data Quality
Use lineage tracking to debug data quality issues and understand data cleaning steps.
import pandas as pd
import additory as add

df = pd.DataFrame({
    'product': ['Widget', 'Gadget', 'Doohickey', 'Thingamajig'],
    'price': [100, 200, None, 150],
    'quantity': [2, 1, 3, 0]
})

# Step 1: Impute missing values
result = add.transform(
    '@deduce',
    df,
    columns=['price'],
    strategy={'method': 'mean'},
    lineage=True
)

# Step 2: Filter out zero quantity
result = add.transform(
    '@filter',
    result,
    columns=['product', 'price', 'quantity'],
    where='quantity > 0',
    lineage=True
)

# Step 3: Calculate total
result = add.transform(
    '@calc',
    result,
    columns=['price', 'quantity'],
    expression='price * quantity',
    as_='total',
    lineage=True
)

# View lineage report to understand data cleaning steps
lineage_report = add.scan('@lineage', result)
print(lineage_report)

Output:
═══════════════════════════════════════════════════════════════
LINEAGE REPORT
═══════════════════════════════════════════════════════════════
DataFrame: 3 rows × 4 columns
Operations: 3 transformations applied
───────────────────────────────────────────────────────────────
Step 1: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
Rows: 4 → 4 (no change)
Columns Added: none
Parameters:
columns: ["price"]
strategy: {"method":"mean"}
mode: "@deduce"
───────────────────────────────────────────────────────────────
Step 2: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
Rows: 4 → 3 (1 row filtered out)
Columns Added: none
Parameters:
columns: ["product","price","quantity"]
where: "quantity > 0"
mode: "@filter"
───────────────────────────────────────────────────────────────
Step 3: add.transform - 2026-03-11T15:20:18
───────────────────────────────────────────────────────────────
Rows: 3 → 3 (no change)
Columns Added: total
Parameters:
columns: ["price","quantity"]
expression: ["price * quantity"]
mode: "@calc"
Debugging Workflow:
1. Identify the issue: Final data has 3 rows instead of 4
2. Check lineage: Step 2 filtered out 1 row
3. Understand why: Filter condition was quantity > 0
4. Verify imputation: Step 1 handled missing price values
5. Confirm calculation: Step 3 calculated totals correctly
Benefits:
- Quick identification of data loss points
- Clear understanding of data cleaning logic
- Easy verification of transformation correctness
- Reproducible debugging process
Note: This also works with polars DataFrames.
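To cross-check the row counts in this example by hand, the three steps can be replayed in plain Python on the same input. This is only a sketch of the arithmetic, not of how additory works internally. In particular, it assumes that `'@deduce'` with `{'method': 'mean'}` fills a missing value with the mean of the non-missing values in that column.

```python
from statistics import mean

rows = [
    {"product": "Widget", "price": 100, "quantity": 2},
    {"product": "Gadget", "price": 200, "quantity": 1},
    {"product": "Doohickey", "price": None, "quantity": 3},
    {"product": "Thingamajig", "price": 150, "quantity": 0},
]

# Step 1 (assumed behaviour): replace missing prices with the mean of known prices.
known_prices = [r["price"] for r in rows if r["price"] is not None]
fill_value = mean(known_prices)  # (100 + 200 + 150) / 3 = 150
imputed = [
    {**r, "price": fill_value if r["price"] is None else r["price"]} for r in rows
]

# Step 2: keep only rows with a positive quantity (4 rows -> 3 rows).
kept = [r for r in imputed if r["quantity"] > 0]

# Step 3: total = price * quantity for each remaining row.
totals = [{**r, "total": r["price"] * r["quantity"]} for r in kept]

for r in totals:
    print(r["product"], r["total"])
```

Under that assumption the row counts are 4, 3, and 3 after the three steps, matching the report, and the totals are 200, 200, and 450.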
21.4 Real-World Use Cases
21.4.1 1. Audit Trail for Compliance
Track all transformations for regulatory compliance and audit requirements.
# Enable lineage for all operations
df = add.transform('@calc', df, ..., lineage=True)
df = add.transform('@filter', df, ..., lineage=True)
df = add.to(df, bring_from=ref, ..., lineage=True)
# Generate audit report
audit_report = add.scan('@lineage', df)
# Save audit report for compliance
with open('audit_trail.txt', 'w', encoding='utf-8') as f:  # report contains non-ASCII box-drawing characters
    f.write(audit_report)
21.4.2 2. Pipeline Documentation
Auto-generate documentation for data pipelines.
# Build pipeline with lineage
result = build_data_pipeline(raw_data, lineage=True)
# Generate documentation
docs = add.scan('@lineage', result)
# Include in README or documentation
print("## Data Pipeline\n")
print(docs)
21.4.3 3. Debugging Production Issues
When production data looks wrong, lineage helps identify the problem.
# Production pipeline with lineage
result = production_pipeline(data, lineage=True)
# Check lineage when issues arise
lineage = add.scan('@lineage', result)
# Identify problematic step
# Look for unexpected row changes or column additions
21.4.4 4. Collaboration and Code Review
Share transformation history with team members.
# Developer creates pipeline
result = create_analysis(data, lineage=True)
# Share lineage report with reviewer
lineage_report = add.scan('@lineage', result)
# Reviewer can see exact transformation steps
# No need to read through all the code
21.4.5 5. A/B Testing Validation
Verify that A/B test groups are processed identically.
# Process group A
group_a = process_data(data_a, lineage=True)
lineage_a = add.scan('@lineage', group_a)
# Process group B
group_b = process_data(data_b, lineage=True)
lineage_b = add.scan('@lineage', group_b)
# Compare lineage reports to ensure identical processing
# (except for input data differences)
21.5 Advanced Patterns
21.5.1 Pattern 1: Conditional Lineage
Enable lineage only in development/debugging mode.
import os

# Enable lineage in development
DEBUG_MODE = os.getenv('DEBUG', 'false').lower() == 'true'

result = add.transform(
    '@calc',
    df,
    expression='price * quantity',
    as_='total',
    lineage=DEBUG_MODE  # Only track in debug mode
)

if DEBUG_MODE:
    print(add.scan('@lineage', result))
21.5.2 Pattern 2: Lineage Checkpoints
Save lineage at key pipeline stages.
# Stage 1: Data cleaning
cleaned = clean_data(raw_data, lineage=True)
checkpoint_1 = add.scan('@lineage', cleaned)
# Stage 2: Feature engineering
features = engineer_features(cleaned, lineage=True)
checkpoint_2 = add.scan('@lineage', features)
# Stage 3: Final transformations
final = final_transforms(features, lineage=True)
checkpoint_3 = add.scan('@lineage', final)
# Save all checkpoints
save_checkpoints([checkpoint_1, checkpoint_2, checkpoint_3])
21.5.3 Pattern 3: Lineage-Based Testing
Use lineage to verify pipeline behavior.
def test_pipeline_steps():
    result = my_pipeline(test_data, lineage=True)
    lineage = add.scan('@lineage', result)

    # Verify expected number of steps
    assert 'Step 1' in lineage
    assert 'Step 2' in lineage
    assert 'Step 3' in lineage

    # Verify no unexpected data loss
    # (coarse check: it passes as soon as any one step reports no row change)
    assert '0 rows filtered out' in lineage or 'no change' in lineage

    # Verify expected columns were added
    assert 'Columns Added: total' in lineage
21.6 Performance Considerations
21.6.1 Lineage Overhead
Lineage tracking has minimal performance impact:
- Memory: ~1-5KB per operation (JSON metadata)
- Time: <100ms per operation (metadata creation)
- Computation: No impact on actual data processing
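The figures above are easiest to trust once measured on your own data. The sketch below is a generic timing harness built on `time.perf_counter`. It is not an additory feature, and the helper names `median_seconds` and `lineage_overhead` are hypothetical. It takes two zero-argument callables, which in practice would wrap the same `add.transform` call once with `lineage=True` and once with `lineage=False`.

```python
import statistics
import time
from typing import Callable


def median_seconds(fn: Callable[[], object], repeats: int = 20) -> float:
    """Call fn `repeats` times and return the median wall-clock time in seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


def lineage_overhead(
    with_lineage: Callable[[], object],
    without_lineage: Callable[[], object],
    repeats: int = 20,
) -> float:
    """Median extra seconds per call when lineage tracking is enabled.

    The result can be slightly negative when the difference is within noise.
    """
    return median_seconds(with_lineage, repeats) - median_seconds(without_lineage, repeats)
```

The median is used instead of the mean so that a single slow run, for example a first call that warms caches, does not dominate the estimate.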
21.6.2 When to Disable Lineage
Consider disabling lineage in:
- Production pipelines: Unless required for compliance
- Large-scale batch processing: When processing millions of rows
- Performance-critical code: When every millisecond counts
- Simple one-off scripts: When transformation history isn’t needed
21.6.3 Lineage Best Practices
- Enable selectively: Use lineage where it adds value
- Persist what you need: Lineage metadata is discarded when the session ends (by design), so save the report if it must outlive the session
- Don’t over-track: Not every operation needs lineage
- Use for debugging: Enable when investigating issues
- Document with lineage: Include in code reviews and documentation
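Since lineage does not survive the session, a report that is needed later has to be written out explicitly. One possible helper is sketched below using only the standard library. The name `save_report`, the file naming scheme, and the assumption that the report is available as a string are all illustrative.

```python
from datetime import datetime, timezone
from pathlib import Path


def save_report(report: str, directory: str, label: str = "lineage") -> Path:
    """Write the report text to <directory>/<label>_<UTC timestamp>.txt and return the path."""
    target_dir = Path(directory)
    target_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = target_dir / f"{label}_{stamp}.txt"
    # UTF-8 is set explicitly because the report contains box-drawing characters.
    path.write_text(report, encoding="utf-8")
    return path
```

A call such as `save_report(str(add.scan('@lineage', result)), 'lineage_reports')` would then keep one timestamped file per run. Two saves with the same label within the same second overwrite each other.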
21.7 Troubleshooting
21.7.1 Issue: “No lineage metadata found”
Cause: Forgot to add lineage=True to operations.
Solution: Add lineage=True to all operations in the pipeline.
21.7.2 Issue: Lineage lost after type conversion
Cause: Used as_type parameter with lineage=True.
Solution: Remove as_type parameter or disable lineage.
21.7.3 Issue: Lineage not showing all steps
Cause: Some operations didn’t have lineage=True.
Solution: Ensure all operations in the chain have lineage=True.
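A quick way to notice a missing `lineage=True` is to compare the number of steps in the report with the number of operations the pipeline is known to perform. The sketch below assumes the report is plain text with one `Step N:` header per tracked operation, as in the examples earlier in this chapter. The function name `missing_steps` is hypothetical.

```python
import re

STEP_HEADER_RE = re.compile(r"^\s*Step \d+:", re.MULTILINE)


def missing_steps(report: str, expected_steps: int) -> int:
    """Return how many expected operations have no step in the report."""
    recorded = len(STEP_HEADER_RE.findall(report))
    return max(expected_steps - recorded, 0)


# A pipeline with three operations where only two were tracked.
sample = (
    "Step 1: add.transform - 2026-03-11T15:10:22\n"
    "Step 2: add.transform - 2026-03-11T15:10:22\n"
)
print(missing_steps(sample, expected_steps=3))
```

A non-zero result means at least one operation in the chain ran without lineage tracking. It does not say which one, so the calls still need to be checked individually.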
21.8 Next Steps
- Page 1: Lineage tracking basics
- Troubleshooting Guide: Common errors and solutions
- API Reference: Complete parameter documentation