19 Real-World Data Quality Workflows
Practical workflows combining @analyze mode with data quality assessment, profiling, and validation.
19.1 Example 1: Data Quality Assessment Pipeline
Build a comprehensive data quality assessment pipeline to identify and report quality issues.
import pandas as pd
import additory as add
# Simulate a dataset with quality issues
df = pd.DataFrame({
'customer_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'name': ['Alice', 'Bob', None, 'David', 'Eve', 'Frank', None, 'Helen', 'Ivan', 'Jane'],
'email': ['alice@email.com', 'bob@email.com', 'charlie@email.com', None,
'eve@email.com', None, 'grace@email.com', 'helen@email.com',
None, 'jane@email.com'],
'age': [25, 30, 35, 40, None, 50, 55, None, 65, 70],
'purchase_count': [5, 10, 15, 20, 25, 30, 35, 40, 45, 50]
})
# Step 1: Analyze data quality
quality_report = add.scan(
'@analyze',
df,
as_type='dataframe'
)
print("Data Quality Report:")
print(quality_report)
# Step 2: Identify columns with quality issues (>10% nulls)
high_null_cols = quality_report[quality_report['null_pct'] > 10.0]
print("\nColumns with >10% null values:")
print(high_null_cols[['column', 'null_count', 'null_pct']])
# Step 3: Get summary statistics
summary = {
'total_columns': len(quality_report),
'columns_with_nulls': len(quality_report[quality_report['null_count'] > 0]),
'high_null_columns': len(high_null_cols),
'avg_null_pct': quality_report['null_pct'].mean()
}
print("\nQuality Summary:")
for key, value in summary.items():
print(f" {key}: {value}")Output:
Data Quality Report:
column dtype count null_count null_pct unique mean std min max
0 customer_id Int64 10 0 0.0 10 5.5 3.027650 1.0 10.0
1 name String(Utf8) 10 2 20.0 8 NaN NaN NaN NaN
2 email String(Utf8) 10 3 30.0 7 NaN NaN NaN NaN
3 age Int64 10 2 20.0 8 47.5 16.583124 25.0 70.0
4 purchase_count Int64 10 0 0.0 10 27.5 15.732133 5.0 50.0
Columns with >10% null values:
column null_count null_pct
1 name 2 20.0
2 email 3 30.0
3 age 2 20.0
Quality Summary:
total_columns: 5
columns_with_nulls: 3
high_null_columns: 3
avg_null_pct: 14.0
Key Workflow Steps: 1. Run @analyze to get comprehensive quality metrics 2. Filter results to identify problematic columns 3. Generate summary statistics for reporting 4. Use insights to prioritize data cleaning efforts
Quality Thresholds: - null_pct > 10%: High null rate, requires attention - null_pct > 50%: Critical quality issue, consider dropping column - unique == count: Potential ID column or high cardinality - unique == 1: Constant column, no information value
Note: This also works with polars DataFrames.
19.2 Example 2: Column Profiling Workflow
Profile columns by data type to understand distributions and cardinality patterns.
import pandas as pd
import additory as add
# Sales data with various data types
df = pd.DataFrame({
'transaction_id': range(1, 101),
'amount': [100 + i * 10 for i in range(100)],
'category': ['Electronics'] * 30 + ['Clothing'] * 40 + ['Food'] * 30,
'payment_method': ['Credit'] * 50 + ['Debit'] * 30 + ['Cash'] * 20,
'customer_age': [25 + (i % 50) for i in range(100)]
})
# Profile the data
profile = add.scan(
'@analyze',
df,
as_type='dataframe'
)
print("Column Profile:")
print(profile)
# Extract insights for each column type
numeric_cols = profile[profile['dtype'].str.contains('Int|Float', na=False)]
categorical_cols = profile[profile['dtype'].str.contains('String', na=False)]
print("\nNumeric Columns:")
print(numeric_cols[['column', 'mean', 'std', 'min', 'max']])
print("\nCategorical Columns:")
print(categorical_cols[['column', 'unique', 'count']])
# Get cardinality insights
insights = {
'numeric_columns': len(numeric_cols),
'categorical_columns': len(categorical_cols),
'high_cardinality': len(profile[profile['unique'] > 50]),
'low_cardinality': len(profile[profile['unique'] <= 10])
}
print("\nCardinality Insights:")
for key, value in insights.items():
print(f" {key}: {value}")Output:
Column Profile:
column dtype count null_count null_pct unique mean std min max
0 transaction_id Int64 100 0 0.0 100 50.5 29.011492 1.0 100.0
1 amount Int64 100 0 0.0 100 595.0 295.594641 100.0 1090.0
2 category String(Utf8) 100 0 0.0 3 NaN NaN NaN NaN
3 payment_method String(Utf8) 100 0 0.0 3 NaN NaN NaN NaN
4 customer_age Int64 100 0 0.0 50 49.5 14.430869 25.0 74.0
Numeric Columns:
column mean std min max
0 transaction_id 50.5 29.011492 1.0 100.0
1 amount 595.0 295.594641 100.0 1090.0
4 customer_age 49.5 14.430869 25.0 74.0
Categorical Columns:
column unique count
2 category 3 100
3 payment_method 3 100
Cardinality Insights:
numeric_columns: 3
categorical_columns: 2
high_cardinality: 2
low_cardinality: 3
Profiling Insights: - High Cardinality (unique > 50): Likely ID columns or continuous numeric data - Low Cardinality (unique ≤ 10): Good candidates for categorical encoding - Numeric Statistics: Use mean/std to detect outliers and distributions - Categorical Distribution: Use unique count to assess encoding strategy
Use Cases: - Feature engineering: Identify columns for one-hot encoding vs label encoding - Data modeling: Understand which columns need normalization - Storage optimization: High cardinality strings may benefit from dictionary encoding
Note: This also works with polars DataFrames.
19.3 Example 3: Automated Data Validation
Build automated validation checks to ensure data quality before processing.
import pandas as pd
import additory as add
# Dataset that should pass validation
df_valid = pd.DataFrame({
'product_id': [1, 2, 3, 4, 5],
'price': [10.99, 20.99, 30.99, 40.99, 50.99],
'stock': [100, 200, 150, 175, 225]
})
# Dataset with validation issues
df_invalid = pd.DataFrame({
'product_id': [1, 2, None, 4, 5],
'price': [10.99, None, 30.99, None, 50.99],
'stock': [100, 200, 150, 175, None]
})
# Validate both datasets
report_valid = add.scan('@analyze', df_valid, as_type='dataframe')
report_invalid = add.scan('@analyze', df_invalid, as_type='dataframe')
# Check validation rules
valid_passed = (report_valid['null_count'] == 0).all()
invalid_passed = (report_invalid['null_count'] == 0).all()
print("Valid Dataset:")
print(f" Validation Passed: {valid_passed}")
print(f" Null Values: {report_valid['null_count'].sum()}")
print("\nInvalid Dataset:")
print(f" Validation Passed: {invalid_passed}")
print(f" Null Values: {report_invalid['null_count'].sum()}")
# Count issues in invalid dataset
if not invalid_passed:
issues = {
'columns_with_nulls': len(report_invalid[report_invalid['null_count'] > 0]),
'total_null_values': report_invalid['null_count'].sum(),
'validation_passed': invalid_passed
}
print("\nValidation Issues:")
for key, value in issues.items():
print(f" {key}: {value}")
print("\nColumns with Issues:")
print(report_invalid[report_invalid['null_count'] > 0][['column', 'null_count', 'null_pct']])Output:
Valid Dataset:
Validation Passed: True
Null Values: 0
Invalid Dataset:
Validation Passed: False
Null Values: 5
Validation Issues:
columns_with_nulls: 3
total_null_values: 5
validation_passed: False
Columns with Issues:
column null_count null_pct
0 product_id 1 20.0
1 price 2 40.0
2 stock 1 20.0
Validation Rules: - No Nulls: All columns must have zero null values - Data Types: Verify expected data types match actual types - Value Ranges: Check min/max values are within expected bounds - Cardinality: Ensure unique counts match expectations
Automation Workflow: 1. Run @analyze on incoming data 2. Apply validation rules to the analysis report 3. Generate pass/fail status and detailed issue report 4. Block processing if validation fails 5. Log issues for data quality monitoring
Integration Points: - ETL pipelines: Validate before loading to warehouse - API endpoints: Validate incoming data payloads - Batch processing: Pre-flight checks before expensive operations - Data monitoring: Track quality metrics over time
Note: This also works with polars DataFrames.
19.4 Workflow Patterns
19.4.1 Pattern 1: Quality Gating
# Run analysis
report = add.scan('@analyze', df, as_type='dataframe')
# Define quality gates
max_null_pct = 10.0
min_unique_ratio = 0.1
# Check gates
quality_passed = (
(report['null_pct'] <= max_null_pct).all() and
(report['unique'] / report['count'] >= min_unique_ratio).all()
)
if quality_passed:
# Proceed with processing
process_data(df)
else:
# Reject and log issues
log_quality_issues(report)19.4.2 Pattern 2: Incremental Profiling
# Profile baseline data
baseline = add.scan('@analyze', df_baseline, as_type='dataframe')
# Profile new data
current = add.scan('@analyze', df_current, as_type='dataframe')
# Compare distributions
drift_detected = (
abs(baseline['mean'] - current['mean']) > threshold
).any()
if drift_detected:
alert_data_drift()19.4.3 Pattern 3: Multi-Stage Validation
# Stage 1: Schema validation
report = add.scan('@analyze', df, as_type='dataframe')
schema_valid = validate_schema(report)
# Stage 2: Quality validation
if schema_valid:
quality_valid = validate_quality(report)
# Stage 3: Business rules
if quality_valid:
business_valid = validate_business_rules(df, report)19.5 Parameters
19.5.1 Required Parameters
mode:'@analyze'for statistical profilingdf: Input DataFrame (pandas or polars)
19.5.2 Optional Parameters
columns: Column filter (not yet implemented)where: SQL-like filter condition (not yet implemented)rows: Row range specifications (not yet implemented)as_type: Output format ('dataframe','dict','text')
19.5.3 Positional Parameters
# Also works without naming certain parameters:
report = add.scan('@analyze', df, as_type='dataframe')19.6 Best Practices
19.6.1 Performance
- Run
@analyzeonce and reuse results for multiple checks - Use
as_type='dataframe'for programmatic analysis - Use
as_type='text'for logging and display only
19.6.2 Quality Thresholds
- Critical: null_pct > 50%, unique == 1 (constant)
- Warning: null_pct > 10%, unique == count (all unique)
- Info: null_pct > 0%, unique < 10 (low cardinality)
19.6.3 Automation
- Integrate into CI/CD pipelines for data validation
- Set up alerts for quality threshold violations
- Track quality metrics over time for trend analysis
19.7 Next Steps
- Page 1: Basic data scanning with
@analyzemode - Page 2: Lineage tracking with
@lineagemode - Batch 5: Error handling and edge cases (coming soon)