Metadata-Version: 2.4
Name: pristy-alfresco-operators
Version: 0.7.7
Summary: Alfresco Operators for Pristy
Author-email: Jérémie Lesage <jeremie.lesage@jeci.fr>
License: Apache-2.0
Keywords: airflow,alfresco,pristy
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: Apache Airflow
Classifier: Framework :: Pytest
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Requires-Python: <3.13,>=3.12
Requires-Dist: apache-airflow-providers-apache-kafka>=1.6.1
Requires-Dist: apache-airflow-providers-http>=4.13.3
Requires-Dist: apache-airflow-providers-postgres>=5.14.0
Requires-Dist: apache-airflow<4.0.0,>=3.1.2
Requires-Dist: jsonschema>=4.24.0
Requires-Dist: pendulum>=3.1.0
Requires-Dist: pytest>=8.3.4
Requires-Dist: requests>=2.32.4
Description-Content-Type: text/markdown

# Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.

[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Python Version](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/)
[![Apache Airflow](https://img.shields.io/badge/Airflow-3.1%2B-017CEE.svg)](https://airflow.apache.org/)
[![PyPI version](https://badge.fury.io/py/pristy-alfresco-operators.svg)](https://pypi.org/project/pristy-alfresco-operators/)

## Features

- **Search & Fetch**: Query Alfresco nodes via Search API with pagination support
- **Transform**: Convert Alfresco nodes to standardized Pristy pivot format
- **Export**: Push transformed nodes to Kafka or filesystem
- **Backup & Restore**: Download/upload binary content with SHA1 checksum verification (v0.7.0+)
- **SQLite Storage**: Save/load node metadata to/from SQLite databases for backup portability (v0.7.0+)
- **State Tracking**: PostgreSQL-based migration state management
- **Schema Validation**: JSON Schema validation before export

## Installation

```bash
pip install pristy-alfresco-operators
```

Or with uv (recommended):

```bash
uv add pristy-alfresco-operators
```

## Requirements

- Python 3.12
- Apache Airflow 3.1+
- PostgreSQL (for state tracking)
- Apache Kafka (optional, for Kafka export)

### Version Compatibility

- **Version 0.6.0+**: Requires Apache Airflow 3.1+ (breaking change), uses uv for dependency management
- **Version 0.5.x and below**: Compatible with Apache Airflow 2.9+

### Migrating from Airflow 2.x

If you're upgrading from Airflow 2.x to 3.x, version 0.6.0 includes the following changes:

- Replaced deprecated `airflow.utils.helpers.merge_dicts` with Python's native `|` operator
- Updated dependency constraint to `apache-airflow>=3.1.1,<4.0.0`
- Fixed deprecation warnings for Airflow 3.1+ imports

All operators remain API-compatible. No changes required in your DAG code.
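
For reference, the `|` operator (available since Python 3.9) merges dictionaries as sketched below. The dictionary names and values are illustrative assumptions, not taken from the operators' source. Note that `|` is a shallow merge: a nested dictionary on the right replaces the one on the left rather than being combined with it.

```python
# Illustrative values only; these names do not come from the package.
defaults = {"page_size": 100, "headers": {"Accept": "application/json"}}
overrides = {"page_size": 50, "headers": {"X-Trace": "1"}}

# The right-hand operand wins on key conflicts; neither input is mutated.
merged = defaults | overrides

print(merged["page_size"])  # 50
print(merged["headers"])    # {'X-Trace': '1'} (nested dict replaced, not merged)
```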

## Operators

### Search & Fetch Operators

#### `AlfrescoSearchOperator`
Search Alfresco nodes using FTS (Full Text Search) with pagination.

```python
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)
```
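
To illustrate how `page_size` and `max_items` can interact, here is a minimal pagination sketch. It is not the operator's implementation: `iter_pages` and `fake_fetch_page` are hypothetical names, and the stub only mimics the `list.entries` / `list.pagination.hasMoreItems` layout of Alfresco REST responses instead of making HTTP calls.

```python
from typing import Callable, Iterator


def iter_pages(
    fetch_page: Callable[[int, int], dict],
    page_size: int = 100,
    max_items: int = 1000,
) -> Iterator[dict]:
    """Yield entries page by page until max_items is reached or no pages remain."""
    skip_count = 0
    yielded = 0
    while yielded < max_items:
        # Never request more than the remaining budget.
        limit = min(page_size, max_items - yielded)
        page = fetch_page(skip_count, limit)
        entries = page["list"]["entries"]
        for entry in entries:
            yield entry
            yielded += 1
        if not entries or not page["list"]["pagination"]["hasMoreItems"]:
            break
        skip_count += len(entries)


def fake_fetch_page(skip_count: int, max_items: int) -> dict:
    """Hypothetical stand-in for an HTTP call; serves 250 fake nodes."""
    total = 250
    ids = range(skip_count, min(skip_count + max_items, total))
    return {
        "list": {
            "entries": [{"entry": {"id": f"node-{i}"}} for i in ids],
            "pagination": {"hasMoreItems": skip_count + max_items < total},
        }
    }


nodes = list(iter_pages(fake_fetch_page, page_size=100, max_items=1000))
print(len(nodes))  # 250
```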

#### `AlfrescoFetchChildrenOperator`
Fetch all children of a folder node.

```python
from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)
```

#### `AlfrescoFetchNodeOperator`
Fetch a single node by UUID.

```python
from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)
```

### Transform Operators

#### `TransformFileOperator`
Transform Alfresco file nodes to Pristy pivot format.

```python
from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    mapping_func=custom_metadata_mapper  # Optional; a callable you define yourself
)
```

#### `TransformFolderOperator`
Transform Alfresco folder nodes to Pristy pivot format.

```python
from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)
```

### Export Operators

#### `PushToKafkaOperator`
Push nodes to Kafka with JSON Schema validation.

```python
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)
```

#### `PushToDirectoryOperator`
Export nodes as JSON files to filesystem.

```python
from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)
```

### Database Operators

#### `CreateChildrenTableOperator`
Create PostgreSQL tracking table.

```python
from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)
```

#### `SaveFolderToDbOperator`
Save folder children to tracking table.

```python
from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)
```

### Backup & Restore Operators (v0.7.0+)

#### `AlfrescoDownloadContentOperator`
Download binary content from Alfresco nodes with SHA1 checksum verification.

```python
from pristy.alfresco_operator.download_content_operator import AlfrescoDownloadContentOperator

download_file = AlfrescoDownloadContentOperator(
    task_id="download_file",
    node_id="workspace://SpacesStore/file-uuid",
    output_dir="/datas/backups/files",
    filename="{uuid}.bin",  # Supports {uuid}, {name} placeholders
    calculate_checksum=True,  # SHA1 checksum calculation
    http_conn_id="alfresco_api"
)
```
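
If you want to check a downloaded file independently, a SHA1 digest can be computed with the standard library. This is a minimal sketch under stated assumptions: `sha1_of_file` is a hypothetical helper, not part of this package, and the temporary file stands in for a downloaded binary.

```python
import hashlib
import tempfile
from pathlib import Path


def sha1_of_file(path: Path, chunk_size: int = 64 * 1024) -> str:
    """Return the hex SHA1 digest of a file, reading it in chunks."""
    digest = hashlib.sha1()
    with path.open("rb") as handle:
        while chunk := handle.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


# Demo on a temporary file standing in for a downloaded binary.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(b"The quick brown fox jumps over the lazy dog")
    checksum = sha1_of_file(sample)

print(checksum)  # 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12
```

Reading in chunks keeps memory usage flat for large binaries.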

#### `AlfrescoUploadPristyOperator`
Upload files or folders to Alfresco via Pristy custom endpoint with multipart/form-data.

```python
from pristy.alfresco_operator.upload_pristy_operator import AlfrescoUploadPristyOperator

upload_file = AlfrescoUploadPristyOperator(
    task_id="upload_file",
    file_path="/datas/backups/files/abc-123.bin",
    metadata={
        "name": "document.pdf",
        "type": "cm:content",
        "path": {"parent_uuid": "workspace://SpacesStore/parent-uuid"},
        "properties": {"cm:title": "My Document"}
    },
    http_conn_id="alfresco_api_target",
    endpoint="/alfresco/service/fr/jeci/pristy/nodes/inject"
)
```

#### `SaveNodeToSqliteOperator`
Save node metadata to SQLite database for backup purposes.

```python
from pristy.alfresco_operator.save_node_to_sqlite import SaveNodeToSqliteOperator

save_to_sqlite = SaveNodeToSqliteOperator(
    task_id="save_metadata",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    file_downloaded=True,
    checksums={"uuid-123": "2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"}
)
```

#### `LoadNodesFromSqliteOperator`
Load node metadata from SQLite database for restore operations.

```python
from pristy.alfresco_operator.load_nodes_from_sqlite import LoadNodesFromSqliteOperator

load_from_sqlite = LoadNodesFromSqliteOperator(
    task_id="load_metadata",
    sqlite_path="/datas/backups/backup_20250110/backup.db",
    batch_size=20,
    node_type_filter="file",  # 'folder', 'file', or None
    order_by="file_size DESC"  # Custom sorting
)
```

**Complete Backup & Restore System:**
See the [dag-alfresco-pristy](https://github.com/your-org/dag-alfresco-pristy) project for a complete implementation using these operators to backup and restore Alfresco content hierarchies.

## Configuration

### Airflow Connections

Define these connections in Airflow:

```bash
# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'
```

### Airflow Variables

```bash
# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"
```

### Configurable Connection and Variable Names (v0.6.0+)

Starting with version 0.6.0, all operators support custom connection IDs and variable names, allowing you to use the same operators in different contexts within a single Airflow instance.

#### PostgreSQL Operators

All PostgreSQL-based operators now accept `postgres_conn_id`:

```python
# Use a custom PostgreSQL connection
create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="my_tracking_table",
    postgres_conn_id="postgres_target"  # Default: "local_pg"
)
```

#### Alfresco HTTP Operators

All Alfresco operators now accept `http_conn_id`:

```python
# Use different Alfresco connections for source and target
search_source = AlfrescoSearchOperator(
    task_id="search_source",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_source"  # Default: "alfresco_api"
)

fetch_from_target = AlfrescoFetchNodeOperator(
    task_id="fetch_target",
    node_id="workspace://SpacesStore/node-uuid",
    http_conn_id="alfresco_target"  # Default: "alfresco_api"
)
```

#### Kafka Operator

The Kafka operator accepts a custom connection ID and topic variable name:

```python
push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    kafka_conn_id="kafka_prod",           # Default: "kafka_pristy"
    topic_var="kafka_export_topic_prod"   # Default: "kafka_export_topic"
)
```

#### Transform Operators

Transform operators accept custom variable names:

```python
transform = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    source_server_var="alfresco_source_server_prod"  # Default: "alfresco_source_server"
)
```

#### Path Update Function

The `update_node_path` function (used internally by transform operators) accepts custom variable names:

```python
from pristy.alfresco_operator.update_node_path import update_node_path

# Customize variable names for path transformation
update_node_path(
    src_node,
    new_node,
    target_site_var="alfresco_export_site_custom",      # Default: "alfresco_export_target_site"
    target_root_uuid_var="alfresco_root_uuid_custom",   # Default: "alfresco_target_root_uuid"
    short_path_remove_var="path_prefix_remove_custom"   # Default: "short_path_remove"
)
```

#### Directory Export

The directory export operator accepts a custom output path:

```python
push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    output_base_dir="/data/exports"  # Default: "/usr/local/airflow/output"
)
```

#### Multi-Context Example

Run multiple parallel migration workflows with different configurations:

```python
# Migration from Production Alfresco to Dev Kafka
search_prod = AlfrescoSearchOperator(
    task_id="search_prod",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_prod"
)

# Migration from Staging Alfresco to Test Kafka
search_staging = AlfrescoSearchOperator(
    task_id="search_staging",
    query="TYPE:'cm:content'",
    http_conn_id="alfresco_staging"
)

push_prod = PushToKafkaOperator(
    task_id="push_prod",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_prod') }}",
    table_name="migration_prod",
    kafka_conn_id="kafka_dev",
    topic_var="kafka_topic_dev",
    postgres_conn_id="postgres_dev"
)

push_staging = PushToKafkaOperator(
    task_id="push_staging",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_staging') }}",
    table_name="migration_staging",
    kafka_conn_id="kafka_test",
    topic_var="kafka_topic_test",
    postgres_conn_id="postgres_test"
)
```

## Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the [Pristy Injector documentation](https://gitlab.com/pristy-oss/pristy-core/-/blob/develop/docs/PristyInjector.md?ref_type=heads).

Example node structure:

```json
{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}
```

For complete format specification and available fields, see the [Pristy Injector documentation](https://gitlab.com/pristy-oss/pristy-core/-/blob/develop/docs/PristyInjector.md?ref_type=heads).
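
The export operators validate nodes against a JSON Schema. The snippet below is a much simpler, stdlib-only stand-in that only checks for the top-level keys visible in the example above. Treating those keys as required is an assumption; the authoritative list of mandatory fields is the Pristy Injector schema. `missing_keys` is a hypothetical helper.

```python
import json

# Keys taken from the example node; treating them as required is an assumption.
EXPECTED_KEYS = {"name", "type", "dateCreated", "owner", "path", "properties", "source"}


def missing_keys(node: dict) -> set[str]:
    """Return the expected top-level keys that are absent from the node."""
    return EXPECTED_KEYS - node.keys()


node = json.loads("""
{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {"root": "site:my-site", "short": "/Documents/Folder"},
  "properties": {"cm:creator": "admin"},
  "source": {"type": "alfresco", "uuid": "workspace://SpacesStore/node-uuid"}
}
""")

print(missing_keys(node))                   # set()
print(sorted(missing_keys({"name": "x"})))
```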

## Example DAG

```python
from datetime import datetime
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push
```

## Development

### Setup

```bash
# Clone repository
git clone https://github.com/your-org/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Install dependencies (creates venv automatically)
uv sync --group test
```

### Testing

```bash
# Run all tests with coverage
uv run pytest

# Run specific test
uv run pytest tests/schema/test_schema.py -v

# Run tests without coverage
uv run pytest --no-cov

# Run tests in verbose mode
uv run pytest -vv
```

### Code Quality

This project follows strict code quality standards:

- **Security**: All SQL queries use parameterized statements
- **Type hints**: PEP 604 union syntax (`str | None`)
- **Imports**: Lazy imports in `execute()` methods for Airflow performance
- **Resource management**: `try/finally` blocks for connections
- **Error handling**: Granular error states with proper tracking

See [CONVENTIONS.md](CONVENTIONS.md) for detailed guidelines.

## Release Process

1. Update version in `pyproject.toml`
2. Update CHANGELOG.md (if present)
3. Create release:

```bash
TAG=0.8.0
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
uv build
uv publish
```

## Architecture

- **Operators**: Extend `BaseOperator` with task-specific logic
- **Utils**: Shared utilities (`parse_alfresco_pagination`, `create_base_node`)
- **State Tracking**: PostgreSQL tables with `new` → `running` → `success`/`error` states
- **Schema Validation**: JSON Schema validation before export
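
The state progression listed above can be sketched as a small transition table. This is an assumption-laden illustration: `ALLOWED_TRANSITIONS` and `next_state` are hypothetical names, and treating `success` and `error` as terminal is a guess, since whether the package allows retrying from `error` is not stated here.

```python
# Hypothetical transition table; terminal states map to an empty set.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "new": {"running"},
    "running": {"success", "error"},
    "success": set(),
    "error": set(),
}


def next_state(current: str, target: str) -> str:
    """Return target if the transition is allowed, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target


state = "new"
state = next_state(state, "running")
state = next_state(state, "success")
print(state)  # success
```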

## License

Apache License 2.0. See the [LICENSE](LICENSE) file for details.

## Contributing

Contributions are welcome! Please:

1. Follow the code conventions in [CONVENTIONS.md](CONVENTIONS.md)
2. Write tests for new features
3. Ensure all tests pass
4. Submit a pull request

## Support

- **Issues**: https://gitlab.com/pristy-oss/pristy-alfresco-operators/-/issues
- **Documentation**: https://docs.pristy.fr/

## Acknowledgments

Developed by [Jeci](https://jeci.fr) for integration with the [Pristy](https://pristy.fr/en/) services platform.
