Metadata-Version: 2.4
Name: udata-hydra
Version: 2.7.1.dev8
Summary: Async crawler and parsing service for data.gouv.fr
Author-email: Opendata Team <opendatateam@data.gouv.fr>
License: MIT
Requires-Python: <3.14,>=3.11
Description-Content-Type: text/markdown
Requires-Dist: aiohttp>=3.10.3
Requires-Dist: asyncpg>=0.29.0
Requires-Dist: coloredlogs>=15.0.1
Requires-Dist: csv-detective==0.11.1.dev3
Requires-Dist: dateparser>=1.1.7
Requires-Dist: humanfriendly>=10.0
Requires-Dist: json-stream>=2.3.3
Requires-Dist: marshmallow>=3.14.1
Requires-Dist: minio>=7.2.8
Requires-Dist: owslib>=0.35.0
Requires-Dist: progressist>=0.1.0
Requires-Dist: pyarrow>=16.1.0
Requires-Dist: python-magic>=0.4.25
Requires-Dist: python-slugify>=8.0.4
Requires-Dist: redis>=4.1.4
Requires-Dist: rq>=1.11.1
Requires-Dist: sentry-sdk>=2.10.0
Requires-Dist: sqlalchemy>=1.4.46
Requires-Dist: tippecanoe>=2.72.0
Requires-Dist: typer>=0.15.1

![udata-hydra](banner.png)

# udata-hydra

[![CircleCI](https://circleci.com/gh/datagouv/hydra.svg?style=svg)](https://circleci.com/gh/datagouv/hydra)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

`udata-hydra` is an async metadata crawler for [data.gouv.fr](https://www.data.gouv.fr).

URLs are crawled via _aiohttp_; the catalog and crawled metadata are stored in a _PostgreSQL_ database.

Since it's called _hydra_, it also has mythical powers embedded:
- analyse remote resource metadata over time to detect changes in the smartest way possible
- if the remote resource is tabular (CSV or Excel-like), convert it to a PostgreSQL table, ready for APIfication, and to Parquet to offer another distribution of the data
- if the remote resource is a GeoJSON file, convert it to PMTiles to offer another distribution of the data
- send crawl and analysis info to a udata instance

## 🏗️ Architecture schema

The architecture of the full workflow is as follows:

![Full workflow architecture](docs/archi-idd-IDD.drawio.png)


The hydra crawler is one of the components of this architecture. It checks whether a resource is available, analyses the file type if the resource has been modified, and analyses the CSV content. It also converts CSV resources to database tables and sends the data to a udata instance.

![Crawler architecture](docs/hydra.drawio.png)

## 📦 Dependencies

This project uses `libmagic`, which needs to be installed on your system, e.g.:

`brew install libmagic` on macOS, or `sudo apt-get install libmagic-dev` on Linux.

This project uses Python >=3.11 and [uv](https://docs.astral.sh/uv/) to manage dependencies.
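If you don't have `uv` installed yet, its standalone installer is one option (see the [uv documentation](https://docs.astral.sh/uv/) for other installation methods):

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```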

## 🚀 Installation

### With uv (recommended)
```bash
uv sync
```

### With pip
```bash
pip3 install -e .
```

## 🖥️ CLI

### Create database structure

Install udata-hydra dependencies and cli (see Installation section above), then migrate the DB with:

`uv run udata-hydra migrate`

### Load (UPSERT) latest catalog version from data.gouv.fr

`uv run udata-hydra load-catalog`
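
Putting these pieces together, a typical local bootstrap might look like this (assuming the docker compose setup described in the Development section below):

```bash
# Start the PostgreSQL containers (see the Docker compose section below)
docker compose up -d

# Create or update the database structure
uv run udata-hydra migrate

# Load (upsert) the latest catalog version from data.gouv.fr
uv run udata-hydra load-catalog
```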

## 🕷️ Crawler

`uv run udata-hydra-crawl`

It will crawl the catalog forever, according to the config set in `config.toml` (default values are defined in `udata_hydra/config_default.toml`).

`BATCH_SIZE` URLs are queued at each loop run.

The crawler starts with URLs that have never been checked, then proceeds with URLs whose last check is older than the `CHECK_DELAYS` interval. It then waits until something changes (catalog or time).

There is a per-domain backoff mechanism: the crawler waits when, for a given domain in a given batch, `BACKOFF_NB_REQ` requests are exceeded within a period of `BACKOFF_PERIOD` seconds. It retries until the backoff is lifted.

If a URL matches one of the `EXCLUDED_PATTERNS`, it will never be checked.
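
As a rough sketch, these crawler settings can be overridden in your `config.toml`; the keys below come from this section, but the values are illustrative assumptions only (see `udata_hydra/config_default.toml` for the actual defaults and value types):

```bash
# Append illustrative crawler settings to config.toml.
# The values below are examples, not recommended defaults.
cat >> config.toml <<'EOF'
BATCH_SIZE = 100
BACKOFF_NB_REQ = 180
BACKOFF_PERIOD = 360
EOF
```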

## ⚙️ Worker

A job queuing system is used to process long-running tasks. Launch the worker with the following command:

`uv run rq worker -c udata_hydra.worker`

To monitor worker status:

`uv run rq info -c udata_hydra.worker --interval 1`

To empty all the queues:

`uv run rq empty -c udata_hydra.worker low default high`
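
In local development, the Redis broker needs to be running before launching the worker; a minimal sequence, using the docker compose profile described in the Development section below, could be:

```bash
# Start the Redis broker (see the Docker compose section below)
docker compose --profile broker up -d

# Launch a worker consuming the job queues
uv run rq worker -c udata_hydra.worker
```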

## 📊 CSV conversion to database

Converted CSV tables will be stored in the database specified via `config.DATABASE_URL_CSV`. For tests it's the same database as for the catalog. Locally, `docker compose` will launch two distinct database containers.
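
For example, if you want the converted tables to live in a dedicated local PostgreSQL instance, you could override the setting in `config.toml`; the connection string below is purely a placeholder, adjust host, port and credentials to your own setup:

```bash
# Point CSV conversion at a dedicated database (placeholder credentials).
cat >> config.toml <<'EOF'
DATABASE_URL_CSV = "postgresql://postgres:postgres@localhost:5433/csv"
EOF
```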

## 🧪 Tests

To run the tests, you need to launch the test database with `docker compose --profile test up -d`.

Make sure the dependencies are installed (including dev dependencies) with `uv sync` (see Installation section above).

Then you can run the tests with `uv run pytest`.

To run a specific test file, you can pass the path to the file to pytest, like this: `uv run pytest tests/test_file.py`.

To run a specific test function, you can pass the path to the file and the name of the function to pytest, like this: `uv run pytest tests/test_api/test_api_checks.py::test_get_latest_check`.

If you would like to see print statements as they are executed, you can pass the `-s` flag to pytest (`uv run pytest -s`). However, note that the output can sometimes be difficult to parse.
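
In short, a full local test run looks like this:

```bash
# Start the ephemeral test database
docker compose --profile test up -d

# Install dependencies, including dev dependencies
uv sync

# Run the whole test suite
uv run pytest

# Or run a single test function, with print statements shown
uv run pytest -s tests/test_api/test_api_checks.py::test_get_latest_check
```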

### 🎯 Tests coverage

Pytest automatically uses the `coverage` package to generate a coverage report, which is displayed at the end of the test run in the terminal.
The coverage is configured in the `pyproject.toml` file, in the `[tool.pytest.ini_options]` section.
You can also override the coverage report configuration when running the tests by passing some flags like `--cov-report` to pytest. See [the pytest-cov documentation](https://pytest-cov.readthedocs.io/en/latest/config.html) for more information.
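
For example, to also produce an HTML report (written to `htmlcov/` by default by pytest-cov) in addition to the terminal output:

```bash
uv run pytest --cov-report=term --cov-report=html
```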

### 📈 Performance benchmarking

Hydra includes performance benchmarks to track and compare the performance of different operations on large files.
These benchmarks help identify performance regressions and improvements across different commits.

#### How it works

Performance benchmarks are automatically executed on CI runners when pushing to the `benchmarks` branch. The benchmarks test three key operations:

1. **CSV analysis** on large files using integrated test data
2. **CSV to GeoJSON conversion** on large files using the `TEST_GEOCSV_URL` configured in CI
3. **GeoJSON to PMTiles conversion** on large files using the `TEST_GEOCSV_URL` configured in CI

#### Benchmark execution

Benchmarks run on:
- **[CircleCI](https://app.circleci.com/pipelines/github/datagouv/hydra)** ([workflow file](https://github.com/datagouv/hydra/blob/main/.circleci/config.yml)) - available as a manually triggerable pipeline after a push to `benchmarks` branch
- **[GitHub Actions](https://github.com/datagouv/hydra/actions/workflows/benchmark.yml)** ([workflow file](https://github.com/datagouv/hydra/blob/main/.github/workflows/benchmark.yml)) - triggered automatically on pushes to `benchmarks` branch

Using two different CI systems allows comparing performance across environments and provides a way to avoid exhausting CI time limits.

#### Metrics collected

Each benchmark run collects **execution time** in seconds, **commit information** (hash, author) and **runner specifications** (CPU cores, memory, Python version, runner class), which are stored in [`.benchmarks/benchmarks.csv`](https://github.com/datagouv/hydra/blob/benchmarks/.benchmarks/benchmarks.csv).

More specifically:
- `datetime` - when the test was run
- `test_name` - which test was executed
- `input_file` - URL or path of the input test data file used
- `ci` - which CI system ran the test (github or circleci)
- `execution_time_seconds` - performance measurement
- `commit_author` - who made the commit
- `commit_id` - the commit hash (7 characters)
- `runner_class` - CircleCI/GitHub Actions runner type
- `runner_cpu` - number of CPU cores
- `runner_memory` - available memory in MB
- `python_version` - Python version used

Results are committed and pushed back to the `benchmarks` branch, creating a historical performance tracking dataset.

#### Viewing results

You can view the current benchmark results at: [benchmarks.csv](https://github.com/datagouv/hydra/blob/benchmarks/.benchmarks/benchmarks.csv)

#### Running benchmarks locally

To run performance benchmarks locally, you can use the CLI commands:

```bash
# Convert CSV to GeoJSON
uv run udata-hydra convert-csv-to-geojson /path/to/large/file.csv

# Convert GeoJSON to PMTiles
uv run udata-hydra convert-geojson-to-pmtiles /path/to/large/file.geojson
```

These commands allow you to test performance improvements locally before pushing to the benchmarks branch.

## 🔌 API

The API requires a Bearer token for each request to a protected endpoint (any endpoint that isn't a `GET`).
The token is configured in the `config.toml` file as `API_KEY`, and has a default value set in the `udata_hydra/config_default.toml` file.

If you're using hydra as an external service to receive resource events from [udata](https://github.com/opendatateam/udata), then udata also needs to configure this
API key in its `udata.cfg` file:

```python
# Whether udata should publish the resource events
PUBLISH_ON_RESOURCE_EVENTS = True
# Where to publish the events
RESOURCES_ANALYSER_URI = "http://localhost:8000"
# The API key that hydra needs
RESOURCES_ANALYSER_API_KEY = "api_key_to_change"
```

### 🚀 Run

```bash
# Install dependencies (see Installation section above)
uv run adev runserver udata_hydra/app.py
```
By default, the app will listen on `localhost:8000`.
You can check the status of the app with `curl http://localhost:8000/api/health`.

### 🛣️ Routes/endpoints

The API serves the following endpoints:

*Related to checks:*
- `GET` on `/api/checks/latest?url={url}&resource_id={resource_id}` to get the latest check for a given URL and/or `resource_id`
- `GET` on `/api/checks/all?url={url}&resource_id={resource_id}` to get all checks for a given URL and/or `resource_id`
- `GET` on `/api/checks/aggregate?group_by={column}&created_at={date}` to get checks occurrences grouped by a `column` for a specific `date`

*Related to resources:*
- `GET` on `/api/resources/{resource_id}` to get a resource in the DB "catalog" table from its `resource_id`
- `POST` on `/api/resources` to receive a resource creation event from a source. It will create a new resource in the DB "catalog" table and mark it as priority for next crawling
- `PUT` on `/api/resources/{resource_id}` to update a resource in the DB "catalog" table
- `DELETE` on `/api/resources/{resource_id}` to delete a resource in the DB "catalog" table

> :warning: **Warning: the following routes are deprecated and will be removed in the future:**
> - `POST` on `/api/resource/created` -> use `POST` on `/api/resources/` instead
> - `POST` on `/api/resource/updated` -> use `PUT` on `/api/resources/` instead
> - `POST` on `/api/resource/deleted` -> use `DELETE` on `/api/resources/` instead

*Related to resources exceptions:*
- `GET` on `/api/resources-exceptions` to get the list of all resources exceptions
- `POST` on `/api/resources-exceptions` to create a new resource exception in the DB
- `PUT` on `/api/resources-exceptions/{resource_id}` to update a resource exception in the DB
- `DELETE` on `/api/resources-exceptions/{resource_id}` to delete a resource exception from the DB

*Related to some status and health check:*
- `GET` on `/api/status/crawler` to get the crawling status
- `GET` on `/api/status/worker` to get the worker status
- `GET` on `/api/stats` to get the crawling stats
- `GET` on `/api/health` to get the API version number and environment

You may want to use a helper such as [Bruno](https://www.usebruno.com/) to handle API calls; ready-to-use definitions of all the endpoints are available [here](https://github.com/datagouv/api-calls).
More details and examples are provided below for some of the endpoints:

#### Get latest check

Works with `?url={url}` and `?resource_id={resource_id}`.

```bash
$ curl -s "http://localhost:8000/api/checks/latest?url=http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv" | json_pp
{
   "status" : 200,
   "catalog_id" : 64148,
   "deleted" : false,
   "error" : null,
   "created_at" : "2021-02-06T12:19:08.203055",
   "response_time" : 0.830198049545288,
   "url" : "http://opendata-sig.saintdenis.re/datasets/661e19974bcc48849bbff7c9637c5c28_1.csv",
   "domain" : "opendata-sig.saintdenis.re",
   "timeout" : false,
   "id" : 114750,
   "dataset_id" : "5c34944606e3e73d4a551889",
   "resource_id" : "b3678c59-5b35-43ad-9379-fce29e5b56fe",
   "headers" : {
      "content-disposition" : "attachment; filename=\"xn--Dlimitation_des_cantons-bcc.csv\"",
      "server" : "openresty",
      "x-amz-meta-cachetime" : "191",
      "last-modified" : "Wed, 29 Apr 2020 02:19:04 GMT",
      "content-encoding" : "gzip",
      "content-type" : "text/csv",
      "cache-control" : "must-revalidate",
      "etag" : "\"20415964703d9ccc4815d7126aa3a6d8\"",
      "content-length" : "207",
      "date" : "Sat, 06 Feb 2021 12:19:08 GMT",
      "x-amz-meta-contentlastmodified" : "2018-11-19T09:38:28.490Z",
      "connection" : "keep-alive",
      "vary" : "Accept-Encoding"
   }
}
```

#### Get all checks for a URL or resource

Works with `?url={url}` and `?resource_id={resource_id}`.

```bash
$ curl -s "http://localhost:8000/api/checks/all?url=http://www.drees.sante.gouv.fr/IMG/xls/er864.xls" | json_pp
[
   {
      "domain" : "www.drees.sante.gouv.fr",
      "dataset_id" : "53d6eadba3a72954d9dd62f5",
      "timeout" : false,
      "deleted" : false,
      "response_time" : null,
      "error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
      "catalog_id" : 232112,
      "url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
      "headers" : {},
      "id" : 165107,
      "created_at" : "2021-02-06T14:32:47.675854",
      "resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
      "status" : null
   },
   {
      "timeout" : false,
      "deleted" : false,
      "response_time" : null,
      "error" : "Cannot connect to host www.drees.sante.gouv.fr:443 ssl:True [SSLCertVerificationError: (1, \"[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'www.drees.sante.gouv.fr'. (_ssl.c:1122)\")]",
      "domain" : "www.drees.sante.gouv.fr",
      "dataset_id" : "53d6eadba3a72954d9dd62f5",
      "created_at" : "2020-12-24T17:06:58.158125",
      "resource_id" : "93dfd449-9d26-4bb0-a6a9-ee49b1b8a4d7",
      "status" : null,
      "catalog_id" : 232112,
      "url" : "http://www.drees.sante.gouv.fr/IMG/xls/er864.xls",
      "headers" : {},
      "id" : 65092
   }
]
```

#### Get checks occurrences grouped by a column for a specific date

Works with `?group_by={column}` and `?created_at={date}`.
`date` should be a date in the format `YYYY-MM-DD`, or the default keyword `today`.

```bash
$ curl -s "http://localhost:8000/api/checks/aggregate?group_by=domain&created_at=today" | json_pp
[
  {
    "value": "www.geo2france.fr",
    "count": 4
  },
  {
    "value": "static.data.gouv.fr",
    "count": 4
  },
  {
    "value": "grandestprod.data4citizen.com",
    "count": 3
  },
  {
    "value": "www.datasud.fr",
    "count": 2
  },
  {
    "value": "koumoul.com",
    "count": 2
  },
  {
    "value": "opendata.aude.fr",
    "count": 2
  },
  {
    "value": "departement-ain.opendata.arcgis.com",
    "count": 2
  },
  {
    "value": "opendata.agglo-larochelle.fr",
    "count": 1
  }
]
```

#### Adding a resource exception

```bash
$ curl   -X POST http://localhost:8000/api/resources-exceptions \
         -H 'Authorization: Bearer <myAPIkey>' \
         -d '{
            "resource_id": "123e4567-e89b-12d3-a456-426614174000",
            "table_indexes": {
                  "siren": "index"
            },
            "comment": "This is a comment for the resource exception."
         }'
```

...or, if you don't want to add table indexes or a comment:
```bash
$ curl  -X POST localhost:8000/api/resources-exceptions \
        -H 'Authorization: Bearer <myAPIkey>' \
        -d '{"resource_id": "f868cca6-8da1-4369-a78d-47463f19a9a3"}'
```

#### Updating a resource exception

```bash
$ curl   -X PUT http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3 \
         -H "Authorization: Bearer <myAPIkey>" \
         -d '{
            "table_indexes": {
                  "siren": "index",
                  "code_postal": "index"
            },
            "comment": "Updated comment for the resource exception."
         }'
```

#### Deleting a resource exception

```bash
$ curl  -X DELETE http://localhost:8000/api/resources-exceptions/f868cca6-8da1-4369-a78d-47463f19a9a3 \
        -H "Authorization: Bearer <myAPIkey>"
```

#### Get crawling status

```bash
$ curl -s "http://localhost:8000/api/status/crawler" | json_pp
{
   "fresh_checks_percentage" : 0.4,
   "pending_checks" : 142153,
   "total" : 142687,
   "fresh_checks" : 534,
   "checks_percentage" : 0.4,
   "resources_statuses_count": {
      "null": 195339,
      "BACKOFF": 0,
      "CRAWLING_URL": 0,
      "TO_ANALYSE_RESOURCE": 1,
      "ANALYSING_RESOURCE": 0,
      "TO_ANALYSE_CSV": 0,
      "ANALYSING_CSV": 0,
      "INSERTING_IN_DB": 0,
      "CONVERTING_TO_PARQUET": 0
  }
}
```

#### Get worker status

```bash
$ curl -s "http://localhost:8000/api/status/worker" | json_pp
{
   "queued" : {
      "default" : 0,
      "high" : 825,
      "low" : 655
   }
}
```

#### Get crawling stats

```bash
$ curl -s "http://localhost:8000/api/stats" | json_pp
{
   "status" : [
      {
         "count" : 525,
         "percentage" : 98.3,
         "label" : "ok"
      },
      {
         "label" : "error",
         "percentage" : 1.3,
         "count" : 7
      },
      {
         "label" : "timeout",
         "percentage" : 0.4,
         "count" : 2
      }
   ],
   "status_codes" : [
      {
         "code" : 200,
         "count" : 413,
         "percentage" : 78.7
      },
      {
         "code" : 501,
         "percentage" : 12.4,
         "count" : 65
      },
      {
         "percentage" : 6.1,
         "count" : 32,
         "code" : 404
      },
      {
         "code" : 500,
         "percentage" : 2.7,
         "count" : 14
      },
      {
         "code" : 502,
         "count" : 1,
         "percentage" : 0.2
      }
   ]
}
```

## 🔗 Using Webhook integration

**Set the config values**

Create a `config.toml` where your service and commands are launched, or specify a path to a TOML file via the `HYDRA_SETTINGS` environment variable. `config.toml` (or its equivalent) overrides values from `udata_hydra/config_default.toml`; look there for the values that can or need to be defined.

```toml
UDATA_URI = "https://dev.local:7000/api/2"
UDATA_URI_API_KEY = "example.api.key"
SENTRY_DSN = "https://{my-sentry-dsn}"
```

The webhook integration sends HTTP messages to `udata` when resources are analysed or checked, in order to fill the resources' extras.

Regarding analysis, there is a phase called "change detection". It will try to guess if a resource has been modified based on different criteria:
- harvest modified date in catalog
- content-length and last-modified headers
- checksum comparison over time

The payload should look something like:

```json
{
   "analysis:content-length": 91661,
   "analysis:mime-type": "application/zip",
   "analysis:checksum": "bef1de04601dedaf2d127418759b16915ba083be",
   "analysis:last-modified-at": "2022-11-27T23:00:54.762000",
   "analysis:last-modified-detection": "harvest-resource-metadata",
}
```

## 🛠️ Development

### 🐳 Docker compose

A single `docker-compose.yml` file is provided with profiles to manage different environments:
- Default services: `database` and `database-csv` (PostgreSQL containers for catalog/metadata and CSV conversion)
- `test` profile: `test-database` (ephemeral test database)
- `broker` profile: `broker` (Redis broker)

Usage:
- Development: `docker compose up -d` (or `docker compose --profile broker up -d` if Redis is needed)
- Tests: `docker compose --profile test up -d` (broker not needed, queue functionality is mocked)
- Broker only: `docker compose --profile broker up -d`

### 📝 Logging & Debugging

The log level can be adjusted using the `LOG_LEVEL` environment variable.
For example, to set the log level to `DEBUG` when initializing the database, use `LOG_LEVEL="DEBUG" udata-hydra init_db`.

### 📋 Writing a migration

1. Add a file named `migrations/{YYYYMMDD}_{description}.sql` and write the SQL needed to perform the migration (see the example below).
2. `udata-hydra migrate` will migrate the database as needed.
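
For instance, a hypothetical migration adding a column to the `catalog` table could look like this (the file name and SQL below are illustrative only):

```bash
# Create a new migration file; the date, description and SQL are placeholders.
cat > migrations/20240101_add_example_column.sql <<'EOF'
ALTER TABLE catalog ADD COLUMN IF NOT EXISTS example_column TEXT;
EOF

# Apply pending migrations
uv run udata-hydra migrate
```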

## 🚀 Deployment

Three services need to be deployed for the full stack to run:
- worker
- api / app
- crawler

Refer to each section above to learn how to launch them. The only differences between dev and prod are:
- use the `HYDRA_SETTINGS` env var to point to your custom `config.toml`
- use `HYDRA_APP_SOCKET_PATH` to configure where aiohttp should listen for a [reverse proxy connection (e.g. nginx)](https://docs.aiohttp.org/en/stable/deployment.html#nginx-configuration) and use `udata-hydra-app` to launch the app server, as sketched below
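
A hedged sketch of a production-style launch, assuming both settings are read from the environment; the paths and the use of `uv run` are illustrative, so adapt them to your deployment tooling:

```bash
# Point hydra at the production configuration file
export HYDRA_SETTINGS=/srv/hydra/config.toml

# Make the aiohttp app listen on a unix socket for the reverse proxy (e.g. nginx)
export HYDRA_APP_SOCKET_PATH=/run/hydra/hydra.sock

# Launch the app server
uv run udata-hydra-app
```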

## 🤝 Contributing

Before contributing to the repository and making any PR, it is necessary to initialize the pre-commit hooks:
```bash
pre-commit install
```
Once this is done, code formatting and linting, as well as import sorting, will be automatically checked before each commit.

If you cannot use pre-commit, you need to run the checks manually: [Ruff](https://astral.sh/ruff/) for linting (including import sorting) and formatting, and [ty](https://docs.astral.sh/ty/) for type checking. **Either running these commands manually or installing the pre-commit hook is required before submitting contributions.**
```bash
# Lint (including import sorting) and format code
uv run ruff check --fix && uv run ruff format

# Type check (ty)
uv run ty check
```

By default `ty check` checks the project root; pass paths to check specific files or directories. See the [ty CLI reference](https://docs.astral.sh/ty/reference/cli/) for options.

### 🏷️ Releases and versioning

The release process uses the [`tag_version.sh`](tag_version.sh) script to create git tags, GitHub releases and update [CHANGELOG.md](CHANGELOG.md) automatically. Package version numbers are automatically derived from git tags using [setuptools_scm](https://github.com/pypa/setuptools_scm), so no manual version updates are needed in `pyproject.toml`.

**Prerequisites**: [GitHub CLI](https://cli.github.com/) must be installed and authenticated, and you must be on the main branch with a clean working directory.

```bash
# Create a new release
./tag_version.sh <version>

# Example
./tag_version.sh 2.5.0

# Dry run to see what would happen
./tag_version.sh 2.5.0 --dry-run
```

The script automatically:
- Extracts commits since the last tag and formats them for CHANGELOG.md
- Identifies breaking changes (commits with `!:` in the subject)
- Creates a git tag and pushes it to the remote repository
- Creates a GitHub release with the changelog content
