Metadata-Version: 2.4
Name: lakehouse-plumber
Version: 0.2.7
Summary: A tool for building and managing data lakehouse pipelines
Author-email: Mehdi Modarressi <mehdi.modarressi@gmail.com>
License: Apache License
        Version 2.0, January 2004
        http://www.apache.org/licenses/
        
        TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
        
        1. Definitions.
        
        "License" shall mean the terms and conditions for use, reproduction,
        and distribution as defined by Sections 1 through 9 of this document.
        
        "Licensor" shall mean the copyright owner or entity granting the License.
        
        "Legal Entity" shall mean the union of the acting entity and all
        other entities that control, are controlled by, or are under common
        control with that entity. For the purposes of this definition,
        "control" means (i) the power, direct or indirect, to cause the
        direction or management of such entity, whether by contract or
        otherwise, or (ii) ownership of fifty percent (50%) or more of the
        outstanding shares, or (iii) beneficial ownership of such entity.
        
        "You" (or "Your") shall mean an individual or Legal Entity
        exercising permissions granted by this License.
        
        "Source" form shall mean the preferred form for making modifications,
        including but not limited to software source code, documentation
        source, and configuration files.
        
        "Object" form shall mean any form resulting from mechanical
        transformation or translation of a Source form, including but
        not limited to compiled object code, generated documentation,
        and conversions to other media types.
        
        "Work" shall mean the work of authorship, whether in Source or
        Object form, made available under the License, as indicated by a
        copyright notice that is included in or attached to the work
        (which shall not include communication on electronic mailing
        lists, announcements on electronic bulletin boards, and
        similar communications).
        
        "Derivative Works" shall mean any work, whether in Source or Object
        form, that is based upon (or derived from) the Work and for which the
        editorial revisions, annotations, elaborations, or other modifications
        represent, as a whole, an original work of authorship. For the purposes
        of this License, Derivative Works shall not include works that remain
        separable from, or merely link (or bind by name) to the interfaces of,
        the Work and derivative works thereof.
        
        "Contribution" shall mean any work of authorship, including
        the original version of the Work and any modifications or additions
        to that Work or Derivative Works thereof, that is intentionally
        submitted to Licensor for inclusion in the Work by the copyright owner
        or by an individual or Legal Entity authorized to submit on behalf of
        the copyright owner. For the purposes of this definition, "submitted"
        means any form of electronic, verbal, or written communication sent
        to the Licensor or its representatives, including but not limited to
        communication on electronic mailing lists, source code control
        systems, and issue tracking systems that are managed by, or on behalf
        of, the Licensor for the purpose of discussing and improving the Work,
        but excluding communication that is conspicuously marked or otherwise
        designated in writing by the copyright owner as "Not a Contribution."
        
        "Contributor" shall mean Licensor and any individual or Legal Entity
        on behalf of whom a Contribution has been received by Licensor and
        subsequently incorporated within the Work.
        
        2. Grant of Copyright License. Subject to the terms and conditions of
        this License, each Contributor hereby grants to You a perpetual,
        worldwide, non-exclusive, no-charge, royalty-free, irrevocable
        copyright license to use, reproduce, modify, merge, publish,
        distribute, sublicense, and/or sell copies of the Work, and to
        permit persons to whom the Work is furnished to do so, subject to
        the following conditions:
        
        The above copyright notice and this permission notice shall be
        included in all copies or substantial portions of the Work.
        
        3. Grant of Patent License. Subject to the terms and conditions of
        this License, each Contributor hereby grants to You a perpetual,
        worldwide, non-exclusive, no-charge, royalty-free, irrevocable
        (except as stated in this section) patent license to make, have made,
        use, offer to sell, sell, import, and otherwise transfer the Work,
        where such license applies only to those patent claims licensable
        by such Contributor that are necessarily infringed by their
        Contribution(s) alone or by combination of their Contribution(s)
        with the Work to which such Contribution(s) was submitted. If You
        institute patent litigation against any entity (including a
        cross-claim or counterclaim in a lawsuit) alleging that the Work
        or a Contribution incorporated within the Work constitutes direct
        or contributory patent infringement, then any patent licenses
        granted to You under this License for that Work shall terminate
        as of the date such litigation is filed.
        
        4. Redistribution. You may reproduce and distribute copies of the
        Work or Derivative Works thereof in any medium, with or without
        modifications, and in Source or Object form, provided that You
        meet the following conditions:
        
        (a) You must give any other recipients of the Work or
        Derivative Works a copy of this License; and
        
        (b) You must cause any modified files to carry prominent notices
        stating that You changed the files; and
        
        (c) You must retain, in the Source form of any Derivative Works
        that You distribute, all copyright, patent, trademark, and
        attribution notices from the Source form of the Work,
        excluding those notices that do not pertain to any part of
        the Derivative Works; and
        
        (d) If the Work includes a "NOTICE" text file as part of its
        distribution, then any Derivative Works that You distribute must
        include a readable copy of the attribution notices contained
        within such NOTICE file, excluding those notices that do not
        pertain to any part of the Derivative Works, in at least one
        of the following places: within a NOTICE text file distributed
        as part of the Derivative Works; within the Source form or
        documentation, if provided along with the Derivative Works; or,
        within a display generated by the Derivative Works, if and
        wherever such third-party notices normally appear. The contents
        of the NOTICE file are for informational purposes only and
        do not modify the License. You may add Your own attribution
        notices within Derivative Works that You distribute, alongside
        or as an addendum to the NOTICE text from the Work, provided
        that such additional attribution notices cannot be construed
        as modifying the License.
        
        You may add Your own copyright notice to Your modifications and
        may provide additional or different license terms and conditions
        for use, reproduction, or distribution of Your modifications, or
        for any such Derivative Works as a whole, provided Your use,
        reproduction, and distribution of the Work otherwise complies with
        the conditions stated in this License.
        
        5. Submission of Contributions. Unless You explicitly state otherwise,
        any Contribution intentionally submitted for inclusion in the Work
        by You to the Licensor shall be under the terms and conditions of
        this License, without any additional terms or conditions.
        Notwithstanding the above, nothing herein shall supersede or modify
        the terms of any separate license agreement you may have executed
        with Licensor regarding such Contributions.
        
        6. Trademarks. This License does not grant permission to use the trade
        names, trademarks, service marks, or product names of the Licensor,
        except as required for reasonable and customary use in describing the
        origin of the Work and reproducing the content of the NOTICE file.
        
        7. Disclaimer of Warranty. Unless required by applicable law or
        agreed to in writing, Licensor provides the Work (and each
        Contributor provides its Contributions) on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
        implied, including, without limitation, any warranties or conditions
        of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
        PARTICULAR PURPOSE. You are solely responsible for determining the
        appropriateness of using or redistributing the Work and assume any
        risks associated with Your exercise of permissions under this License.
        
        8. Limitation of Liability. In no event and under no legal theory,
        whether in tort (including negligence), contract, or otherwise,
        unless required by applicable law (such as deliberate and grossly
        negligent acts) or agreed to in writing, shall any Contributor be
        liable to You for damages, including any direct, indirect, special,
        incidental, or consequential damages of any character arising as a
        result of this License or out of the use or inability to use the
        Work (including but not limited to damages for loss of goodwill,
        work stoppage, computer failure or malfunction, or any and all
        other commercial damages or losses), even if such Contributor
        has been advised of the possibility of such damages.
        
        9. Accepting Warranty or Support. You may choose to offer, and to
        charge a fee for, warranty, support, indemnity or other liability
        obligations and/or rights consistent with this License. However, in
        accepting such obligations, You may act only on Your own behalf and on
        Your sole responsibility, not on behalf of any other Contributor, and
        only if You agree to indemnify, defend, and hold each Contributor
        harmless for any liability incurred by, or claims asserted against,
        such Contributor by reason of your accepting any such warranty or support.
        
        END OF TERMS AND CONDITIONS
        
        APPENDIX: How to apply the Apache License to your work.
        
        To apply the Apache License to your work, attach the following
        boilerplate notice, with the fields enclosed by brackets "[]"
        replaced with your own identifying information. (Don't include
        the brackets!)  The text should be enclosed in the appropriate
        comment syntax for the file format. We also recommend that a
        file or class name and description of purpose be included on the
        same page as the copyright notice for easier identification within
        third-party archives.
        
        Copyright 2024 Mehdi Modarressi
        
        Licensed under the Apache License, Version 2.0 (the "License");
        you may not use this file except in compliance with the License.
        You may obtain a copy of the License at
        
            http://www.apache.org/licenses/LICENSE-2.0
        
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License. 
Project-URL: Homepage, https://github.com/Mmodarre/Lakehouse_Plumber
Project-URL: Documentation, https://lakehouse-plumber.readthedocs.io/
Project-URL: Repository, https://github.com/Mmodarre/Lakehouse_Plumber.git
Project-URL: Bug Tracker, https://github.com/Mmodarre/Lakehouse_Plumber/issues
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: click>=8.0.0
Requires-Dist: pyyaml>=6.0.0
Requires-Dist: jinja2>=3.0.0
Requires-Dist: rich>=12.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: jsonschema>=4.0.0
Requires-Dist: importlib_resources>=1.3.0; python_version < "3.9"
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10.0; extra == "dev"
Requires-Dist: flake8>=6.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Provides-Extra: notebook
Requires-Dist: ipython>=7.0.0; extra == "notebook"
Requires-Dist: jupyter>=1.0.0; extra == "notebook"
Dynamic: license-file

# Lakehouse Plumber
**Plumbing the future of data engineering, one pipeline at a time** 🚀

*Because every data lake needs a good plumber to keep the flows running smoothly!* 🚰


<div align="center">
  <img src="lakehouse-plumber-logo.png" alt="LakehousePlumber Logo">
</div>

<div align="center">

[![PyPI version](https://badge.fury.io/py/lakehouse-plumber.svg)](https://badge.fury.io/py/lakehouse-plumber)
[![Tests](https://github.com/Mmodarre/Lakehouse_Plumber/actions/workflows/python_ci.yml/badge.svg)](https://github.com/Mmodarre/Lakehouse_Plumber/actions/workflows/python_ci.yml)
<!-- [![Python Support](https://img.shields.io/pypi/pyversions/lakehouse-plumber.svg)](https://pypi.org/project/lakehouse-plumber/) -->
[![License: Apache 2.0](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Lines of Code](https://img.shields.io/badge/lines%20of%20code-~15k-blue)](https://github.com/Mmodarre/Lakehouse_Plumber)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](http://makeapullrequest.com)
[![codecov](https://codecov.io/gh/Mmodarre/Lakehouse_Plumber/branch/main/graph/badge.svg?token=80IBHIFAQY)](https://codecov.io/gh/Mmodarre/Lakehouse_Plumber)
[![Documentation](https://img.shields.io/badge/docs-available-brightgreen.svg)](https://lakehouse-plumber.readthedocs.io/)
[![Databricks](https://img.shields.io/badge/Databricks-DLT-orange.svg)](https://databricks.com/product/delta-live-tables)

</div>

**Action-based Lakeflow Declarative Pipelines (formerly DLT) code generator for Databricks**

LakehousePlumber is a CLI tool that generates Lakeflow Declarative Pipelines code from YAML configurations, so data engineers can define pipelines declaratively instead of hand-writing pipeline code.

## 🎯 Key Features

- **Action-Based Architecture**: Define pipelines using composable load, transform, and write actions
- **Append Flow API**: Efficient multi-stream ingestion with automatic table creation management
- **Template System**: Reusable pipeline templates with parameterization
- **Environment Management**: Multi-environment support with token substitution
- **Data Quality Integration**: Built-in expectations and validation
- **Smart Generation**: Only regenerate changed files with state management and content-based file writing
- **Pipeline Validation**: Comprehensive validation rules prevent configuration conflicts
- **Code Formatting**: Automatic Python code formatting with Black
- **Secret Management**: Secure handling of credentials and API keys
- **Operational Metadata**: Automatic lineage tracking and data provenance

## 🏗️ Architecture

### Action Types

LakehousePlumber supports three main action types:

#### 🔄 Load Actions
- **CloudFiles**: Structured streaming from cloud storage (JSON, Parquet, CSV)
- **Delta**: Read from existing Delta tables
- **SQL**: Execute SQL queries as data sources
- **JDBC**: Connect to external databases
- **Python**: Custom Python-based data loading

#### ⚡ Transform Actions
- **SQL**: Standard SQL transformations
- **Python**: Custom Python transformations
- **Data Quality**: Apply expectations
- **Schema**: Column mapping and type casting
- **Temp Table**: Create temporary views

#### 💾 Write Actions
- **Streaming Table**: Live tables with change data capture
- **Materialized View**: Batch-computed views for analytics

### Project Structure

```
my_lakehouse_project/
├── lhp.yaml                   # Project configuration
├── presets/                   # Reusable configurations
│   ├── bronze_layer.yaml      # Bronze layer defaults
│   ├── silver_layer.yaml      # Silver layer defaults
│   └── gold_layer.yaml        # Gold layer defaults
├── templates/                 # Pipeline templates
│   ├── standard_ingestion.yaml
│   └── scd_type2.yaml
├── pipelines/                 # Pipeline definitions
│   ├── bronze_ingestion/
│   │   ├── customers.yaml
│   │   └── orders.yaml
│   ├── silver_transforms/
│   │   └── customer_dimension.yaml
│   └── gold_analytics/
│       └── customer_metrics.yaml
├── substitutions/             # Environment-specific values
│   ├── dev.yaml
│   ├── staging.yaml
│   └── prod.yaml
├── expectations/              # Data quality rules
└── generated/                 # Generated code
```

## 🚀 Quick Start

### Installation

```bash
pip install lakehouse-plumber
```

### Initialize a Project

```bash
lhp init my_lakehouse_project
cd my_lakehouse_project
```

### Create Your First Pipeline

Create a simple ingestion pipeline:

```yaml
# pipelines/bronze_ingestion/customers.yaml
pipeline: bronze_ingestion
flowgroup: customers
presets:
  - bronze_layer

actions:
  - name: load_customers_raw
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/customers"
      format: json
      schema_evolution_mode: addNewColumns
    target: v_customers_raw
    description: "Load raw customer data from landing zone"

  - name: write_customers_bronze
    type: write
    source: v_customers_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "customers"
      table_properties:
        delta.enableChangeDataFeed: "true"
        quality: "bronze"
    description: "Write customers to bronze layer"
```

### Configure Environment

```yaml
# substitutions/dev.yaml
catalog: dev_catalog
bronze_schema: bronze
silver_schema: silver
gold_schema: gold
landing_path: /mnt/dev/landing
checkpoint_path: /mnt/dev/checkpoints

secrets:
  default_scope: dev-secrets
  scopes:
    database: dev-db-secrets
    storage: dev-storage-secrets
```
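
To make the substitution step concrete, here is a minimal sketch of how placeholders could be resolved from an environment file. It is not LakehousePlumber's actual implementation: the helper name `substitute_tokens` and its error behaviour are assumptions for illustration. Because this README shows both `{{ token }}` and `{token}` forms, the sketch accepts either.

```python
import re

# Hypothetical helper, not part of the lhp API.
# Matches either {{ name }} or {name}.
TOKEN_PATTERN = re.compile(r"\{\{\s*(\w+)\s*\}\}|\{(\w+)\}")


def substitute_tokens(text: str, values: dict) -> str:
    """Replace every placeholder in text with the matching entry in values."""

    def replace(match: re.Match) -> str:
        name = match.group(1) or match.group(2)
        if name not in values:
            raise KeyError(f"No substitution defined for token '{name}'")
        return str(values[name])

    return TOKEN_PATTERN.sub(replace, text)


# Values mirroring substitutions/dev.yaml above
dev = {
    "catalog": "dev_catalog",
    "bronze_schema": "bronze",
    "landing_path": "/mnt/dev/landing",
}

database = substitute_tokens("{{ catalog }}.{{ bronze_schema }}", dev)
path = substitute_tokens("{{ landing_path }}/customers", dev)
print(database)  # dev_catalog.bronze
print(path)      # /mnt/dev/landing/customers
```

Raising on an unknown token, rather than leaving it in place, is a design choice of this sketch: it surfaces a missing entry in the environment file at generation time.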

### Validate and Generate

```bash
# Validate configuration
lhp validate --env dev

# Generate pipeline code
lhp generate --env dev

# View generated code
ls generated/
```

## 📋 CLI Commands

### Project Management
- `lhp init <project_name>` - Initialize new project
- `lhp validate --env <env>` - Validate pipeline configurations
- `lhp generate --env <env>` - Generate pipeline code
- `lhp info` - Show project information and statistics

### Discovery and Inspection
- `lhp list-presets` - List available presets
- `lhp list-templates` - List available templates
- `lhp show <flowgroup> --env <env>` - Show resolved configuration
- `lhp stats` - Show project statistics

### State Management
- `lhp generate --cleanup` - Clean up orphaned generated files
- `lhp state --env <env>` - Show generation state
- `lhp state --cleanup --env <env>` - Clean up orphaned files

### IntelliSense Setup
- `lhp setup-intellisense` - Set up VS Code IntelliSense support
- `lhp setup-intellisense --check` - Check prerequisites
- `lhp setup-intellisense --status` - Show current setup status
- `lhp setup-intellisense --verify` - Verify setup is working
- `lhp setup-intellisense --conflicts` - Show extension conflict analysis
- `lhp setup-intellisense --cleanup` - Remove IntelliSense setup

## 🧠 VS Code IntelliSense Support

LakehousePlumber provides comprehensive VS Code IntelliSense support with auto-completion, validation, and documentation for all YAML configuration files.

### ✨ Features

- **Smart Auto-completion**: Context-aware suggestions for all configuration options
- **Real-time Validation**: Immediate feedback on configuration errors
- **Inline Documentation**: Hover hints and descriptions for all fields
- **Schema Validation**: Ensures your YAML files follow the correct structure
- **Error Detection**: Highlights syntax and semantic errors as you type

### 🔧 Setup

#### Prerequisites
- VS Code installed and accessible via command line
- LakehousePlumber installed (`pip install lakehouse-plumber`)

#### Quick Setup
```bash
# Check if your system is ready
lhp setup-intellisense --check

# Set up IntelliSense (one-time setup)
lhp setup-intellisense

# Restart VS Code to activate schema associations
```

#### Verify Setup
```bash
# Check if setup is working
lhp setup-intellisense --verify

# View current status
lhp setup-intellisense --status
```

### 🎯 What Gets IntelliSense Support

- **Pipeline Configurations** (`pipelines/**/*.yaml`) - Full pipeline schema with actions, sources, and targets
- **Templates** (`templates/**/*.yaml`) - Template definitions with parameter validation
- **Presets** (`presets/**/*.yaml`) - Preset configuration schema
- **Substitutions** (`substitutions/**/*.yaml`) - Environment-specific value validation
- **Project Configuration** (`lhp.yaml`) - Main project settings

### 📝 Usage

Once set up, open any Lakehouse Plumber YAML file in VS Code and enjoy:

1. **Auto-completion**: Press `Ctrl+Space` to see available options
2. **Documentation**: Hover over any field to see descriptions
3. **Validation**: Red underlines indicate errors with helpful messages
4. **Structure**: IntelliSense guides you through the correct YAML structure

Example of IntelliSense in action:
```yaml
# Type "actions:" and get auto-completion for action types
actions:
  - name: load_data
    type: # ← IntelliSense suggests: load, transform, write
    source:
      type: # ← IntelliSense suggests: cloudfiles, delta, sql, jdbc, python
      path: # ← Documentation shows path requirements
```

### 🛠️ Troubleshooting

#### Extension Conflicts
Some YAML extensions may conflict with LakehousePlumber schemas:
```bash
# Check for conflicts and view the detailed conflict analysis
lhp setup-intellisense --conflicts
```

#### Setup Issues
```bash
# Force setup even if prerequisites aren't met
lhp setup-intellisense --force

# Clean up and start fresh
lhp setup-intellisense --cleanup
lhp setup-intellisense
```

#### Common Issues

**IntelliSense not working after setup:**
1. Restart VS Code completely
2. Verify setup: `lhp setup-intellisense --verify`
3. Check for extension conflicts: `lhp setup-intellisense --conflicts`

**Schema associations missing:**
1. Check status: `lhp setup-intellisense --status`
2. Re-run setup: `lhp setup-intellisense --force`

**Red Hat YAML extension conflicts:**
- The Red Hat YAML extension is detected but usually works well alongside LakehousePlumber schemas
- If issues persist, you can temporarily disable it or adjust its settings

### 🔄 Maintenance

The IntelliSense setup is persistent and doesn't need regular maintenance. However, you may want to:

```bash
# Check status periodically
lhp setup-intellisense --status

# Update after LakehousePlumber upgrades
lhp setup-intellisense --force
```

## 🎨 Advanced Features

### Presets

Create reusable configurations:

```yaml
# presets/bronze_layer.yaml
name: bronze_layer
version: "1.0"
description: "Standard bronze layer configuration"

defaults:
  operational_metadata: true
  load_actions:
    cloudfiles:
      schema_evolution_mode: addNewColumns
      rescue_data_column: "_rescued_data"
  write_actions:
    streaming_table:
      table_properties:
        delta.enableChangeDataFeed: "true"
        delta.autoOptimize.optimizeWrite: "true"
        quality: "bronze"
```
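
One way to picture a preset is as a set of defaults that are deep-merged underneath whatever an action specifies explicitly. The sketch below assumes explicit action settings win over preset defaults. The function `deep_merge` and the example override are hypothetical, and LakehousePlumber's real merge semantics may differ.

```python
from copy import deepcopy


def deep_merge(defaults: dict, overrides: dict) -> dict:
    """Return a new dict where overrides win and nested dicts merge recursively."""
    merged = deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Streaming-table defaults taken from the bronze_layer preset above
preset_defaults = {
    "table_properties": {
        "delta.enableChangeDataFeed": "true",
        "delta.autoOptimize.optimizeWrite": "true",
        "quality": "bronze",
    }
}

# Settings written explicitly on one write action (illustrative values)
action_target = {
    "table": "customers",
    "table_properties": {"quality": "bronze_raw"},
}

resolved = deep_merge(preset_defaults, action_target)
print(resolved["table_properties"]["quality"])                     # bronze_raw
print(resolved["table_properties"]["delta.enableChangeDataFeed"])  # true
```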

### Templates

Create parameterized pipeline templates:

```yaml
# templates/standard_ingestion.yaml
name: standard_ingestion
version: "1.0"
description: "Standard data ingestion template"

parameters:
  - name: source_path
    type: string
    required: true
  - name: table_name
    type: string
    required: true
  - name: file_format
    type: string
    default: "json"

actions:
  - name: "load_{{ table_name }}_raw"
    type: load
    source:
      type: cloudfiles
      path: "{{ source_path }}"
      format: "{{ file_format }}"
    target: "v_{{ table_name }}_raw"
    
  - name: "write_{{ table_name }}_bronze"
    type: write
    source: "v_{{ table_name }}_raw"
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "{{ table_name }}"
```
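
The `parameters` block implies a resolution step before the template body is rendered: supplied values are used first, then defaults, and a missing required parameter is an error. A minimal sketch of that step follows. `resolve_parameters` is a hypothetical helper, and the exact rules LakehousePlumber applies are an assumption here.

```python
def resolve_parameters(declared: list, supplied: dict) -> dict:
    """Combine supplied values with declared defaults and check required ones."""
    resolved = {}
    for param in declared:
        name = param["name"]
        if name in supplied:
            resolved[name] = supplied[name]
        elif "default" in param:
            resolved[name] = param["default"]
        elif param.get("required", False):
            raise ValueError(f"Missing required template parameter '{name}'")
    return resolved


# Parameter declarations from templates/standard_ingestion.yaml above
declared = [
    {"name": "source_path", "type": "string", "required": True},
    {"name": "table_name", "type": "string", "required": True},
    {"name": "file_format", "type": "string", "default": "json"},
]

params = resolve_parameters(
    declared,
    {"source_path": "/mnt/dev/landing/orders", "table_name": "orders"},
)
print(params["file_format"])  # json
```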

### Data Quality

Integrate expectations:

```yaml
# expectations/customer_quality.yaml
expectations:
  - name: valid_customer_key
    constraint: "customer_key IS NOT NULL"
    on_violation: "fail"
  - name: valid_email
    constraint: "email RLIKE '^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
    on_violation: "drop"
```

```yaml
# Pipeline with data quality
- name: validate_customers
  type: transform
  transform_type: data_quality
  source: v_customers_raw
  target: v_customers_validated
  expectations_file: "expectations/customer_quality.yaml"
```
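
As a rough illustration of how an expectations file could be prepared for code generation, the sketch below groups constraints by their `on_violation` value. Each group is a `{name: constraint}` dict, which is the shape that the DLT decorators `dlt.expect_all_or_fail` and `dlt.expect_all_or_drop` accept. Whether LakehousePlumber groups expectations this way is an assumption, and `group_expectations` is a hypothetical helper.

```python
from collections import defaultdict


def group_expectations(expectations: list) -> dict:
    """Group constraints by on_violation as {action: {name: constraint}}."""
    grouped = defaultdict(dict)
    for rule in expectations:
        grouped[rule["on_violation"]][rule["name"]] = rule["constraint"]
    return dict(grouped)


# Parsed form of expectations/customer_quality.yaml
# (the email pattern is shortened here for readability)
rules = [
    {
        "name": "valid_customer_key",
        "constraint": "customer_key IS NOT NULL",
        "on_violation": "fail",
    },
    {
        "name": "valid_email",
        "constraint": "email RLIKE '.+@.+'",
        "on_violation": "drop",
    },
]

grouped = group_expectations(rules)
print(grouped["fail"])  # {'valid_customer_key': 'customer_key IS NOT NULL'}
print(sorted(grouped))  # ['drop', 'fail']
```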

### SCD Type 2

Implement Slowly Changing Dimensions:

```yaml
- name: customer_dimension_scd2
  type: transform
  transform_type: python
  source: v_customers_validated
  target: v_customers_scd2
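  python_source: |
    from pyspark.sql.functions import current_date, lit

    def scd2_merge(df):
        # Stamp each incoming row as the current version of its record
        return df.withColumn("__start_date", current_date()) \
                 .withColumn("__end_date", lit(None).cast("date")) \
                 .withColumn("__is_current", lit(True))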
```

## 🏗️ Table Creation and Append Flow API

LakehousePlumber uses the Databricks Append Flow API to efficiently handle multiple data streams writing to the same streaming table. This approach prevents table recreation conflicts and enables high-performance, concurrent data ingestion.

### Core Concepts

#### Table Creation Control

Every write action must specify whether it creates the table or appends to an existing one:

```yaml
- name: write_orders_primary
  type: write
  source: v_orders_cleaned_primary
  write_target:
    type: streaming_table
    database: "{catalog}.{bronze_schema}"
    table: orders
    create_table: true  # ← This action creates the table
    table_properties:
      delta.enableChangeDataFeed: "true"
      quality: "bronze"

- name: write_orders_secondary  
  type: write
  source: v_orders_cleaned_secondary
  write_target:
    type: streaming_table
    database: "{catalog}.{bronze_schema}"
    table: orders
    create_table: false  # ← This action appends to existing table
```

#### Generated DLT Code

The above configuration generates optimized DLT code:

```python
# Table is created once
dlt.create_streaming_table(
    name="catalog.bronze.orders",
    comment="Streaming table: orders",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

# Multiple append flows target the same table
@dlt.append_flow(
    target="catalog.bronze.orders",
    name="f_orders_primary",
    comment="Append flow to catalog.bronze.orders from v_orders_cleaned_primary"
)
def f_orders_primary():
    return spark.readStream.table("v_orders_cleaned_primary")

@dlt.append_flow(
    target="catalog.bronze.orders", 
    name="f_orders_secondary",
    comment="Append flow to catalog.bronze.orders from v_orders_cleaned_secondary"
)
def f_orders_secondary():
    return spark.readStream.table("v_orders_cleaned_secondary")
```

### Validation Rules

LakehousePlumber enforces strict validation rules to prevent conflicts:

#### Rule 1: Exactly One Creator Per Table
Each streaming table must have exactly one action with `create_table: true` across the entire pipeline.

```yaml
# ✅ VALID: One creator, multiple appenders
- name: write_lineitem_au
  write_target:
    table: lineitem
    create_table: true   # ← Creates table

- name: write_lineitem_nz  
  write_target:
    table: lineitem
    create_table: false  # ← Appends to table

- name: write_lineitem_us
  write_target:
    table: lineitem  
    create_table: false  # ← Appends to table
```

```yaml
# ❌ INVALID: Multiple creators
- name: action1
  write_target:
    table: lineitem
    create_table: true   # ← Error: Multiple creators

- name: action2
  write_target:
    table: lineitem
    create_table: true   # ← Error: Multiple creators
```

```yaml  
# ❌ INVALID: No creator
- name: action1
  write_target:
    table: lineitem
    create_table: false  # ← Error: No creator for table

- name: action2
  write_target:
    table: lineitem
    create_table: false  # ← Error: No creator for table
```

#### Rule 2: Explicit Configuration Required
The `create_table` field defaults to `false`, so omitting it everywhere can leave a table with no creator, which Rule 1 rejects. Set the flag explicitly on every write action:

```yaml
# ❌ Implicit (defaults to false - may cause validation errors)
write_target:
  table: my_table
  # create_table not specified (defaults to false)

# ✅ Explicit (recommended)  
write_target:
  table: my_table
  create_table: true  # or false
```
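
Both rules can be sketched as a single check over a pipeline's write actions. This is an illustrative reimplementation, not LakehousePlumber's validator: `validate_table_creators` is a hypothetical function, and the message wording is only loosely modelled on the tool's output.

```python
from collections import defaultdict


def validate_table_creators(write_actions: list) -> list:
    """Return one error message per table that lacks exactly one creator."""
    creators = defaultdict(list)
    users = defaultdict(list)
    for action in write_actions:
        target = action["write_target"]
        table = target["table"]
        users[table].append(action["name"])
        # Rule 2: a missing create_table flag is treated as false
        if target.get("create_table", False):
            creators[table].append(action["name"])

    errors = []
    for table, names in users.items():
        found = creators[table]
        if not found:
            errors.append(f"Table '{table}' has no creator. Used by: {', '.join(names)}")
        elif len(found) > 1:
            errors.append(f"Table '{table}' has multiple creators: {', '.join(found)}")
    return errors


valid = [
    {"name": "write_lineitem_au", "write_target": {"table": "lineitem", "create_table": True}},
    {"name": "write_lineitem_nz", "write_target": {"table": "lineitem", "create_table": False}},
]
no_creator = [
    {"name": "action1", "write_target": {"table": "lineitem", "create_table": False}},
    {"name": "action2", "write_target": {"table": "lineitem"}},  # flag omitted
]

print(validate_table_creators(valid))       # []
print(validate_table_creators(no_creator))  # one "no creator" error for lineitem
```

Collecting every error before reporting, instead of stopping at the first one, lets a user fix all affected tables in one pass.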

### Error Handling

LakehousePlumber provides clear, actionable error messages:

```bash
# No table creator
Table creation validation failed:
  - Table 'catalog.bronze.orders' has no creator. 
    One action must have 'create_table: true'. 
    Used by: orders_ingestion.write_orders_bronze

# Multiple table creators  
Table creation validation failed:
  - Table 'catalog.bronze.orders' has multiple creators: 
    orders_ingestion.write_orders_primary, orders_ingestion.write_orders_secondary. 
    Only one action can have 'create_table: true'.
```

### Advanced Use Cases

#### Multi-Region Data Ingestion

```yaml
# Pipeline ingesting from multiple regions
actions:
  - name: write_events_us_east
    type: write
    source: v_events_us_east_cleaned
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: true  # Primary region creates table
      partition_columns: ["event_date", "region"]
      
  - name: write_events_us_west
    type: write  
    source: v_events_us_west_cleaned
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: false  # Secondary regions append
      
  - name: write_events_eu
    type: write
    source: v_events_eu_cleaned  
    write_target:
      type: streaming_table
      database: "{catalog}.{bronze_schema}"
      table: events
      create_table: false  # Secondary regions append
```

#### Cross-Flowgroup Table Sharing

Tables can be shared across multiple flowgroups within the same pipeline:

```yaml
# flowgroup1.yaml
pipeline: bronze_facts
flowgroup: orders_processing
actions:
  - name: write_orders_online
    write_target:
      table: all_orders
      create_table: true  # This flowgroup creates the table

# flowgroup2.yaml  
pipeline: bronze_facts
flowgroup: legacy_orders
actions:
  - name: write_orders_legacy
    write_target:
      table: all_orders
      create_table: false  # This flowgroup appends to existing table
```

### Smart File Generation

LakehousePlumber includes intelligent file writing that reduces unnecessary file churn:

#### Content-Based File Writing
- Only writes files when content actually changes
- Normalizes whitespace and formatting for accurate comparison
- Reduces Git noise and CI/CD overhead

```bash
# Generation output shows statistics
✅ Generation complete: 2 files written, 8 files skipped (no changes)
```
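
The idea of content-based writing can be sketched in a few lines of Python. This is an illustration only, not the tool's implementation: the helper names `normalize` and `write_if_changed` are hypothetical, and the normalization shown (trailing whitespace and surrounding blank lines) is an assumed, simplified stand-in for whatever LakehousePlumber actually compares.

```python
import tempfile
from pathlib import Path


def normalize(text: str) -> str:
    """Drop trailing whitespace on each line and blank lines at either end."""
    lines = [line.rstrip() for line in text.splitlines()]
    return "\n".join(lines).strip("\n")


def write_if_changed(path: Path, content: str) -> bool:
    """Write content to path unless the normalized content is unchanged.

    Returns True when the file was written, False when it was skipped.
    """
    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if normalize(existing) == normalize(content):
            return False
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "generated" / "orders.py"
        results = [
            write_if_changed(target, "import dlt\n"),             # new file
            write_if_changed(target, "import dlt   \n\n"),        # whitespace only
            write_if_changed(target, "import dlt\nimport os\n"),  # real change
        ]
        written = sum(results)
        print(f"{written} files written, {len(results) - written} files skipped")
```

Because a skipped file is never touched, its modification time and Git status stay unchanged, which is what keeps diffs limited to real changes.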

#### Benefits
- **Faster CI/CD**: Fewer file changes mean faster builds
- **Cleaner Git History**: No unnecessary commits for unchanged files  
- **Reduced Resource Usage**: Less file I/O and processing
- **Better Developer Experience**: Clear indication of actual changes

### Migration Guide

#### From Legacy DLT Code

If you have existing DLT code with multiple `dlt.create_streaming_table()` calls:

```python
# ❌ Legacy: Multiple table creations
dlt.create_streaming_table(name="catalog.bronze.orders", ...)
dlt.create_streaming_table(name="catalog.bronze.orders", ...)  # Conflict!

@dlt.table(name="catalog.bronze.orders")
def orders_flow1():
    return spark.readStream.table("source1")
    
@dlt.table(name="catalog.bronze.orders")  
def orders_flow2():
    return spark.readStream.table("source2")
```

Update your YAML configuration:

```yaml
# ✅ New: Explicit table creation control
- name: write_orders_primary
  source: source1
  write_target:
    table: orders
    create_table: true   # Only this action creates

- name: write_orders_secondary
  source: source2  
  write_target:
    table: orders
    create_table: false  # This action appends
```

#### Backward Compatibility

Existing configurations without a `create_table` flag continue to work but may trigger validation warnings. Migrate gradually by adding an explicit `create_table` flag to each write action.
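
To find the write actions that still need an explicit flag, a small helper along these lines may be useful. It is a sketch, not part of LakehousePlumber: the function name `actions_missing_create_table` is hypothetical, and it assumes the flowgroup YAML has already been parsed into a plain dictionary (for example with a YAML loader) using the `actions` / `write_target` layout shown in the examples above.

```python
def actions_missing_create_table(flowgroup: dict) -> list:
    """Return the names of actions whose write_target has no explicit create_table."""
    missing = []
    for action in flowgroup.get("actions", []):
        target = action.get("write_target")
        # Only actions with a write_target mapping are relevant here.
        if isinstance(target, dict) and "create_table" not in target:
            missing.append(action.get("name", "<unnamed>"))
    return missing


if __name__ == "__main__":
    parsed = {
        "pipeline": "bronze_facts",
        "flowgroup": "legacy_orders",
        "actions": [
            {"name": "write_orders_primary",
             "write_target": {"table": "orders", "create_table": True}},
            {"name": "write_orders_secondary",
             "write_target": {"table": "orders"}},  # flag not yet added
        ],
    }
    for name in actions_missing_create_table(parsed):
        print(f"Add an explicit create_table flag to: {name}")
```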

## 🔧 Development

### Prerequisites

- Python 3.8+
- Databricks workspace with enabled
- Access to cloud storage (S3, ADLS, GCS)

### Local Development

```bash
# Clone the repository
git clone https://github.com/yourusername/lakehouse-plumber.git
cd lakehouse-plumber

# Install in development mode
pip install -e .

# Run tests
pytest tests/

# Run CLI
lhp --help
```

### Testing

The test suite runs with `pytest` and is organized by category:

```bash
# Run all tests
pytest

# Run specific test categories
pytest tests/test_integration.py      # Integration tests
pytest tests/test_cli.py             # CLI tests
pytest tests/test_advanced_features.py  # Advanced features
pytest tests/test_performance.py     # Performance tests
```

## 📚 Examples

### Bronze Layer Ingestion

```yaml
pipeline: bronze_ingestion
flowgroup: orders
presets:
  - bronze_layer

actions:
  - name: load_orders_cloudfiles
    type: load
    source:
      type: cloudfiles
      path: "{{ landing_path }}/orders"
      format: parquet
      schema_evolution_mode: addNewColumns
    target: v_orders_raw
    operational_metadata: true
    
  - name: write_orders_bronze
    type: write
    source: v_orders_raw
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ bronze_schema }}"
      table: "orders"
      partition_columns: ["order_date"]
```

### Silver Layer Transformation

```yaml
pipeline: silver_transforms
flowgroup: customer_dimension

actions:
  - name: cleanse_customers
    type: transform
    transform_type: sql
    source: "{{ catalog }}.{{ bronze_schema }}.customers"
    target: v_customers_cleansed
    sql: |
      SELECT 
        customer_key,
        TRIM(UPPER(customer_name)) as customer_name,
        REGEXP_REPLACE(phone, '[^0-9]', '') as phone_clean,
        address,
        nation_key,
        market_segment,
        account_balance
      FROM STREAM(LIVE.customers)
      WHERE customer_key IS NOT NULL
      
  - name: apply_scd2
    type: transform
    transform_type: python
    source: v_customers_cleansed
    target: v_customers_scd2
    python_source: |
      @dlt.view
      def scd2_logic():
          return spark.readStream.table("LIVE.v_customers_cleansed")
          
  - name: write_customer_dimension
    type: write
    source: v_customers_scd2
    write_target:
      type: streaming_table
      database: "{{ catalog }}.{{ silver_schema }}"
      table: "dim_customers"
      table_properties:
        delta.enableChangeDataFeed: "true"
        quality: "silver"
```

### Gold Layer Analytics

```yaml
pipeline: gold_analytics
flowgroup: customer_metrics

actions:
  - name: customer_lifetime_value
    type: transform
    transform_type: sql
    source: 
      - "{{ catalog }}.{{ silver_schema }}.dim_customers"
      - "{{ catalog }}.{{ silver_schema }}.fact_orders"
    target: v_customer_ltv
    sql: |
      SELECT 
        c.customer_key,
        c.customer_name,
        c.market_segment,
        COUNT(o.order_key) as total_orders,
        SUM(o.total_price) as lifetime_value,
        AVG(o.total_price) as avg_order_value,
        MAX(o.order_date) as last_order_date
      FROM LIVE.dim_customers c
      LEFT JOIN LIVE.fact_orders o ON c.customer_key = o.customer_key
      WHERE c.__is_current = true
      GROUP BY c.customer_key, c.customer_name, c.market_segment
      
  - name: write_customer_metrics
    type: write
    source: v_customer_ltv
    write_target:
      type: materialized_view
      database: "{{ catalog }}.{{ gold_schema }}"
      table: "customer_metrics"
      refresh_schedule: "0 2 * * *"  # Daily at 2 AM
```

## 🤝 Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

### Development Setup

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Ensure all tests pass
6. Submit a pull request

### Publishing to PyPI

The project includes a publishing script (`publish.sh`) that builds the package and publishes it to Test PyPI or Production PyPI.

#### Prerequisites

Install the publishing dependencies:
```bash
pip install -r requirements-dev.txt
```

#### Publishing Options

**Build Only** (for testing):
```bash
./publish.sh --build-only
```

**Publish to Test PyPI**:
```bash
export TEST_PYPI_TOKEN="your-test-pypi-token"
./publish.sh --test
```

**Publish to Production PyPI**:
```bash
export PYPI_TOKEN="your-production-pypi-token"
./publish.sh --prod
```

#### Additional Options

- `--skip-tests`: Skip running tests before build
- `--skip-git-check`: Skip git status check
- `--clean-only`: Clean build artifacts only
- `--help`: Show all available options

#### API Token Setup

1. **Test PyPI**: Get your token from [test.pypi.org](https://test.pypi.org/manage/account/token/)
2. **Production PyPI**: Get your token from [pypi.org](https://pypi.org/manage/account/token/)

Set them as environment variables:
```bash
export TEST_PYPI_TOKEN="pypi-AgEIcHlwaS5vcmcC..."
export PYPI_TOKEN="pypi-AgEIcHlwaS5vcmcC..."
```

#### Safety Features

- Automatic tests run before publishing
- Git status check prevents publishing with uncommitted changes
- Double confirmation required for production publishing
- Package validation using `twine check`
- Colorized output for better visibility

## 📄 License

This project is licensed under the Apache License 2.0 - see the [LICENSE](LICENSE) file for details.

## 🆘 Support

- **Documentation**: [Wiki](https://github.com/yourusername/lakehouse-plumber/wiki)
- **Issues**: [GitHub Issues](https://github.com/yourusername/lakehouse-plumber/issues)
- **Discussions**: [GitHub Discussions](https://github.com/yourusername/lakehouse-plumber/discussions)

## 🙏 Acknowledgments

- Built for the Databricks ecosystem
- Inspired by modern data engineering practices
- Designed for the medallion architecture pattern

---

**Made with ❤️ for Databricks and Lakeflow Declarative Data Pipelines** 
