Metadata-Version: 2.3
Name: pristy-alfresco-operators
Version: 0.5.0
Summary: Alfresco Operators for Pristy
Author: Jérémie Lesage
Author-email: jeremie.lesage@jeci.fr
Requires-Python: >=3.12,<3.13
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: apache-airflow (>=2.9.1,<3.0.0)
Requires-Dist: apache-airflow-providers-apache-kafka (>=1.6.1)
Requires-Dist: apache-airflow-providers-http (>=4.13.3)
Requires-Dist: apache-airflow-providers-postgres (>=5.14.0)
Requires-Dist: jsonschema (>=4.24.0)
Requires-Dist: pendulum (>=3.1.0)
Requires-Dist: requests (>=2.32.4)
Description-Content-Type: text/markdown

# Pristy Alfresco Operators for Apache Airflow

Custom Apache Airflow operators for interacting with the Alfresco ECM REST API and transforming content to the Pristy pivot format.

[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Python Version](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/)
[![PyPI version](https://badge.fury.io/py/pristy-alfresco-operators.svg)](https://pypi.org/project/pristy-alfresco-operators/)

## Features

- **Search & Fetch**: Query Alfresco nodes via the Search API with pagination support
- **Transform**: Convert Alfresco nodes to the standardized Pristy pivot format
- **Export**: Push transformed nodes to Kafka or the filesystem
- **State Tracking**: PostgreSQL-based migration state management
- **Schema Validation**: JSON Schema validation before export

## Installation

```bash
pip install pristy-alfresco-operators
```

Or with Poetry:

```bash
poetry add pristy-alfresco-operators
```

## Requirements

- Python 3.12
- Apache Airflow 2.9.1 or later, below 3.0
- PostgreSQL (for state tracking)
- Apache Kafka (optional, for Kafka export)

## Operators

### Search & Fetch Operators

#### `AlfrescoSearchOperator`
Search Alfresco nodes using FTS (Full Text Search) with pagination.

```python
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator

search_task = AlfrescoSearchOperator(
    task_id="search_documents",
    query="TYPE:'cm:content' AND ANCESTOR:'workspace://SpacesStore/site-id'",
    page_size=100,
    max_items=1000,
    sort_field="cm:modified",
    sort_ascending=False,
    http_conn_id="alfresco_api"
)
```

#### `AlfrescoFetchChildrenOperator`
Fetch all children of a folder node.

```python
from pristy.alfresco_operator.fetch_children_node_operator import AlfrescoFetchChildrenOperator

fetch_children = AlfrescoFetchChildrenOperator(
    task_id="fetch_children",
    folders="workspace://SpacesStore/folder-uuid",
    page_size=50,
    max_items=2000
)
```

#### `AlfrescoFetchNodeOperator`
Fetch a single node by UUID.

```python
from pristy.alfresco_operator.fetch_node_operator import AlfrescoFetchNodeOperator

fetch_node = AlfrescoFetchNodeOperator(
    task_id="fetch_node",
    node_id="workspace://SpacesStore/node-uuid"
)
```

### Transform Operators

#### `TransformFileOperator`
Transform Alfresco file nodes to Pristy pivot format.

```python
from pristy.alfresco_operator.transform_file import TransformFileOperator

transform_files = TransformFileOperator(
    task_id="transform_files",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    # Optional: custom_metadata_mapper is a user-defined callable (not shown here)
    mapping_func=custom_metadata_mapper
)
```

#### `TransformFolderOperator`
Transform Alfresco folder nodes to Pristy pivot format.

```python
from pristy.alfresco_operator.transform_folder import TransformFolderOperator

transform_folders = TransformFolderOperator(
    task_id="transform_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}"
)
```

### Export Operators

#### `PushToKafkaOperator`
Push nodes to Kafka with JSON Schema validation.

```python
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

push_to_kafka = PushToKafkaOperator(
    task_id="push_to_kafka",
    nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
    table_name="migration_tracking",
    source_key="uuid"
)
```

#### `PushToDirectoryOperator`
Export nodes as JSON files to the filesystem.

```python
from pristy.alfresco_operator.push_node_to_directory import PushToDirectoryOperator

push_to_dir = PushToDirectoryOperator(
    task_id="export_to_dir",
    node="{{ task_instance.xcom_pull(task_ids='transform_files') }}"
)
```

### Database Operators

#### `CreateChildrenTableOperator`
Create the PostgreSQL tracking table.

```python
from pristy.alfresco_operator.create_children_table import CreateChildrenTableOperator

create_table = CreateChildrenTableOperator(
    task_id="create_table",
    table_name="export_alfresco_folder_children"
)
```

#### `SaveFolderToDbOperator`
Save folder children to the tracking table.

```python
from pristy.alfresco_operator.save_folder_to_db import SaveFolderToDbOperator

save_to_db = SaveFolderToDbOperator(
    task_id="save_folders",
    child="{{ task_instance.xcom_pull(task_ids='fetch_children') }}",
    table_name="export_alfresco_folder_children"
)
```

## Configuration

### Airflow Connections

Define these connections in Airflow:

```bash
# Alfresco API connection
airflow connections add alfresco_api \
    --conn-type http \
    --conn-host alfresco.example.com \
    --conn-login admin \
    --conn-password admin \
    --conn-port 443 \
    --conn-schema https

# PostgreSQL tracking database
airflow connections add local_pg \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password airflow \
    --conn-schema airflow \
    --conn-port 5432

# Kafka (optional)
airflow connections add kafka_pristy \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092"}'
```

### Airflow Variables

```bash
# Source Alfresco server URL
airflow variables set alfresco_source_server "https://alfresco.example.com"

# Kafka export topic (optional)
airflow variables set kafka_export_topic "pristy-node-injector"

# Target site for migration (optional)
airflow variables set alfresco_export_target_site "my-target-site"

# Target root UUID for migration (optional)
airflow variables set alfresco_target_root_uuid "workspace://SpacesStore/target-folder-uuid"
```

## Pristy Pivot Format

The operators transform Alfresco nodes to a standardized format as defined in the [Pristy Injector documentation](https://gitlab.com/pristy-oss/pristy-core/-/blob/develop/docs/PristyInjector.md?ref_type=heads).

Example node structure:

```json
{
  "name": "document.pdf",
  "type": "cm:content",
  "dateCreated": "2024-01-15T10:30:00Z",
  "owner": "admin",
  "path": {
    "root": "site:my-site",
    "short": "/Documents/Folder"
  },
  "properties": {
    "cm:created": "2024-01-15T10:30:00Z",
    "cm:creator": "admin",
    "cm:modified": "2024-01-20T14:45:00Z",
    "cm:modifier": "editor"
  },
  "source": {
    "type": "alfresco",
    "server": "https://alfresco.example.com",
    "uuid": "workspace://SpacesStore/node-uuid",
    "mimetype": "application/pdf",
    "size": 102400
  }
}
```

For complete format specification and available fields, see the [Pristy Injector documentation](https://gitlab.com/pristy-oss/pristy-core/-/blob/develop/docs/PristyInjector.md?ref_type=heads).
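
To make the mapping concrete, here is a minimal sketch that builds the structure above from a single Alfresco REST node entry. It is not the package's transform code: `to_pivot` and its parameters are hypothetical, the entry fields (`nodeType`, `createdAt`, `createdByUser`, `content.mimeType`, `content.sizeInBytes`) follow the Alfresco REST API node model, and using the creator as `owner` is an assumption made for illustration.

```python
def to_pivot(entry: dict, server: str, root: str, short_path: str) -> dict:
    """Build a pivot-style dict from one Alfresco node entry (illustrative only)."""
    content = entry.get("content", {})  # folders carry no content block
    creator = entry["createdByUser"]["id"]
    return {
        "name": entry["name"],
        "type": entry["nodeType"],
        "dateCreated": entry["createdAt"],  # timestamps are passed through unchanged
        "owner": creator,  # assumption: owner defaults to the creator
        "path": {"root": root, "short": short_path},
        "properties": {
            "cm:created": entry["createdAt"],
            "cm:creator": creator,
            "cm:modified": entry["modifiedAt"],
            "cm:modifier": entry["modifiedByUser"]["id"],
        },
        "source": {
            "type": "alfresco",
            "server": server,
            "uuid": f"workspace://SpacesStore/{entry['id']}",
            "mimetype": content.get("mimeType"),
            "size": content.get("sizeInBytes"),
        },
    }


# Hypothetical node entry, shaped like an Alfresco REST API response item.
entry = {
    "id": "node-uuid",
    "name": "document.pdf",
    "nodeType": "cm:content",
    "createdAt": "2024-01-15T10:30:00Z",
    "modifiedAt": "2024-01-20T14:45:00Z",
    "createdByUser": {"id": "admin"},
    "modifiedByUser": {"id": "editor"},
    "content": {"mimeType": "application/pdf", "sizeInBytes": 102400},
}

node = to_pivot(
    entry,
    server="https://alfresco.example.com",
    root="site:my-site",
    short_path="/Documents/Folder",
)
print(node["source"]["uuid"])  # workspace://SpacesStore/node-uuid
```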

## Example DAG

```python
import pendulum
from airflow import DAG
from pristy.alfresco_operator.search_node_operator import AlfrescoSearchOperator
from pristy.alfresco_operator.transform_file import TransformFileOperator
from pristy.alfresco_operator.push_node_to_kafka import PushToKafkaOperator

with DAG(
    dag_id="alfresco_to_kafka",
    # days_ago() and schedule_interval are deprecated in recent Airflow 2.x releases
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,  # manual trigger only
    catchup=False
) as dag:

    search = AlfrescoSearchOperator(
        task_id="search_documents",
        query="TYPE:'cm:content'",
        page_size=100
    )

    transform = TransformFileOperator(
        task_id="transform_files",
        child="{{ task_instance.xcom_pull(task_ids='search_documents') }}"
    )

    push = PushToKafkaOperator(
        task_id="push_to_kafka",
        nodes="{{ task_instance.xcom_pull(task_ids='transform_files') }}",
        table_name="migration_tracking"
    )

    search >> transform >> push
```

## Development

### Setup

```bash
# Clone repository
git clone https://gitlab.com/pristy-oss/pristy-alfresco-operators.git
cd pristy-alfresco-operators

# Create virtual environment
python3.12 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install poetry
poetry install
```

### Testing

```bash
# Run tests
pytest tests/

# Run specific test
pytest tests/schema/test_schema.py -v
```

### Code Quality

This project follows strict code quality standards:

- **Security**: All SQL queries use parameterized statements
- **Type hints**: PEP 604 union syntax (`str | None`)
- **Imports**: Lazy imports in `execute()` methods for Airflow performance
- **Resource management**: `try/finally` blocks for connections
- **Error handling**: Granular error states with proper tracking

See [CONVENTIONS.md](CONVENTIONS.md) for detailed guidelines.

## Release Process

1. Update version in `pyproject.toml`
2. Update CHANGELOG.md (if present)
3. Create release:

```bash
TAG=0.4.2
git add pyproject.toml README.md
git commit -m "version $TAG"
git tag "$TAG"
git push
git push origin "tags/$TAG"
poetry build
poetry publish
```

## Architecture

- **Operators**: Extend `BaseOperator` with task-specific logic
- **Utils**: Shared utilities (`parse_alfresco_pagination`, `create_base_node`)
- **State Tracking**: PostgreSQL tables with `new` → `running` → `success`/`error` states
- **Schema Validation**: JSON Schema validation before export
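
The state lifecycle listed above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: it uses the standard-library `sqlite3` module as a stand-in for PostgreSQL (PostgreSQL drivers typically use `%s` placeholders instead of `?`), the table `tracking`, its columns, and the helper `set_state` are hypothetical, and whether the real operators reject out-of-order transitions is not stated in this document.

```python
import sqlite3

# Allowed transitions, taken from the state list above.
TRANSITIONS = {
    "new": {"running"},
    "running": {"success", "error"},
}


def set_state(conn: sqlite3.Connection, uuid: str, new_state: str) -> None:
    """Move one tracked node to new_state, rejecting transitions not listed above."""
    row = conn.execute(
        "SELECT state FROM tracking WHERE uuid = ?", (uuid,)
    ).fetchone()
    if row is None:
        raise KeyError(uuid)
    current = row[0]
    if new_state not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {new_state}")
    # Values are passed as parameters, never interpolated into the SQL string.
    conn.execute("UPDATE tracking SET state = ? WHERE uuid = ?", (new_state, uuid))
    conn.commit()


conn = sqlite3.connect(":memory:")
try:
    conn.execute(
        "CREATE TABLE tracking (uuid TEXT PRIMARY KEY, state TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT INTO tracking (uuid, state) VALUES (?, ?)", ("node-1", "new")
    )
    set_state(conn, "node-1", "running")
    set_state(conn, "node-1", "success")
    final_state = conn.execute(
        "SELECT state FROM tracking WHERE uuid = ?", ("node-1",)
    ).fetchone()[0]
    print(final_state)  # success
finally:
    # Always release the connection, even if a transition fails.
    conn.close()
```

Keeping the transition table in one place makes it straightforward to add a retry path (for example `error` back to `new`) if a migration needs one.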

## License

Apache License 2.0 - see [LICENSE](LICENSE) file for details.

## Contributing

Contributions are welcome! Please:

1. Follow the code conventions in [CONVENTIONS.md](CONVENTIONS.md)
2. Write tests for new features
3. Ensure all tests pass
4. Submit a pull request

## Support

- **Issues**: https://gitlab.com/pristy-oss/pristy-alfresco-operators/-/issues
- **Documentation**: https://docs.pristy.fr/

## Acknowledgments

Developed by [Jeci](https://jeci.fr) for integration with the [Pristy](https://pristy.fr/en/) services platform.

