19  Real-World Data Quality Workflows

Practical workflows combining @analyze mode with data quality assessment, profiling, and validation.

19.1 Example 1: Data Quality Assessment Pipeline

Build a comprehensive data quality assessment pipeline to identify and report quality issues.

import pandas as pd
import additory as add

# Simulate a dataset with quality issues
df = pd.DataFrame({
    'customer_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    'name': ['Alice', 'Bob', None, 'David', 'Eve', 'Frank', None, 'Helen', 'Ivan', 'Jane'],
    'email': ['alice@email.com', 'bob@email.com', 'charlie@email.com', None, 
              'eve@email.com', None, 'grace@email.com', 'helen@email.com', 
              None, 'jane@email.com'],
    'age': [25, 30, 35, 40, None, 50, 55, None, 65, 70],
    'purchase_count': [5, 10, 15, 20, 25, 30, 35, 40, 45, 50]
})

# Step 1: Analyze data quality
quality_report = add.scan(
    '@analyze',
    df,
    as_type='dataframe'
)

print("Data Quality Report:")
print(quality_report)

# Step 2: Identify columns with quality issues (>10% nulls)
high_null_cols = quality_report[quality_report['null_pct'] > 10.0]

print("\nColumns with >10% null values:")
print(high_null_cols[['column', 'null_count', 'null_pct']])

# Step 3: Get summary statistics
summary = {
    'total_columns': len(quality_report),
    'columns_with_nulls': len(quality_report[quality_report['null_count'] > 0]),
    'high_null_columns': len(high_null_cols),
    'avg_null_pct': quality_report['null_pct'].mean()
}

print("\nQuality Summary:")
for key, value in summary.items():
    print(f"  {key}: {value}")

Output:

Data Quality Report:
           column         dtype  count  null_count  null_pct  unique   mean        std   min   max
0     customer_id         Int64     10           0       0.0      10   5.50   3.027650   1.0  10.0
1            name  String(Utf8)     10           2      20.0       8    NaN        NaN   NaN   NaN
2           email  String(Utf8)     10           3      30.0       7    NaN        NaN   NaN   NaN
3             age         Int64     10           2      20.0       8  46.25  16.420806  25.0  70.0
4  purchase_count         Int64     10           0       0.0      10  27.50  15.138252   5.0  50.0

Columns with >10% null values:
    column  null_count  null_pct
1     name           2      20.0
2    email           3      30.0
3      age           2      20.0

Quality Summary:
  total_columns: 5
  columns_with_nulls: 3
  high_null_columns: 3
  avg_null_pct: 14.0

Key Workflow Steps:

  1. Run @analyze to get comprehensive quality metrics
  2. Filter results to identify problematic columns
  3. Generate summary statistics for reporting
  4. Use insights to prioritize data cleaning efforts

Quality Thresholds:

  • null_pct > 10%: High null rate, requires attention
  • null_pct > 50%: Critical quality issue, consider dropping column
  • unique == count: Potential ID column or high cardinality
  • unique == 1: Constant column, no information value

Note: This also works with polars DataFrames.


19.2 Example 2: Column Profiling Workflow

Profile columns by data type to understand distributions and cardinality patterns.

import pandas as pd
import additory as add

# Sales data with various data types
df = pd.DataFrame({
    'transaction_id': range(1, 101),
    'amount': [100 + i * 10 for i in range(100)],
    'category': ['Electronics'] * 30 + ['Clothing'] * 40 + ['Food'] * 30,
    'payment_method': ['Credit'] * 50 + ['Debit'] * 30 + ['Cash'] * 20,
    'customer_age': [25 + (i % 50) for i in range(100)]
})

# Profile the data
profile = add.scan(
    '@analyze',
    df,
    as_type='dataframe'
)

print("Column Profile:")
print(profile)

# Extract insights for each column type
numeric_cols = profile[profile['dtype'].str.contains('Int|Float', na=False)]
categorical_cols = profile[profile['dtype'].str.contains('String', na=False)]

print("\nNumeric Columns:")
print(numeric_cols[['column', 'mean', 'std', 'min', 'max']])

print("\nCategorical Columns:")
print(categorical_cols[['column', 'unique', 'count']])

# Get cardinality insights
insights = {
    'numeric_columns': len(numeric_cols),
    'categorical_columns': len(categorical_cols),
    'high_cardinality': len(profile[profile['unique'] > 50]),
    'low_cardinality': len(profile[profile['unique'] <= 10])
}

print("\nCardinality Insights:")
for key, value in insights.items():
    print(f"  {key}: {value}")

Output:

Column Profile:
           column         dtype  count  null_count  null_pct  unique   mean         std    min     max
0  transaction_id         Int64    100           0       0.0     100   50.5   29.011492    1.0   100.0
1          amount         Int64    100           0       0.0     100  595.0  290.114920  100.0  1090.0
2        category  String(Utf8)    100           0       0.0       3    NaN         NaN    NaN     NaN
3  payment_method  String(Utf8)    100           0       0.0       3    NaN         NaN    NaN     NaN
4    customer_age         Int64    100           0       0.0      50   49.5   14.503570   25.0    74.0

Numeric Columns:
           column   mean         std    min     max
0  transaction_id   50.5   29.011492    1.0   100.0
1          amount  595.0  290.114920  100.0  1090.0
4    customer_age   49.5   14.503570   25.0    74.0

Categorical Columns:
          column  unique  count
2        category       3    100
3  payment_method       3    100

Cardinality Insights:
  numeric_columns: 3
  categorical_columns: 2
  high_cardinality: 2
  low_cardinality: 2

Profiling Insights:

  • High Cardinality (unique > 50): Likely ID columns or continuous numeric data
  • Low Cardinality (unique ≤ 10): Good candidates for categorical encoding
  • Numeric Statistics: Compare mean/std with min/max to spot skew and possible outliers
  • Categorical Distribution: Use the unique count to choose an encoding strategy

Use Cases:

  • Feature engineering: Identify columns for one-hot encoding vs label encoding
  • Data modeling: Understand which columns need normalization
  • Storage optimization: High cardinality strings may benefit from dictionary encoding
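As a rough illustration of the feature-engineering use case, the sketch below maps each string column in a profile to a suggested encoding based on its unique count. It works on plain dict rows (for a pandas report, profile.to_dict('records') produces them). The function name suggest_encoding, the cutoff of 10, and the sku column are assumptions made for illustration; none of them are part of additory.

```python
def suggest_encoding(rows, one_hot_max=10):
    """Map each string column to a suggested encoding based on cardinality.

    rows: list of dicts with 'column', 'dtype' and 'unique' keys,
          mirroring the @analyze report columns.
    one_hot_max: assumed cutoff; string columns with at most this many
                 unique values are suggested for one-hot encoding.
    """
    suggestions = {}
    for row in rows:
        if 'String' not in row['dtype']:
            continue  # only string columns need a categorical encoding
        if row['unique'] <= one_hot_max:
            suggestions[row['column']] = 'one-hot'
        else:
            suggestions[row['column']] = 'label or dictionary'
    return suggestions


# Rows copied from the profile output, plus one hypothetical
# high-cardinality string column (sku) for contrast.
profile_rows = [
    {'column': 'amount', 'dtype': 'Int64', 'unique': 100},
    {'column': 'category', 'dtype': 'String(Utf8)', 'unique': 3},
    {'column': 'payment_method', 'dtype': 'String(Utf8)', 'unique': 3},
    {'column': 'sku', 'dtype': 'String(Utf8)', 'unique': 80},
]

print(suggest_encoding(profile_rows))
```

The cutoff is a starting point only; the right boundary depends on the model and on how many extra columns one-hot encoding is allowed to create.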

Note: This also works with polars DataFrames.


19.3 Example 3: Automated Data Validation

Build automated validation checks to ensure data quality before processing.

import pandas as pd
import additory as add

# Dataset that should pass validation
df_valid = pd.DataFrame({
    'product_id': [1, 2, 3, 4, 5],
    'price': [10.99, 20.99, 30.99, 40.99, 50.99],
    'stock': [100, 200, 150, 175, 225]
})

# Dataset with validation issues
df_invalid = pd.DataFrame({
    'product_id': [1, 2, None, 4, 5],
    'price': [10.99, None, 30.99, None, 50.99],
    'stock': [100, 200, 150, 175, None]
})

# Validate both datasets
report_valid = add.scan('@analyze', df_valid, as_type='dataframe')
report_invalid = add.scan('@analyze', df_invalid, as_type='dataframe')

# Check validation rules
valid_passed = (report_valid['null_count'] == 0).all()
invalid_passed = (report_invalid['null_count'] == 0).all()

print("Valid Dataset:")
print(f"  Validation Passed: {valid_passed}")
print(f"  Null Values: {report_valid['null_count'].sum()}")

print("\nInvalid Dataset:")
print(f"  Validation Passed: {invalid_passed}")
print(f"  Null Values: {report_invalid['null_count'].sum()}")

# Count issues in invalid dataset
if not invalid_passed:
    issues = {
        'columns_with_nulls': len(report_invalid[report_invalid['null_count'] > 0]),
        'total_null_values': report_invalid['null_count'].sum(),
        'validation_passed': invalid_passed
    }
    
    print("\nValidation Issues:")
    for key, value in issues.items():
        print(f"  {key}: {value}")
    
    print("\nColumns with Issues:")
    print(report_invalid[report_invalid['null_count'] > 0][['column', 'null_count', 'null_pct']])

Output:

Valid Dataset:
  Validation Passed: True
  Null Values: 0

Invalid Dataset:
  Validation Passed: False
  Null Values: 4

Validation Issues:
  columns_with_nulls: 3
  total_null_values: 4
  validation_passed: False

Columns with Issues:
       column  null_count  null_pct
0  product_id           1      20.0
1       price           2      40.0
2       stock           1      20.0

Validation Rules:

  • No Nulls: All columns must have zero null values
  • Data Types: Verify expected data types match actual types
  • Value Ranges: Check min/max values are within expected bounds
  • Cardinality: Ensure unique counts match expectations
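The example above only checks the first rule. A minimal sketch of all four rules follows, assuming the report has been converted to dict rows (for a pandas report, report_invalid.to_dict('records')). The check_report function and the shape of the expectations spec are hypothetical, and the 'Float64' dtype string for price is an assumption about how the report labels float columns.

```python
def check_report(rows, expectations):
    """Return a list of issue strings; an empty list means the data passed.

    rows: list of dicts with the @analyze report columns.
    expectations: hypothetical spec keyed by column name, with optional
                  'dtype', 'min', 'max' and 'unique' entries.
    """
    issues = []
    by_name = {row['column']: row for row in rows}
    for name, spec in expectations.items():
        row = by_name.get(name)
        if row is None:
            issues.append(f"{name}: missing from report")
            continue
        # Rule 1: no nulls
        if row['null_count'] > 0:
            issues.append(f"{name}: {row['null_count']} null value(s)")
        # Rule 2: data type matches
        if 'dtype' in spec and row['dtype'] != spec['dtype']:
            issues.append(f"{name}: dtype {row['dtype']}, expected {spec['dtype']}")
        # Rule 3: value range (NaN min/max never trip these comparisons)
        if 'min' in spec and row['min'] < spec['min']:
            issues.append(f"{name}: min {row['min']} below {spec['min']}")
        if 'max' in spec and row['max'] > spec['max']:
            issues.append(f"{name}: max {row['max']} above {spec['max']}")
        # Rule 4: cardinality
        if 'unique' in spec and row['unique'] != spec['unique']:
            issues.append(f"{name}: {row['unique']} unique values, expected {spec['unique']}")
    return issues


# Illustrative rows modeled on the invalid dataset (dtype strings assumed).
report_rows = [
    {'column': 'product_id', 'dtype': 'Int64', 'null_count': 1,
     'unique': 4, 'min': 1.0, 'max': 5.0},
    {'column': 'price', 'dtype': 'Float64', 'null_count': 2,
     'unique': 3, 'min': 10.99, 'max': 50.99},
]
expectations = {
    'product_id': {'dtype': 'Int64', 'unique': 5},
    'price': {'dtype': 'Float64', 'min': 0.0, 'max': 40.0},
}

for issue in check_report(report_rows, expectations):
    print(issue)
```

Returning a list of messages rather than a single boolean keeps the pass/fail decision (an empty list) and the detailed issue report in one place.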

Automation Workflow:

  1. Run @analyze on incoming data
  2. Apply validation rules to the analysis report
  3. Generate pass/fail status and detailed issue report
  4. Block processing if validation fails
  5. Log issues for data quality monitoring

Integration Points:

  • ETL pipelines: Validate before loading to warehouse
  • API endpoints: Validate incoming data payloads
  • Batch processing: Pre-flight checks before expensive operations
  • Data monitoring: Track quality metrics over time

Note: This also works with polars DataFrames.


19.4 Workflow Patterns

19.4.1 Pattern 1: Quality Gating

# Run analysis
report = add.scan('@analyze', df, as_type='dataframe')

# Define quality gates
max_null_pct = 10.0
min_unique_ratio = 0.1

# Check gates
quality_passed = (
    (report['null_pct'] <= max_null_pct).all() and
    (report['unique'] / report['count'] >= min_unique_ratio).all()
)

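if quality_passed:
    # Proceed with processing (process_data is a placeholder for your own function)
    process_data(df)
else:
    # Reject and log issues (log_quality_issues is likewise a placeholder)
    log_quality_issues(report)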
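When a gate fails, it usually helps to know which column failed which gate. The sketch below is one way to report that, assuming the report is available as dict rows (report.to_dict('records') for a pandas report). The failed_gates helper is hypothetical; the sample rows are taken from the example reports earlier on this page.

```python
def failed_gates(rows, max_null_pct=10.0, min_unique_ratio=0.1):
    """Return {column: [failed gate names]} for columns that fail a gate."""
    failures = {}
    for row in rows:
        failed = []
        if row['null_pct'] > max_null_pct:
            failed.append('null_pct')
        # Skip the ratio check for empty data, where count is 0
        if row['count'] > 0 and row['unique'] / row['count'] < min_unique_ratio:
            failed.append('unique_ratio')
        if failed:
            failures[row['column']] = failed
    return failures


rows = [
    {'column': 'customer_id', 'count': 10, 'null_pct': 0.0, 'unique': 10},
    {'column': 'email', 'count': 10, 'null_pct': 30.0, 'unique': 7},
    {'column': 'category', 'count': 100, 'null_pct': 0.0, 'unique': 3},
]

print(failed_gates(rows))
```

Note that the unique-ratio gate rejects legitimate low-cardinality columns such as category (3 / 100 = 0.03), so it may be worth applying it only to columns that are expected to be close to unique.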

19.4.2 Pattern 2: Incremental Profiling

# Profile baseline data
baseline = add.scan('@analyze', df_baseline, as_type='dataframe')

# Profile new data
current = add.scan('@analyze', df_current, as_type='dataframe')

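# Compare column means. Rows are matched by index, so both reports
# must list the same columns in the same order.
threshold = 5.0  # example value: largest acceptable absolute shift in a mean
drift_detected = (
    (baseline['mean'] - current['mean']).abs() > threshold
).any()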

if drift_detected:
    alert_data_drift()
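The comparison above matches report rows by position. A sketch of a variant that matches by column name and reports which columns drifted is shown below; it assumes dict rows (for example from baseline.to_dict('records')) and skips columns whose mean is missing or NaN. The mean_drift helper and the current-data values are hypothetical.

```python
import math


def mean_drift(baseline_rows, current_rows, threshold):
    """Return {column: absolute shift in mean} for shifts above threshold."""
    current_means = {row['column']: row['mean'] for row in current_rows}
    drifted = {}
    for row in baseline_rows:
        name = row['column']
        base = row['mean']
        cur = current_means.get(name)
        if base is None or cur is None or math.isnan(base) or math.isnan(cur):
            continue  # column absent from current data, or not numeric
        shift = abs(base - cur)
        if shift > threshold:
            drifted[name] = shift
    return drifted


baseline_rows = [
    {'column': 'amount', 'mean': 595.0},
    {'column': 'category', 'mean': float('nan')},
    {'column': 'customer_age', 'mean': 49.5},
]
# Hypothetical profile of a newer batch, listed in a different order
current_rows = [
    {'column': 'customer_age', 'mean': 50.0},
    {'column': 'amount', 'mean': 640.0},
    {'column': 'category', 'mean': float('nan')},
]

print(mean_drift(baseline_rows, current_rows, threshold=10.0))
```

An absolute threshold treats all columns alike; for columns on very different scales, a per-column or relative threshold may be a better fit.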

19.4.3 Pattern 3: Multi-Stage Validation

# Stage 1: Schema validation
# (the validate_* functions below are placeholders for your own checks)
report = add.scan('@analyze', df, as_type='dataframe')
schema_valid = validate_schema(report)

# Stage 2: Quality validation
if schema_valid:
    quality_valid = validate_quality(report)
    
    # Stage 3: Business rules
    if quality_valid:
        business_valid = validate_business_rules(df, report)
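The nesting above grows with every added stage. One possible alternative, sketched here under the assumption that each stage can be written as a zero-argument callable returning True or False, is a small runner that stops at the first failure and names it. The run_stages helper, the three stage functions, and the sample rows (including the 'Float64' dtype string) are all hypothetical stand-ins.

```python
def run_stages(stages):
    """Run (name, check) pairs in order and stop at the first failure.

    Returns (passed, failed_stage_name); failed_stage_name is None
    when every stage passes.
    """
    for name, check in stages:
        if not check():
            return False, name
    return True, None


# Illustrative report rows and expectations (dtype strings assumed)
report_rows = [
    {'column': 'product_id', 'dtype': 'Int64', 'null_count': 0},
    {'column': 'price', 'dtype': 'Float64', 'null_count': 2},
]
expected_dtypes = {'product_id': 'Int64', 'price': 'Float64'}


def schema_ok():
    actual = {row['column']: row['dtype'] for row in report_rows}
    return actual == expected_dtypes


def quality_ok():
    return all(row['null_count'] == 0 for row in report_rows)


def business_ok():
    return True  # placeholder for domain-specific rules


result = run_stages([
    ('schema', schema_ok),
    ('quality', quality_ok),
    ('business', business_ok),
])
print(result)
```

Later stages never run once an earlier one fails, which matches the intent of the nested version while keeping the stage order in a single list.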

19.5 Parameters

19.5.1 Required Parameters

  • mode: '@analyze' for statistical profiling
  • df: Input DataFrame (pandas or polars)

19.5.2 Optional Parameters

  • columns: Column filter (not yet implemented)
  • where: SQL-like filter condition (not yet implemented)
  • rows: Row range specifications (not yet implemented)
  • as_type: Output format ('dataframe', 'dict', 'text')

19.5.3 Positional Parameters

# mode and df are passed positionally here; as_type is passed by keyword:
report = add.scan('@analyze', df, as_type='dataframe')

19.6 Best Practices

19.6.1 Performance

  • Run @analyze once and reuse results for multiple checks
  • Use as_type='dataframe' for programmatic analysis
  • Use as_type='text' for logging and display only

19.6.2 Quality Thresholds

  • Critical: null_pct > 50%, unique == 1 (constant)
  • Warning: null_pct > 10%, unique == count (all unique)
  • Info: null_pct > 0%, unique < 10 (low cardinality)
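The three tiers above can be applied per column with a small classifier. The following is a sketch only: classify_column is a hypothetical helper, it expects one report row as a dict (report.to_dict('records') yields such rows for a pandas report), and it returns the most severe tier that matches. The constant_flag row is invented to exercise the critical tier.

```python
def classify_column(row):
    """Return 'critical', 'warning', 'info' or 'ok' for one report row."""
    null_pct = row['null_pct']
    unique = row['unique']
    count = row['count']
    if null_pct > 50 or unique == 1:
        return 'critical'  # mostly null, or a constant column
    if null_pct > 10 or unique == count:
        return 'warning'   # high null rate, or every value unique
    if null_pct > 0 or unique < 10:
        return 'info'      # some nulls, or low cardinality
    return 'ok'


rows = [
    {'column': 'customer_id', 'count': 10, 'null_pct': 0.0, 'unique': 10},
    {'column': 'email', 'count': 10, 'null_pct': 30.0, 'unique': 7},
    {'column': 'category', 'count': 100, 'null_pct': 0.0, 'unique': 3},
    {'column': 'customer_age', 'count': 100, 'null_pct': 0.0, 'unique': 50},
    {'column': 'constant_flag', 'count': 100, 'null_pct': 0.0, 'unique': 1},
]

for row in rows:
    print(row['column'], classify_column(row))
```

Checking the tiers from most to least severe means a column that matches several conditions is reported once, at its highest severity.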

19.6.3 Automation

  • Integrate into CI/CD pipelines for data validation
  • Set up alerts for quality threshold violations
  • Track quality metrics over time for trend analysis
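For tracking metrics over time, one lightweight option is to append a summary of each report to a JSON Lines file. The sketch below assumes dict rows from the report; the append_quality_snapshot helper, the file name, and the choice of summary fields are illustrative assumptions rather than an additory feature.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def append_quality_snapshot(path, dataset, rows):
    """Append one JSON line summarizing a report and return the summary."""
    snapshot = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'dataset': dataset,
        'total_columns': len(rows),
        'columns_with_nulls': sum(1 for r in rows if r['null_count'] > 0),
        'max_null_pct': max((r['null_pct'] for r in rows), default=0.0),
    }
    with open(path, 'a', encoding='utf-8') as fh:
        fh.write(json.dumps(snapshot) + '\n')
    return snapshot


# Rows taken from the first example's report
rows = [
    {'column': 'customer_id', 'null_count': 0, 'null_pct': 0.0},
    {'column': 'name', 'null_count': 2, 'null_pct': 20.0},
    {'column': 'email', 'null_count': 3, 'null_pct': 30.0},
]

# A temporary directory keeps the example self-contained
log_path = os.path.join(tempfile.mkdtemp(), 'quality_history.jsonl')
snap = append_quality_snapshot(log_path, 'customers', rows)
print(snap['columns_with_nulls'], snap['max_null_pct'])
```

Each run adds one line, so the file can later be loaded for trend analysis or checked against alert thresholds.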

19.7 Next Steps

  • Page 1: Basic data scanning with @analyze mode
  • Page 2: Lineage tracking with @lineage mode
  • Batch 5: Error handling and edge cases (coming soon)