Directory structure:
└── sentinel/
    ├── README.md
    ├── main.py
    ├── pyproject.toml
    ├── .python-version
    ├── sentinel/
    │   ├── __init__.py
    │   ├── config.py
    │   ├── core.py
    │   ├── emitter/
    │   │   ├── __init__.py
    │   │   ├── formatter.py
    │   │   └── server.py
    │   ├── ingestor/
    │   │   ├── __init__.py
    │   │   ├── buffer.py
    │   │   ├── features.py
    │   │   └── scraper.py
    │   ├── pipeline/
    │   │   ├── __init__.py
    │   │   ├── drift.py
    │   │   ├── predictor.py
    │   │   ├── registry.py
    │   │   ├── scheduler.py
    │   │   ├── trainer.py
    │   │   ├── versioning.py
    │   │   └── models/
    │   │       ├── __init__.py
    │   │       ├── arima.py
    │   │       ├── base.py
    │   │       ├── linear.py
    │   │       ├── sgd.py
    │   │       └── smoothing.py
    │   └── utils/
    │       ├── __init__.py
    │       ├── cron.py
    │       ├── logging.py
    │       ├── time.py
    │       └── validation.py
    └── tests/
        ├── __init__.py
        ├── test_emitter/
        │   ├── __init__.py
        │   ├── test_formatter.py
        │   └── test_server.py
        ├── test_ingestor/
        │   ├── __init__.py
        │   ├── test_buffer.py
        │   ├── test_features.py
        │   └── test_scraper.py
        ├── test_pipeline/
        │   ├── __init__.py
        │   ├── test_drift.py
        │   ├── test_predictor.py
        │   ├── test_registry.py
        │   ├── test_scheduler.py
        │   └── test_trainer.py
        └── test_utils/
            ├── __init__.py
            ├── test_cron.py
            └── test_time.py

================================================
FILE: README.md
================================================
[Empty file]


================================================
FILE: main.py
================================================
def main():
    print("Hello from sentinal!")


if __name__ == "__main__":
    main()



================================================
FILE: pyproject.toml
================================================
[project]
name = "sentinel"
version = "0.1.0"
description = "MLOps pipeline sitting between Prometheus and Grafana â€” pulls metrics, trains models, emits predictions"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "numpy>=1.26.0",
    "httpx>=0.27.0",
    "joblib>=1.3.0",
    "prometheus-client>=0.20.0",
    "croniter>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-cov>=5.0.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.pytest.ini_options]
testpaths = ["tests"]


================================================
FILE: .python-version
================================================
3.11



================================================
FILE: sentinel/__init__.py
================================================
# sentinel/__init__.py

from sentinel.core import Sentinel
from sentinel.config import SentinelConfig, WatchConfig
from sentinel.pipeline.models import (
    BaseModel,
    LinearTrendModel,
    ExponentialSmoothingModel,
    ARIMAModel,
    SGDRegressorModel,
)

__all__ = [
    "Sentinel",
    "SentinelConfig",
    "WatchConfig",
    "BaseModel",
    "LinearTrendModel",
    "ExponentialSmoothingModel",
    "ARIMAModel",
    "SGDRegressorModel",
]


================================================
FILE: sentinel/config.py
================================================
# sentinel/config.py

from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from sentinel.pipeline.models.base import BaseModel


@dataclass
class WatchConfig:
    metric: str
    labels: dict[str, str] = field(default_factory=dict)
    model_class: type[BaseModel] | None = None
    cron: str = "0 */6 * * *"
    granularity: str = "1m"
    horizon: str = "5m"
    lookback: str = "30m"
    drift_finetune_threshold: float = 0.1
    drift_retrain_threshold: float = 0.3


@dataclass
class SentinelConfig:
    prometheus_url: str = "http://localhost:9090"
    emitter_port: int = 8080
    artifact_store: str = "./sentinel_artifacts"
    max_versions_per_metric: int = 5
    watches: list[WatchConfig] = field(default_factory=list)
    scrape_timeout: int = 10
    emit_confidence_bounds: bool = False
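
# Illustrative example only: two watches sharing one SentinelConfig. The model
# classes are the ones re-exported from sentinel.pipeline.models; duration
# fields are Prometheus-style strings such as "1m", "5m", "30m".
#
#     from sentinel.pipeline.models import LinearTrendModel, ARIMAModel
#
#     config = SentinelConfig(
#         prometheus_url="http://localhost:9090",
#         watches=[
#             WatchConfig(metric="node_load1", model_class=LinearTrendModel),
#             WatchConfig(
#                 metric="http_requests_total",
#                 labels={"job": "api"},
#                 model_class=ARIMAModel,
#                 granularity="1m",
#                 horizon="5m",
#                 lookback="30m",
#             ),
#         ],
#     )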


================================================
FILE: sentinel/core.py
================================================
# sentinel/core.py

import threading
import time
from datetime import datetime, timezone, timedelta
from sentinel.config import SentinelConfig, WatchConfig
from sentinel.ingestor.scraper import PrometheusScraper
from sentinel.ingestor.buffer import BufferRegistry
from sentinel.pipeline.drift import DriftMonitor
from sentinel.pipeline.versioning import VersionStore
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.trainer import Trainer
from sentinel.pipeline.predictor import Predictor
from sentinel.pipeline.scheduler import MetricScheduler
from sentinel.emitter.server import EmitterServer, MetricEmitter
from sentinel.utils.validation import validate_sentinel_config
from sentinel.utils.logging import get_logger, configure_logging
from sentinel.utils.time import parse_duration_to_seconds

logger = get_logger(__name__)

# how often the ingestor pulls fresh data from Prometheus
_INGESTOR_TICK_SECONDS = 10


class Sentinel:
    """
    Top-level orchestrator. Wires together all three parts:

        Data Ingestor  - scrapes Prometheus, fills buffers
        MLOps Pipeline - trains, predicts, monitors drift, schedules retraining
        Data Emitter   - serves /metrics with predictions for Grafana

    Usage:

        from sentinel import Sentinel
        from sentinel.config import SentinelConfig, WatchConfig
        from sentinel.pipeline.models import ExponentialSmoothingModel

        config = SentinelConfig(
            prometheus_url="http://localhost:9090",
            emitter_port=8080,
            watches=[
                WatchConfig(
                    metric="http_request_duration_seconds",
                    labels={"job": "api"},
                    model_class=ExponentialSmoothingModel,
                    granularity="1m",
                    horizon="5m",
                    lookback="30m",
                    cron="0 */6 * * *",
                )
            ]
        )

        sentinel = Sentinel(config)
        sentinel.start()
    """

    def __init__(self, config: SentinelConfig, log_level: str = "INFO"):
        configure_logging(log_level)
        validate_sentinel_config(config)

        self.config = config
        self._stop_event = threading.Event()

        # shared scraper - one HTTP client for all metrics
        self._scraper = PrometheusScraper(
            prometheus_url=config.prometheus_url,
            timeout=config.scrape_timeout,
        )

        # shared buffer registry
        self._buffer_registry = BufferRegistry()

        # per-metric components
        self._drift_monitors: dict[str, DriftMonitor] = {}
        self._registries: dict[str, ModelRegistry] = {}
        self._trainers: dict[str, Trainer] = {}
        self._predictors: dict[str, Predictor] = {}
        self._schedulers: dict[str, MetricScheduler] = {}

        # emitter server
        self._emitter_server = EmitterServer(config)

        # ingestor background thread
        self._ingestor_thread: threading.Thread | None = None

        self._setup()

    def _setup(self) -> None:
        """
        Instantiate and wire all per-metric components.
        """
        for watch in self.config.watches:
            key = self._metric_key(watch)

            # buffer
            buffer = self._buffer_registry.register(
                key=key,
                metric=watch.metric,
                lookback=watch.lookback,
                granularity=watch.granularity,
            )

            # drift monitor
            drift_monitor = DriftMonitor(
                metric=key,
                finetune_threshold=watch.drift_finetune_threshold,
                retrain_threshold=watch.drift_retrain_threshold,
            )
            self._drift_monitors[key] = drift_monitor

            # version store + model registry
            version_store = VersionStore(
                metric_key=key,
                artifact_store=self.config.artifact_store,
                max_versions=self.config.max_versions_per_metric,
            )
            registry = ModelRegistry(metric_key=key, version_store=version_store)
            self._registries[key] = registry

            # attempt to restore model from disk so we don't cold start
            # on every restart if a trained model already exists
            def make_factory(w):
                def factory():
                    return w.model_class(
                        granularity=w.granularity,
                        horizon=w.horizon,
                        lookback=w.lookback,
                    )
                return factory

            registry.restore_from_disk(make_factory(watch))

            # trainer
            trainer = Trainer(
                watch_config=watch,
                buffer=buffer,
                registry=registry,
            )
            self._trainers[key] = trainer

            # predictor
            predictor = Predictor(
                watch_config=watch,
                buffer=buffer,
                registry=registry,
            )
            self._predictors[key] = predictor

            # scheduler
            def make_train_fn(t):
                def train_fn(severity, drift_score):
                    t.run(drift_severity=severity, drift_score=drift_score)
                return train_fn

            scheduler = MetricScheduler(
                watch_config=watch,
                buffer=buffer,
                drift_monitor=drift_monitor,
                train_fn=make_train_fn(trainer),
            )
            self._schedulers[key] = scheduler

            # emitter
            emitter = MetricEmitter(
                watch_config=watch,
                predictor=predictor,
                drift_monitor=drift_monitor,
                scraper=self._scraper,
                emit_confidence_bounds=self.config.emit_confidence_bounds,
            )
            self._emitter_server.register(emitter)

            logger.info(f"Sentinel wired up metric: {key}")

    def start(self) -> None:
        """
        Start all components. Blocks until stop() is called.
        """
        if not self._scraper.check_connectivity():
            raise RuntimeError(
                f"Cannot reach Prometheus at {self.config.prometheus_url}. "
                f"Check the URL and ensure Prometheus is running."
            )

        logger.info("Sentinel starting...")

        # start schedulers
        for scheduler in self._schedulers.values():
            scheduler.start()

        # start ingestor loop
        self._ingestor_thread = threading.Thread(
            target=self._ingestor_loop,
            name="ingestor",
            daemon=True,
        )
        self._ingestor_thread.start()

        # start emitter server
        self._emitter_server.start()

        logger.info(
            f"Sentinel running. "
            f"Watching {len(self.config.watches)} metric(s). "
            f"Emitting on port {self.config.emitter_port}."
        )

        # block main thread
        try:
            while not self._stop_event.is_set():
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received, shutting down...")
            self.stop()

    def stop(self) -> None:
        """
        Gracefully stop all components.
        """
        logger.info("Sentinel stopping...")
        self._stop_event.set()

        for scheduler in self._schedulers.values():
            scheduler.stop()

        self._emitter_server.stop()
        self._scraper.close()

        logger.info("Sentinel stopped.")

    def rollback(self, metric: str, labels: dict[str, str] | None = None) -> bool:
        """
        Manually roll back the model for a given metric to the previous version.
        Returns True if rollback succeeded.
        """
        key = self._metric_key_from_parts(metric, labels or {})
        registry = self._registries.get(key)
        if registry is None:
            logger.error(f"No registry found for metric key '{key}'")
            return False
        return registry.rollback()

    def status(self) -> dict:
        """
        Returns a snapshot of current state for all watched metrics.
        Useful for health checks and debugging.
        """
        result = {}
        for watch in self.config.watches:
            key = self._metric_key(watch)
            registry = self._registries.get(key)
            drift = self._drift_monitors.get(key)
            buffer = self._buffer_registry.get(key)
            active_version = registry.active_version() if registry else None

            result[key] = {
                "ready": registry.is_ready() if registry else False,
                "active_version": active_version.version_id if active_version else None,
                "active_version_mae": active_version.mae if active_version else None,
                "drift_mae": drift.current_mae() if drift else None,
                "buffer_fill": buffer.fill_fraction() if buffer else None,
            }
        return result

    def _ingestor_loop(self) -> None:
        """
        Continuously pulls fresh data from Prometheus for all watched metrics
        and pushes it into the corresponding buffers.
        """
        while not self._stop_event.is_set():
            for watch in self.config.watches:
                try:
                    self._ingestor_tick(watch)
                except Exception as e:
                    logger.error(f"Ingestor tick failed for {watch.metric}: {e}")
            time.sleep(_INGESTOR_TICK_SECONDS)

    def _ingestor_tick(self, watch: WatchConfig) -> None:
        key = self._metric_key(watch)
        buffer = self._buffer_registry.get(key)

        if buffer is None:
            return

        now = datetime.now(timezone.utc)
        granularity_secs = parse_duration_to_seconds(watch.granularity)

        # pull just the last two granularity steps to stay current
        start = now - timedelta(seconds=granularity_secs * 2)

        samples = self._scraper.fetch_range(
            metric=watch.metric,
            labels=watch.labels,
            start=start,
            end=now,
            granularity=watch.granularity,
        )

        if samples:
            buffer.push_many(samples)
            logger.debug(f"[{key}] ingestor pushed {len(samples)} new samples")

    def _initial_backfill(self, watch: WatchConfig) -> None:
        """
        On startup, backfill the buffer with historical data
        covering the full lookback window so cold start training
        can begin as soon as possible.
        """
        key = self._metric_key(watch)
        buffer = self._buffer_registry.get(key)

        if buffer is None or buffer.is_ready():
            return

        now = datetime.now(timezone.utc)
        lookback_secs = parse_duration_to_seconds(watch.lookback)
        start = now - timedelta(seconds=lookback_secs)

        logger.info(f"[{key}] backfilling {watch.lookback} of historical data...")

        samples = self._scraper.fetch_range(
            metric=watch.metric,
            labels=watch.labels,
            start=start,
            end=now,
            granularity=watch.granularity,
        )

        if samples:
            buffer.push_many(samples)
            logger.info(f"[{key}] backfill complete â€” {len(samples)} samples loaded")
        else:
            logger.warning(f"[{key}] backfill returned no data from Prometheus")

    @staticmethod
    def _metric_key(watch: WatchConfig) -> str:
        if not watch.labels:
            return watch.metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(watch.labels.items()))
        return f"{watch.metric}{{{label_str}}}"

    @staticmethod
    def _metric_key_from_parts(metric: str, labels: dict[str, str]) -> str:
        if not labels:
            return metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        return f"{metric}{{{label_str}}}"


================================================
FILE: sentinel/emitter/__init__.py
================================================
# sentinel/emitter/__init__.py

from sentinel.emitter.formatter import (
    format_prediction,
    FormattedPrediction,
    build_metric_key,
)
from sentinel.emitter.server import MetricEmitter, EmitterServer

__all__ = [
    "format_prediction",
    "FormattedPrediction",
    "build_metric_key",
    "MetricEmitter",
    "EmitterServer",
]


================================================
FILE: sentinel/emitter/formatter.py
================================================
# sentinel/emitter/formatter.py

from sentinel.pipeline.models.base import PredictionResult
from sentinel.config import WatchConfig
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)

# sentinel appends this suffix to the original metric name
_PREDICTION_SUFFIX = "_sentinel_predicted"
_LOWER_BOUND_SUFFIX = "_sentinel_predicted_lower"
_UPPER_BOUND_SUFFIX = "_sentinel_predicted_upper"


class FormattedPrediction:
    """
    Holds a set of Prometheus-ready metric samples derived from
    a single PredictionResult. One FormattedPrediction per watched metric
    per emitter tick.
    """

    def __init__(
        self,
        metric_name: str,
        labels: dict[str, str],
        steps: list[dict],
    ):
        """
        metric_name : base predicted metric name e.g. "http_request_duration_seconds_sentinel_predicted"
        labels      : label set carried over from the original metric plus sentinel metadata labels
        steps       : list of {"timestamp": float, "value": float, "step": int}
        """
        self.metric_name = metric_name
        self.labels = labels
        self.steps = steps

    def __repr__(self) -> str:
        return (
            f"FormattedPrediction(metric={self.metric_name}, "
            f"labels={self.labels}, steps={len(self.steps)})"
        )


def format_prediction(
    watch_config: WatchConfig,
    result: PredictionResult,
    emit_confidence_bounds: bool = False,
) -> list[FormattedPrediction]:
    """
    Convert a PredictionResult into a list of FormattedPredictions
    ready to be registered as Prometheus gauges.

    One FormattedPrediction is always produced for the predicted values.
    Two more are produced if emit_confidence_bounds=True and bounds exist.

    Labels added by Sentinel on top of the original metric labels:
        sentinel_horizon : prediction horizon string e.g. "5m"
        sentinel_version : model version id e.g. "v3"
        sentinel_step    : step index within the horizon (1-indexed)
    """
    base_labels = dict(watch_config.labels)
    version = result.model_version or "unknown"
    output = []

    predicted_steps = []
    for i, (val, ts) in enumerate(zip(result.values, result.timestamps)):
        predicted_steps.append({
            "timestamp": float(ts),
            "value": float(val),
            "step": i + 1,
        })

    output.append(FormattedPrediction(
        metric_name=watch_config.metric + _PREDICTION_SUFFIX,
        labels={
            **base_labels,
            "sentinel_horizon": watch_config.horizon,
            "sentinel_version": version,
        },
        steps=predicted_steps,
    ))

    if emit_confidence_bounds and result.lower_bound is not None and result.upper_bound is not None:
        lower_steps = []
        upper_steps = []
        for i, (lo, hi, ts) in enumerate(zip(result.lower_bound, result.upper_bound, result.timestamps)):
            lower_steps.append({"timestamp": float(ts), "value": float(lo), "step": i + 1})
            upper_steps.append({"timestamp": float(ts), "value": float(hi), "step": i + 1})

        output.append(FormattedPrediction(
            metric_name=watch_config.metric + _LOWER_BOUND_SUFFIX,
            labels={**base_labels, "sentinel_horizon": watch_config.horizon, "sentinel_version": version},
            steps=lower_steps,
        ))

        output.append(FormattedPrediction(
            metric_name=watch_config.metric + _UPPER_BOUND_SUFFIX,
            labels={**base_labels, "sentinel_horizon": watch_config.horizon, "sentinel_version": version},
            steps=upper_steps,
        ))

    return output
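
# Illustrative sketch of the exposition these predictions end up producing once
# the emitter server sets its gauges (values are made up; the sentinel_step
# label is added by the server on top of the labels built here):
#
#     http_request_duration_seconds_sentinel_predicted{job="api",sentinel_horizon="5m",sentinel_step="1",sentinel_version="v3"} 0.231
#     http_request_duration_seconds_sentinel_predicted{job="api",sentinel_horizon="5m",sentinel_step="2",sentinel_version="v3"} 0.240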


def build_metric_key(metric: str, labels: dict[str, str]) -> str:
    """
    Build a unique string key for a metric+labelset combination.
    Used by the server to look up and update the correct Gauge.
    """
    if not labels:
        return metric
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{metric}{{{label_str}}}"


================================================
FILE: sentinel/emitter/server.py
================================================
# sentinel/emitter/server.py

import threading
import time
from typing import Optional
from prometheus_client import Gauge, CollectorRegistry, start_http_server, REGISTRY
from sentinel.emitter.formatter import format_prediction, FormattedPrediction, build_metric_key
from sentinel.pipeline.predictor import Predictor
from sentinel.pipeline.drift import DriftMonitor
from sentinel.ingestor.scraper import PrometheusScraper
from sentinel.config import WatchConfig, SentinelConfig
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)

# how often the emitter tick runs in seconds
_EMITTER_TICK_SECONDS = 10


class MetricEmitter:
    """
    Manages the Prometheus Gauge objects for a single watched metric
    and updates them on each emitter tick.

    One Gauge per prediction step within the horizon. Each Gauge carries
    the step index as a label so Grafana can plot all horizon steps
    on the same panel or separately.

    Also handles feeding actual values back to the DriftMonitor
    so drift detection has ground truth to compare against.
    """

    def __init__(
        self,
        watch_config: WatchConfig,
        predictor: Predictor,
        drift_monitor: DriftMonitor,
        scraper: PrometheusScraper,
        emit_confidence_bounds: bool = False,
        registry: CollectorRegistry = REGISTRY,
    ):
        self.watch_config = watch_config
        self.predictor = predictor
        self.drift_monitor = drift_monitor
        self.scraper = scraper
        self.emit_confidence_bounds = emit_confidence_bounds
        self._registry = registry

        # gauge_key -> Gauge
        self._gauges: dict[str, Gauge] = {}
        self._gauge_lock = threading.Lock()

        # last set of predictions keyed by step for drift comparison
        # step -> (predicted_value, predicted_timestamp)
        self._pending_predictions: dict[int, tuple[float, float]] = {}
        self._pending_lock = threading.Lock()

    def tick(self) -> None:
        """
        Run one emitter cycle:
            1. Run inference via Predictor
            2. Format predictions
            3. Update Gauges
            4. Feed actual values to DriftMonitor for past predictions
        """
        self._feed_drift_actuals()

        result = self.predictor.predict()
        if result is None:
            logger.debug(f"[{self.watch_config.metric}] no prediction available this tick")
            return

        formatted_list = format_prediction(
            watch_config=self.watch_config,
            result=result,
            emit_confidence_bounds=self.emit_confidence_bounds,
        )

        for formatted in formatted_list:
            self._update_gauges(formatted)

        # store predictions for drift comparison on next ticks
        with self._pending_lock:
            for step_data in formatted_list[0].steps:  # use main prediction series
                step = step_data["step"]
                self._pending_predictions[step] = (
                    step_data["value"],
                    step_data["timestamp"],
                )

    def _update_gauges(self, formatted: FormattedPrediction) -> None:
        """
        For each step in the prediction, set the corresponding Gauge value.
        Creates the Gauge lazily on first encounter.
        """
        for step_data in formatted.steps:
            step = step_data["step"]
            value = step_data["value"]

            labels_with_step = {**formatted.labels, "sentinel_step": str(step)}
            gauge_key = build_metric_key(formatted.metric_name, labels_with_step)

            gauge = self._get_or_create_gauge(
                gauge_key=gauge_key,
                metric_name=formatted.metric_name,
                label_names=list(labels_with_step.keys()),
            )

            try:
                gauge.labels(**labels_with_step).set(value)
            except Exception as e:
                logger.error(f"Failed to set gauge '{gauge_key}': {e}")

    def _get_or_create_gauge(
        self,
        gauge_key: str,
        metric_name: str,
        label_names: list[str],
    ) -> Gauge:
        with self._gauge_lock:
            if gauge_key not in self._gauges:
                # sanitize metric name for Prometheus - replace dots and dashes with underscores
                safe_name = metric_name.replace(".", "_").replace("-", "_")
                try:
                    g = Gauge(
                        name=safe_name,
                        documentation=f"Sentinel prediction for {self.watch_config.metric}",
                        labelnames=label_names,
                        registry=self._registry,
                    )
                    self._gauges[gauge_key] = g
                    logger.debug(f"Created Gauge '{safe_name}' with labels {label_names}")
                except ValueError:
                    # Gauge already registered (e.g. after hot reload), fetch existing
                    self._gauges[gauge_key] = self._registry._names_to_collectors.get(safe_name)
            return self._gauges[gauge_key]

    def _feed_drift_actuals(self) -> None:
        """
        For each pending prediction whose timestamp has now passed,
        fetch the actual value from Prometheus and record the residual
        in the DriftMonitor.
        """
        now = time.time()
        due_steps = []

        with self._pending_lock:
            for step, (predicted_val, predicted_ts) in list(self._pending_predictions.items()):
                if predicted_ts <= now:
                    due_steps.append((step, predicted_val, predicted_ts))

            for step, _, _ in due_steps:
                del self._pending_predictions[step]

        for step, predicted_val, predicted_ts in due_steps:
            actual = self.scraper.fetch_latest(
                metric=self.watch_config.metric,
                labels=self.watch_config.labels,
            )
            if actual is not None:
                _, actual_val = actual
                self.drift_monitor.record(predicted_val, actual_val)
                logger.debug(
                    f"[{self.watch_config.metric}] drift record step={step} "
                    f"predicted={predicted_val:.4f} actual={actual_val:.4f}"
                )


class EmitterServer:
    """
    Runs the /metrics HTTP server and orchestrates all MetricEmitters.
    One EmitterServer per Sentinel instance.

    On each tick it calls tick() on every MetricEmitter in a loop.
    The HTTP server is started once and serves all registered Gauges
    via prometheus_client's built-in exposition.
    """

    def __init__(self, config: SentinelConfig):
        self.config = config
        self._emitters: list[MetricEmitter] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def register(self, emitter: MetricEmitter) -> None:
        self._emitters.append(emitter)

    def start(self) -> None:
        """
        Start the /metrics HTTP server and the emitter loop thread.
        """
        start_http_server(self.config.emitter_port)
        logger.info(f"Sentinel /metrics server started on port {self.config.emitter_port}")

        self._thread = threading.Thread(
            target=self._loop,
            name="emitter-server",
            daemon=True,
        )
        self._thread.start()
        logger.info("Emitter loop started")

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=10)
        logger.info("Emitter server stopped")

    def _loop(self) -> None:
        while not self._stop_event.is_set():
            for emitter in self._emitters:
                try:
                    emitter.tick()
                except Exception as e:
                    logger.error(
                        f"Emitter tick failed for {emitter.watch_config.metric}: {e}"
                    )
            time.sleep(_EMITTER_TICK_SECONDS)
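
# Minimal standalone sketch of the mechanism EmitterServer relies on, using
# only prometheus_client (names and port are illustrative): register a Gauge,
# set it, and let start_http_server expose it at /metrics.
#
#     from prometheus_client import CollectorRegistry, Gauge, start_http_server
#
#     reg = CollectorRegistry()
#     g = Gauge(
#         "demo_metric_sentinel_predicted",
#         "Sentinel prediction demo",
#         labelnames=["sentinel_step"],
#         registry=reg,
#     )
#     g.labels(sentinel_step="1").set(0.42)
#     start_http_server(9000, registry=reg)  # GET :9000/metrics now shows the gauge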


================================================
FILE: sentinel/ingestor/__init__.py
================================================
# sentinel/ingestor/__init__.py

from sentinel.ingestor.scraper import PrometheusScraper
from sentinel.ingestor.buffer import MetricBuffer, BufferRegistry
from sentinel.ingestor.features import (
    build_lag_features,
    build_feature_matrix,
    build_prediction_input,
)

__all__ = [
    "PrometheusScraper",
    "MetricBuffer",
    "BufferRegistry",
    "build_lag_features",
    "build_feature_matrix",
    "build_prediction_input",
]


================================================
FILE: sentinel/ingestor/buffer.py
================================================
# sentinel/ingestor/buffer.py

import threading
import numpy as np
from collections import deque
from sentinel.utils.logging import get_logger
from sentinel.utils.time import parse_duration_to_seconds, parse_duration_to_steps

logger = get_logger(__name__)


class MetricBuffer:
    """
    Sliding window buffer for a single metric+labelset.
    Holds (timestamp, value) pairs up to the configured lookback window.
    Thread-safe - ingestor writes, trainer and drift monitor read concurrently.

    The buffer is the single source of truth for raw metric history inside Sentinel.
    Training, feature engineering, and drift detection all read from here.
    """

    def __init__(self, metric: str, lookback: str, granularity: str):
        """
        metric      : metric name, used for logging only
        lookback    : maximum duration to retain e.g. "30m"
        granularity : expected resolution of incoming data e.g. "1m"
        """
        self.metric = metric
        self.lookback = lookback
        self.granularity = granularity

        self._lookback_secs = parse_duration_to_seconds(lookback)
        self._max_steps = parse_duration_to_steps(lookback, granularity)

        # deque with maxlen automatically evicts oldest entries
        self._data: deque[tuple[float, float]] = deque(maxlen=self._max_steps)
        self._lock = threading.RLock()

    def push(self, timestamp: float, value: float) -> None:
        """
        Append a single (timestamp, value) pair.
        Oldest entries are evicted automatically once maxlen is reached.
        """
        with self._lock:
            self._data.append((timestamp, value))

    def push_many(self, samples: list[tuple[float, float]]) -> None:
        """
        Append a batch of (timestamp, value) pairs in order.
        Used during cold start when pulling historical range from Prometheus.
        """
        with self._lock:
            for ts, val in samples:
                self._data.append((ts, val))
        logger.debug(f"[{self.metric}] buffer loaded {len(samples)} samples")

    def get_values(self) -> np.ndarray:
        """
        Returns a numpy array of values in chronological order.
        """
        with self._lock:
            return np.array([v for _, v in self._data], dtype=float)

    def get_timestamps(self) -> np.ndarray:
        """
        Returns a numpy array of unix timestamps in chronological order.
        """
        with self._lock:
            return np.array([ts for ts, _ in self._data], dtype=float)

    def get_samples(self) -> list[tuple[float, float]]:
        """
        Returns all (timestamp, value) pairs as a list.
        """
        with self._lock:
            return list(self._data)

    def get_recent(self, n: int) -> np.ndarray:
        """
        Returns the n most recent values.
        If fewer than n values exist, returns all available.
        """
        with self._lock:
            data = list(self._data)
        recent = data[-n:] if len(data) >= n else data
        return np.array([v for _, v in recent], dtype=float)

    def is_ready(self) -> bool:
        """
        Returns True if the buffer has accumulated enough data
        to fill the full lookback window.
        Trainer checks this before triggering cold start training.
        """
        with self._lock:
            return len(self._data) >= self._max_steps

    def current_size(self) -> int:
        with self._lock:
            return len(self._data)

    def capacity(self) -> int:
        return self._max_steps

    def fill_fraction(self) -> float:
        """
        Returns how full the buffer is as a fraction between 0.0 and 1.0.
        Useful for logging cold start progress.
        """
        with self._lock:
            return len(self._data) / self._max_steps

    def clear(self) -> None:
        with self._lock:
            self._data.clear()
        logger.debug(f"[{self.metric}] buffer cleared")

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)

    def __repr__(self) -> str:
        return (
            f"MetricBuffer(metric={self.metric}, "
            f"size={self.current_size()}/{self.capacity()}, "
            f"lookback={self.lookback}, "
            f"granularity={self.granularity})"
        )
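
# Illustrative usage (values made up). Assuming parse_duration_to_steps maps
# a "30m" lookback at "1m" granularity to 30 steps, the buffer reports ready
# once 30 samples are held.
#
#     buf = MetricBuffer(metric="node_load1", lookback="30m", granularity="1m")
#     buf.push(1700000000.0, 0.8)
#     buf.push_many([(1700000060.0, 0.9), (1700000120.0, 1.1)])
#     buf.fill_fraction()   # 3 / 30 = 0.1
#     buf.get_recent(2)     # array([0.9, 1.1])
#     buf.is_ready()        # False until 30 samples are buffered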


class BufferRegistry:
    """
    Holds one MetricBuffer per watched metric.
    Core.py creates this once and passes it to both the ingestor and pipeline.
    """

    def __init__(self):
        self._buffers: dict[str, MetricBuffer] = {}
        self._lock = threading.Lock()

    def register(
        self,
        key: str,
        metric: str,
        lookback: str,
        granularity: str,
    ) -> MetricBuffer:
        """
        Register a new buffer for a metric.
        key is typically metric_name + label fingerprint.
        Returns the created buffer.
        """
        with self._lock:
            if key in self._buffers:
                logger.warning(f"Buffer already registered for key '{key}', returning existing.")
                return self._buffers[key]
            buf = MetricBuffer(metric=metric, lookback=lookback, granularity=granularity)
            self._buffers[key] = buf
            logger.debug(f"Registered buffer for key '{key}'")
            return buf

    def get(self, key: str) -> MetricBuffer | None:
        with self._lock:
            return self._buffers.get(key)

    def all_ready(self) -> bool:
        """
        Returns True only if every registered buffer has reached capacity.
        Used by scheduler to gate cold start training.
        """
        with self._lock:
            return all(buf.is_ready() for buf in self._buffers.values())

    def keys(self) -> list[str]:
        with self._lock:
            return list(self._buffers.keys())

    def __repr__(self) -> str:
        with self._lock:
            summaries = ", ".join(
                f"{k}({buf.fill_fraction():.0%})"
                for k, buf in self._buffers.items()
            )
        return f"BufferRegistry([{summaries}])"


================================================
FILE: sentinel/ingestor/features.py
================================================
# sentinel/ingestor/features.py

import numpy as np
from sentinel.utils.time import parse_duration_to_steps
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


def build_lag_features(
    values: np.ndarray,
    n_lags: int,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Build a supervised learning dataset from a univariate time series
    using lag features.

    Given a series [v0, v1, v2, ..., vN], for n_lags=3 this produces:

        X (features):           y (targets):
        [v0, v1, v2]         ->  v3
        [v1, v2, v3]         ->  v4
        ...
        [vN-3, vN-2, vN-1]  ->  vN

    X shape: (n_samples, n_lags)
    y shape: (n_samples,)

    n_samples = len(values) - n_lags
    """
    if len(values) <= n_lags:
        raise ValueError(
            f"Not enough values to build lag features. "
            f"Need more than {n_lags} values, got {len(values)}."
        )

    X = np.array([values[i:i + n_lags] for i in range(len(values) - n_lags)])
    y = values[n_lags:]

    return X, y


def build_feature_matrix(
    values: np.ndarray,
    lookback: str,
    granularity: str,
    include_rolling_mean: bool = True,
    include_rolling_std: bool = True,
    include_time_features: bool = True,
    timestamps: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Full feature engineering pipeline. Builds lag features and optionally
    appends rolling statistics and time-based features.

    values       : raw metric values in chronological order
    lookback     : lookback window e.g. "30m"
    granularity  : resolution e.g. "1m"
    timestamps   : unix timestamps aligned with values, required if
                   include_time_features=True

    Returns (X, y) where X has shape (n_samples, n_features).
    """
    n_lags = parse_duration_to_steps(lookback, granularity)
    X_lag, y = build_lag_features(values, n_lags)

    feature_blocks = [X_lag]

    if include_rolling_mean:
        rolling_mean = _rolling_mean(values, window=n_lags)
        # align to X rows - use the rolling mean up to each sample's last lag
        # (excluding the target), matching build_prediction_input at inference
        rolling_mean_aligned = rolling_mean[n_lags - 1:-1]
        feature_blocks.append(rolling_mean_aligned.reshape(-1, 1))

    if include_rolling_std:
        rolling_std = _rolling_std(values, window=n_lags)
        rolling_std_aligned = rolling_std[n_lags - 1:-1]
        feature_blocks.append(rolling_std_aligned.reshape(-1, 1))

    if include_time_features and timestamps is not None:
        time_feats = _time_features(timestamps[n_lags:])
        feature_blocks.append(time_feats)
    elif include_time_features and timestamps is None:
        logger.warning(
            "include_time_features=True but no timestamps provided. "
            "Skipping time features."
        )

    X = np.hstack(feature_blocks)

    return X, y


def build_prediction_input(
    values: np.ndarray,
    lookback: str,
    granularity: str,
    include_rolling_mean: bool = True,
    include_rolling_std: bool = True,
    include_time_features: bool = True,
    timestamps: np.ndarray | None = None,
) -> np.ndarray:
    """
    Build the feature vector for the most recent observation.
    This is the input passed to model.predict() at inference time.

    Returns X with shape (1, n_features).
    """
    n_lags = parse_duration_to_steps(lookback, granularity)

    if len(values) < n_lags:
        raise ValueError(
            f"Need at least {n_lags} values to build prediction input, "
            f"got {len(values)}."
        )

    lag_window = values[-n_lags:].reshape(1, -1)
    feature_blocks = [lag_window]

    if include_rolling_mean:
        mean_val = np.mean(values[-n_lags:]).reshape(1, 1)
        feature_blocks.append(mean_val)

    if include_rolling_std:
        std_val = np.std(values[-n_lags:]).reshape(1, 1)
        feature_blocks.append(std_val)

    if include_time_features and timestamps is not None:
        latest_ts = timestamps[-1:]
        time_feats = _time_features(latest_ts)
        feature_blocks.append(time_feats)

    return np.hstack(feature_blocks)


def _rolling_mean(values: np.ndarray, window: int) -> np.ndarray:
    """
    Compute rolling mean with the given window size.
    First (window-1) values are filled with the expanding mean.
    """
    result = np.empty(len(values))
    for i in range(len(values)):
        start = max(0, i - window + 1)
        result[i] = np.mean(values[start:i + 1])
    return result


def _rolling_std(values: np.ndarray, window: int) -> np.ndarray:
    """
    Compute rolling standard deviation with the given window size.
    First (window-1) values are filled with the expanding std.
    """
    result = np.empty(len(values))
    for i in range(len(values)):
        start = max(0, i - window + 1)
        chunk = values[start:i + 1]
        result[i] = np.std(chunk) if len(chunk) > 1 else 0.0
    return result


def _time_features(timestamps: np.ndarray) -> np.ndarray:
    """
    Extract cyclical time features from unix timestamps.
    Encodes hour-of-day and day-of-week as sine/cosine pairs
    so the model sees time as continuous rather than categorical.

    Returns array of shape (n, 4):
        col 0: sin(hour_of_day)
        col 1: cos(hour_of_day)
        col 2: sin(day_of_week)
        col 3: cos(day_of_week)
    """
    hours = (timestamps % 86400) / 3600          # 0..23
    days = (timestamps // 86400) % 7             # 0..6

    hour_sin = np.sin(2 * np.pi * hours / 24)
    hour_cos = np.cos(2 * np.pi * hours / 24)
    day_sin = np.sin(2 * np.pi * days / 7)
    day_cos = np.cos(2 * np.pi * days / 7)

    return np.stack([hour_sin, hour_cos, day_sin, day_cos], axis=1)
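
# Worked shape example, assuming parse_duration_to_steps("5m", "1m") == 5:
# with 8 values and n_lags=5 there are 3 supervised samples. The lag block
# contributes 5 columns, rolling mean and std add 2, and the cyclical time
# features add 4, so X ends up (3, 11) and y ends up (3,).
#
#     values = np.arange(8, dtype=float)
#     ts = 1700000000.0 + 60.0 * np.arange(8)
#     X, y = build_feature_matrix(values, lookback="5m", granularity="1m", timestamps=ts)
#     X.shape  # (3, 11)
#     y.shape  # (3,)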


================================================
FILE: sentinel/ingestor/scraper.py
================================================
# sentinel/ingestor/scraper.py

import httpx
from datetime import datetime, timezone
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


class PrometheusScraper:
    """
    Pulls time series data from Prometheus HTTP API.
    One scraper instance is shared across all watched metrics.
    Uses httpx for sync HTTP - keeps it simple, no async complexity
    since scraping runs in its own background thread.
    """

    def __init__(self, prometheus_url: str, timeout: int = 10):
        self.prometheus_url = prometheus_url.rstrip("/")
        self.timeout = timeout
        self._client = httpx.Client(timeout=self.timeout)

    def fetch_range(
        self,
        metric: str,
        labels: dict[str, str],
        start: datetime,
        end: datetime,
        granularity: str,
    ) -> list[tuple[float, float]]:
        """
        Fetch a range of samples for a metric+label set from Prometheus.

        Returns a list of (unix_timestamp, value) tuples sorted ascending.
        Empty list if no data found.

        metric      : metric name e.g. "http_request_duration_seconds"
        labels      : label filters e.g. {"job": "api", "status": "200"}
        start       : range start datetime (UTC)
        end         : range end datetime (UTC)
        granularity : step size e.g. "1m"
        """
        query = self._build_selector(metric, labels)
        step = granularity  # Prometheus accepts e.g. "1m", "30s" directly

        params = {
            "query": query,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }

        url = f"{self.prometheus_url}/api/v1/query_range"

        try:
            response = self._client.get(url, params=params)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error(
                f"Prometheus returned HTTP {e.response.status_code} "
                f"for query '{query}': {e.response.text}"
            )
            return []
        except httpx.RequestError as e:
            logger.error(f"Failed to reach Prometheus at {url}: {e}")
            return []

        data = response.json()

        if data.get("status") != "success":
            logger.warning(
                f"Prometheus query returned non-success status for '{query}': {data}"
            )
            return []

        results = data.get("data", {}).get("result", [])

        if not results:
            logger.debug(f"No data returned from Prometheus for query '{query}'")
            return []

        # take the first matching series
        # one model per metric+labelset so there should only be one
        values = results[0].get("values", [])

        return [(float(ts), float(val)) for ts, val in values]

    def fetch_latest(
        self,
        metric: str,
        labels: dict[str, str],
    ) -> tuple[float, float] | None:
        """
        Fetch the single most recent sample for a metric+label set.
        Used by the drift monitor and emitter to get current value.

        Returns (unix_timestamp, value) or None if no data.
        """
        query = self._build_selector(metric, labels)
        url = f"{self.prometheus_url}/api/v1/query"
        params = {
            "query": query,
            "time": datetime.now(timezone.utc).timestamp(),
        }

        try:
            response = self._client.get(url, params=params)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error(
                f"Prometheus returned HTTP {e.response.status_code} "
                f"for instant query '{query}': {e.response.text}"
            )
            return None
        except httpx.RequestError as e:
            logger.error(f"Failed to reach Prometheus at {url}: {e}")
            return None

        data = response.json()

        if data.get("status") != "success":
            return None

        results = data.get("data", {}).get("result", [])

        if not results:
            return None

        ts, val = results[0].get("value", [None, None])

        if ts is None or val is None:
            return None

        return (float(ts), float(val))

    def check_connectivity(self) -> bool:
        """
        Ping Prometheus /-/healthy endpoint.
        Used at startup to fail fast if Prometheus is unreachable.
        """
        url = f"{self.prometheus_url}/-/healthy"
        try:
            response = self._client.get(url)
            return response.status_code == 200
        except httpx.RequestError:
            return False

    def close(self) -> None:
        self._client.close()

    def _build_selector(self, metric: str, labels: dict[str, str]) -> str:
        """
        Build a Prometheus instant vector selector string.

        Examples:
            "http_requests_total"
            'http_requests_total{job="api",status="200"}'
        """
        if not labels:
            return metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        return f"{metric}{{{label_str}}}"


================================================
FILE: sentinel/pipeline/__init__.py
================================================
# sentinel/pipeline/__init__.py

from sentinel.pipeline.drift import DriftMonitor, DriftResult, DriftSeverity
from sentinel.pipeline.versioning import ModelVersion, VersionStore
from sentinel.pipeline.registry import ModelRegistry

__all__ = [
    "DriftMonitor",
    "DriftResult",
    "DriftSeverity",
    "ModelVersion",
    "VersionStore",
    "ModelRegistry",
]


================================================
FILE: sentinel/pipeline/drift.py
================================================
# sentinel/pipeline/drift.py

import threading
import numpy as np
from collections import deque
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


class DriftSeverity:
    NONE = "none"
    LOW = "low"        # finetune
    HIGH = "high"      # full retrain


class DriftResult:
    def __init__(self, severity: str, mae: float, threshold_finetune: float, threshold_retrain: float):
        self.severity = severity
        self.mae = mae
        self.threshold_finetune = threshold_finetune
        self.threshold_retrain = threshold_retrain

    def __repr__(self):
        return (
            f"DriftResult(severity={self.severity}, mae={self.mae:.4f}, "
            f"finetune_threshold={self.threshold_finetune}, "
            f"retrain_threshold={self.threshold_retrain})"
        )


class DriftMonitor:
    """
    Rolling MAE drift monitor for a single metric.

    Maintains a sliding window of (predicted, actual) pairs.
    On each check, computes MAE over the window and classifies
    drift severity against the configured thresholds.

    Severity rules:
        mae < finetune_threshold  -> NONE
        mae >= finetune_threshold -> LOW  (trigger finetune)
        mae >= retrain_threshold  -> HIGH (trigger full retrain)
    """

    def __init__(
        self,
        metric: str,
        finetune_threshold: float,
        retrain_threshold: float,
        window_size: int = 60,
    ):
        """
        metric              : metric name for logging
        finetune_threshold  : MAE above this triggers finetune
        retrain_threshold   : MAE above this triggers full retrain
        window_size         : number of recent predictions to compute MAE over
        """
        self.metric = metric
        self.finetune_threshold = finetune_threshold
        self.retrain_threshold = retrain_threshold
        self.window_size = window_size

        self._residuals: deque[float] = deque(maxlen=window_size)
        self._lock = threading.Lock()

    def record(self, predicted: float, actual: float) -> None:
        """
        Record a single predicted vs actual pair.
        Called by the emitter each time a new actual value arrives
        for a timestep that was previously predicted.
        """
        with self._lock:
            self._residuals.append(abs(predicted - actual))

    def record_many(self, predicted: np.ndarray, actual: np.ndarray) -> None:
        """
        Record a batch of predicted vs actual pairs.
        """
        with self._lock:
            for p, a in zip(predicted, actual):
                self._residuals.append(abs(float(p) - float(a)))

    def check(self) -> DriftResult:
        """
        Compute current MAE and classify drift severity.
        Returns DriftResult with severity, current MAE, and thresholds.
        """
        with self._lock:
            if not self._residuals:
                return DriftResult(
                    severity=DriftSeverity.NONE,
                    mae=0.0,
                    threshold_finetune=self.finetune_threshold,
                    threshold_retrain=self.retrain_threshold,
                )
            mae = float(np.mean(self._residuals))

        if mae >= self.retrain_threshold:
            severity = DriftSeverity.HIGH
        elif mae >= self.finetune_threshold:
            severity = DriftSeverity.LOW
        else:
            severity = DriftSeverity.NONE

        logger.debug(f"[{self.metric}] drift check: MAE={mae:.4f} severity={severity}")

        return DriftResult(
            severity=severity,
            mae=mae,
            threshold_finetune=self.finetune_threshold,
            threshold_retrain=self.retrain_threshold,
        )

    def current_mae(self) -> float:
        with self._lock:
            if not self._residuals:
                return 0.0
            return float(np.mean(self._residuals))

    def sample_count(self) -> int:
        with self._lock:
            return len(self._residuals)

    def reset(self) -> None:
        """
        Clear residual history. Called after a successful retrain
        so the new model starts with a clean drift slate.
        """
        with self._lock:
            self._residuals.clear()
        logger.debug(f"[{self.metric}] drift monitor reset")

    def __repr__(self) -> str:
        return (
            f"DriftMonitor(metric={self.metric}, "
            f"mae={self.current_mae():.4f}, "
            f"samples={self.sample_count()}/{self.window_size})"
        )
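
# Illustrative usage with the WatchConfig default thresholds (finetune at 0.1,
# retrain at 0.3); the numbers are made up:
#
#     monitor = DriftMonitor("node_load1", finetune_threshold=0.1, retrain_threshold=0.3)
#     monitor.record(predicted=1.00, actual=1.05)   # residual 0.05
#     monitor.record(predicted=1.10, actual=1.45)   # residual 0.35
#     result = monitor.check()                      # MAE = 0.20 -> severity "low"
#     result.severity == DriftSeverity.LOW          # a finetune would be triggered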


================================================
FILE: sentinel/pipeline/predictor.py
================================================
# sentinel/pipeline/predictor.py

import numpy as np
from datetime import datetime, timezone
from typing import Optional
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.models.base import PredictionResult
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.ingestor.features import build_prediction_input
from sentinel.config import WatchConfig
from sentinel.utils.time import parse_duration_to_seconds
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


class Predictor:
    """
    Runs inference for a single metric using the currently active model.

    Called on every emitter tick. Reads the latest window from the buffer,
    builds the prediction input, runs model.predict(), and returns a
    PredictionResult with absolute timestamps attached.

    Returns None if the model is not ready yet (cold start).
    """

    def __init__(
        self,
        watch_config: WatchConfig,
        buffer: MetricBuffer,
        registry: ModelRegistry,
    ):
        self.watch_config = watch_config
        self.buffer = buffer
        self.registry = registry

    def predict(self) -> Optional[PredictionResult]:
        """
        Run inference and return a PredictionResult with absolute timestamps.
        Returns None if the model is not ready or the buffer is insufficient.
        """
        if not self.registry.is_ready():
            logger.debug(f"[{self._metric_key()}] model not ready, skipping prediction")
            return None

        values = self.buffer.get_values()
        timestamps = self.buffer.get_timestamps()

        n_lags = self._n_lags()

        if len(values) < n_lags:
            logger.debug(
                f"[{self._metric_key()}] buffer has {len(values)} values, "
                f"need {n_lags} for prediction input"
            )
            return None

        try:
            X = build_prediction_input(
                values=values,
                lookback=self.watch_config.lookback,
                granularity=self.watch_config.granularity,
                timestamps=timestamps,
            )
        except ValueError as e:
            logger.error(f"[{self._metric_key()}] failed to build prediction input: {e}")
            return None

        model = self.registry.get_model()

        try:
            result = model.predict(X)
        except Exception as e:
            logger.error(f"[{self._metric_key()}] model.predict() failed: {e}")
            return None

        # attach absolute unix timestamps to each prediction step
        result.timestamps = self._build_timestamps(len(result.values))

        # attach active version id
        active = self.registry.active_version()
        if active:
            result.model_version = active.version_id

        return result

    def _build_timestamps(self, n_steps: int) -> np.ndarray:
        """
        Build absolute unix timestamps for each prediction step.
        Steps are offset from now by granularity increments.
        """
        now = datetime.now(timezone.utc).timestamp()
        granularity_secs = parse_duration_to_seconds(self.watch_config.granularity)
        return np.array([
            now + (i + 1) * granularity_secs
            for i in range(n_steps)
        ])

    def _n_lags(self) -> int:
        from sentinel.utils.time import parse_duration_to_steps
        return parse_duration_to_steps(
            self.watch_config.lookback,
            self.watch_config.granularity,
        )

    def _metric_key(self) -> str:
        cfg = self.watch_config
        if not cfg.labels:
            return cfg.metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(cfg.labels.items()))
        return f"{cfg.metric}{{{label_str}}}"


================================================
FILE: sentinel/pipeline/registry.py
================================================
# sentinel/pipeline/registry.py

import threading
from typing import Optional
from sentinel.pipeline.models.base import BaseModel
from sentinel.pipeline.versioning import VersionStore, ModelVersion
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


class ModelRegistry:
    """
    Holds the live model reference for a single metric key.
    Handles atomic model swaps on promotion and rollback.

    Works with VersionStore for persistence and metadata.
    The registry owns the in-memory model object.
    VersionStore owns the on-disk metadata and artifacts.
    """

    def __init__(self, metric_key: str, version_store: VersionStore):
        self.metric_key = metric_key
        self.version_store = version_store

        self._model: Optional[BaseModel] = None
        self._active_version: Optional[ModelVersion] = None
        self._lock = threading.RLock()

    def get_model(self) -> Optional[BaseModel]:
        """
        Returns the currently active model instance.
        Returns None if no model has been promoted yet (cold start).
        """
        with self._lock:
            return self._model

    def promote(self, model: BaseModel, version: ModelVersion) -> None:
        """
        Atomically swap in a newly trained model.
        Saves the artifact, updates version store, and replaces
        the in-memory model reference in a single lock acquisition.

        Called by trainer.py after successful training + validation.
        """
        artifact_path = self.version_store.artifact_path_for(version.version_id)
        model.save(artifact_path)
        version.artifact_path = artifact_path

        with self._lock:
            self.version_store.register(version)
            self.version_store.promote(version.version_id)
            self._model = model
            self._active_version = version

        logger.info(
            f"[{self.metric_key}] promoted model version {version.version_id} "
            f"mae={version.mae:.4f}"
        )

    def rollback(self) -> bool:
        """
        Roll back to the previous stable version.
        Loads the previous model artifact from disk and swaps it in atomically.
        Returns True if rollback succeeded, False if no previous version exists.
        """
        previous = self.version_store.rollback()

        if previous is None:
            logger.warning(f"[{self.metric_key}] rollback failed â€” no previous version available")
            return False

        # reconstruct model instance from artifact
        model = self._load_model_from_version(previous)
        if model is None:
            return False

        with self._lock:
            self._model = model
            self._active_version = previous

        logger.info(f"[{self.metric_key}] rollback complete â€” now serving {previous.version_id}")
        return True

    def restore_from_disk(self, model_factory) -> bool:
        """
        On Sentinel startup, check if an active version exists on disk
        and restore it into memory so predictions resume without retraining.

        model_factory: callable that returns a new unfitted BaseModel instance
                       e.g. lambda: ExponentialSmoothingModel("1m", "5m", "30m")
        """
        active = self.version_store.get_active()

        if active is None:
            logger.info(f"[{self.metric_key}] no persisted model found, cold start required")
            return False

        model = model_factory()
        try:
            model.load(active.artifact_path)
        except Exception as e:
            logger.error(
                f"[{self.metric_key}] failed to load model from {active.artifact_path}: {e}"
            )
            return False

        with self._lock:
            self._model = model
            self._active_version = active

        logger.info(
            f"[{self.metric_key}] restored model version {active.version_id} from disk"
        )
        return True

    def active_version(self) -> Optional[ModelVersion]:
        with self._lock:
            return self._active_version

    def is_ready(self) -> bool:
        """
        Returns True if a fitted model is available for inference.
        """
        with self._lock:
            return self._model is not None and self._model.is_fitted

    def _load_model_from_version(self, version: ModelVersion) -> Optional[BaseModel]:
        """
        Reconstruct a model instance from a version record.
        Uses the model_class name stored in the version metadata
        to import and instantiate the correct class.
        """
        import sentinel.pipeline.models as models_module

        model_class = getattr(models_module, version.model_class, None)
        if model_class is None:
            logger.error(
                f"[{self.metric_key}] unknown model class '{version.model_class}' "
                f"in version {version.version_id}"
            )
            return None

        extra = version.extra or {}
        granularity = extra.get("granularity", "1m")
        horizon = extra.get("horizon", "5m")
        lookback = extra.get("lookback", "30m")

        model = model_class(granularity=granularity, horizon=horizon, lookback=lookback)
        try:
            model.load(version.artifact_path)
        except Exception as e:
            logger.error(
                f"[{self.metric_key}] failed to load artifact "
                f"{version.artifact_path}: {e}"
            )
            return None

        return model

    def __repr__(self) -> str:
        v = self._active_version
        return (
            f"ModelRegistry(metric={self.metric_key}, "
            f"ready={self.is_ready()}, "
            f"version={v.version_id if v else None})"
        )
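
# Example (illustrative sketch, not part of the module): the promote/rollback
# lifecycle for a single metric. Assumes ./artifacts as the artifact store and
# a fitted ExponentialSmoothingModel named `model`; the ModelVersion fields
# mirror what trainer.py fills in, with made-up numbers.
#
#     from sentinel.pipeline.versioning import VersionStore, ModelVersion
#
#     store = VersionStore("cpu_usage_percent", "./artifacts")
#     registry = ModelRegistry("cpu_usage_percent", store)
#
#     version = ModelVersion(
#         version_id=store.next_version_id(),
#         metric_key="cpu_usage_percent",
#         model_class="ExponentialSmoothingModel",
#         trained_at="2024-01-01T00:00:00+00:00",
#         training_policy="full_retrain",
#         drift_score_at_trigger=0.0,
#         mae=0.12, mape=1.5, n_samples=360,
#         artifact_path="",  # filled in by promote()
#     )
#     registry.promote(model, version)   # atomic in-memory + on-disk swap
#     registry.rollback()                # returns False while only one version exists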


================================================
FILE: sentinel/pipeline/scheduler.py
================================================
# sentinel/pipeline/scheduler.py

import threading
import time
from datetime import datetime, timezone
from typing import Callable
from sentinel.pipeline.drift import DriftMonitor, DriftSeverity
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.config import WatchConfig
from sentinel.utils.cron import seconds_until_next_fire
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)

# how often the scheduler wakes up to check cron and drift
_TICK_INTERVAL_SECONDS = 10


class MetricScheduler:
    """
    Manages the training schedule for a single metric.

    Two triggers:
        1. Cron: fires on the configured cron schedule
        2. Drift: fires when DriftMonitor reports LOW or HIGH severity

    On each trigger, calls the provided train_fn with the appropriate
    drift severity so Trainer knows which policy to apply.

    Runs in a dedicated daemon thread per metric.
    """

    def __init__(
        self,
        watch_config: WatchConfig,
        buffer: MetricBuffer,
        drift_monitor: DriftMonitor,
        train_fn: Callable[[str, float], None],
    ):
        """
        watch_config  : config for this metric
        buffer        : buffer to check readiness before training
        drift_monitor : drift monitor to poll for severity
        train_fn      : callable(drift_severity, drift_score) -> None
                        provided by core.py, calls Trainer.run()
        """
        self.watch_config = watch_config
        self.buffer = buffer
        self.drift_monitor = drift_monitor
        self.train_fn = train_fn

        self._stop_event = threading.Event()
        self._thread: threading.Thread = None
        self._last_cron_fire: datetime = None
        self._cold_start_done: bool = False

    def start(self) -> None:
        self._thread = threading.Thread(
            target=self._loop,
            name=f"scheduler-{self.watch_config.metric}",
            daemon=True,
        )
        self._thread.start()
        logger.info(f"[{self._metric_key()}] scheduler started cron='{self.watch_config.cron}'")

    def stop(self) -> None:
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        logger.info(f"[{self._metric_key()}] scheduler stopped")

    def _loop(self) -> None:
        while not self._stop_event.is_set():
            try:
                self._tick()
            except Exception as e:
                logger.error(f"[{self._metric_key()}] scheduler tick error: {e}")
            time.sleep(_TICK_INTERVAL_SECONDS)

    def _tick(self) -> None:
        now = datetime.now(timezone.utc)

        # gate everything on buffer readiness
        if not self.buffer.is_ready():
            pct = self.buffer.fill_fraction() * 100
            logger.debug(f"[{self._metric_key()}] buffer {pct:.0f}% full, waiting for cold start window")
            return

        # cold start: first training run once the buffer is full
        if not self._cold_start_done:
            logger.info(f"[{self._metric_key()}] buffer ready, triggering cold start training")
            self._fire(DriftSeverity.NONE, 0.0)
            self._cold_start_done = True
            self._last_cron_fire = now
            return

        # drift check: higher priority than cron
        drift_result = self.drift_monitor.check()
        if drift_result.severity != DriftSeverity.NONE:
            logger.info(
                f"[{self._metric_key()}] drift detected severity={drift_result.severity} "
                f"mae={drift_result.mae:.4f}, triggering retrain"
            )
            self._fire(drift_result.severity, drift_result.mae)
            self.drift_monitor.reset()
            self._last_cron_fire = now
            return

        # cron check
        if self._cron_due(now):
            logger.info(f"[{self._metric_key()}] cron fired, triggering scheduled retrain")
            self._fire(DriftSeverity.NONE, 0.0)
            self._last_cron_fire = now

    def _fire(self, severity: str, drift_score: float) -> None:
        """
        Dispatch training job in a separate thread so the scheduler
        loop is never blocked by a long training run.
        """
        t = threading.Thread(
            target=self.train_fn,
            args=(severity, drift_score),
            name=f"trainer-{self.watch_config.metric}",
            daemon=True,
        )
        t.start()

    def _cron_due(self, now: datetime) -> bool:
        """
        Returns True if the cron schedule has fired since the last
        recorded fire time.
        """
        if self._last_cron_fire is None:
            return False
        try:
            secs = seconds_until_next_fire(self.watch_config.cron, base=self._last_cron_fire)
            elapsed = (now - self._last_cron_fire).total_seconds()
            return elapsed >= secs
        except Exception as e:
            logger.error(f"[{self._metric_key()}] cron check failed: {e}")
            return False

    def _metric_key(self) -> str:
        cfg = self.watch_config
        if not cfg.labels:
            return cfg.metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(cfg.labels.items()))
        return f"{cfg.metric}{{{label_str}}}"


================================================
FILE: sentinel/pipeline/trainer.py
================================================
# sentinel/pipeline/trainer.py

import threading
from datetime import datetime, timezone
from typing import Optional
from sentinel.pipeline.models.base import BaseModel, TrainingResult
from sentinel.pipeline.versioning import ModelVersion, VersionStore
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.drift import DriftSeverity
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.ingestor.features import build_feature_matrix
from sentinel.config import WatchConfig
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)

# fraction of buffer to hold out for validation
_HOLDOUT_FRACTION = 0.1


class Trainer:
    """
    Orchestrates training and finetuning for a single metric.

    On each training run:
        1. Pulls data from the MetricBuffer
        2. Builds feature matrix via features.py
        3. Splits into train/holdout
        4. Calls fit() or partial_fit() based on drift severity
        5. Validates on holdout
        6. Promotes new model via ModelRegistry if validation passes
        7. Resets drift monitor after successful promotion
    """

    def __init__(
        self,
        watch_config: WatchConfig,
        buffer: MetricBuffer,
        registry: ModelRegistry,
    ):
        self.watch_config = watch_config
        self.buffer = buffer
        self.registry = registry
        self._lock = threading.Lock()

    def run(
        self,
        drift_severity: str = DriftSeverity.NONE,
        drift_score: float = 0.0,
    ) -> Optional[TrainingResult]:
        """
        Execute a training run. Thread-safe; the scheduler calls this
        from a background thread.

        drift_severity : determines whether to fit() or partial_fit()
        drift_score    : MAE at the time of the trigger, logged in version metadata

        Returns TrainingResult on success, None on failure.
        """
        with self._lock:
            return self._run_internal(drift_severity, drift_score)

    def _run_internal(
        self,
        drift_severity: str,
        drift_score: float,
    ) -> Optional[TrainingResult]:

        metric_key = self._metric_key()
        logger.info(
            f"[{metric_key}] training run started "
            f"severity={drift_severity} drift_score={drift_score:.4f}"
        )

        values = self.buffer.get_values()
        timestamps = self.buffer.get_timestamps()

        if len(values) < 2:
            logger.warning(f"[{metric_key}] not enough data to train, skipping")
            return None

        # build feature matrix
        try:
            X, y = build_feature_matrix(
                values=values,
                lookback=self.watch_config.lookback,
                granularity=self.watch_config.granularity,
                timestamps=timestamps,
            )
        except ValueError as e:
            logger.error(f"[{metric_key}] feature build failed: {e}")
            return None

        if len(X) < 4:
            logger.warning(f"[{metric_key}] too few samples after feature build ({len(X)}), skipping")
            return None

        # train/holdout split
        split = max(1, int(len(X) * (1 - _HOLDOUT_FRACTION)))
        X_train, y_train = X[:split], y[:split]
        X_val, y_val = X[split:], y[split:]

        # instantiate a fresh model or reuse the existing one based on policy
        current_model = self.registry.get_model()
        policy = self._determine_policy(drift_severity, current_model)

        model = self._get_or_create_model(policy)

        try:
            if policy == "full_retrain" or not model.is_fitted:
                result = model.fit(X_train, y_train)
                result.training_policy = "full_retrain"
            else:
                result = model.partial_fit(X_train, y_train)
                result.training_policy = "finetune"
        except Exception as e:
            logger.error(f"[{metric_key}] model training failed: {e}")
            return None

        # validate on holdout
        if len(X_val) > 0:
            val_result = self._validate(model, X_val, y_val)
            if val_result is None:
                logger.warning(f"[{metric_key}] validation failed, not promoting model")
                return None
            result.mae = val_result["mae"]
            result.mape = val_result["mape"]

        # build version record
        version_id = self.registry.version_store.next_version_id()
        version = ModelVersion(
            version_id=version_id,
            metric_key=metric_key,
            model_class=type(model).__name__,
            trained_at=datetime.now(timezone.utc).isoformat(),
            training_policy=result.training_policy,
            drift_score_at_trigger=drift_score,
            mae=result.mae,
            mape=result.mape,
            n_samples=result.n_samples,
            artifact_path="",  # set by registry.promote()
            extra={
                "granularity": self.watch_config.granularity,
                "horizon": self.watch_config.horizon,
                "lookback": self.watch_config.lookback,
            }
        )

        self.registry.promote(model, version)

        logger.info(
            f"[{metric_key}] training complete â€” version={version_id} "
            f"policy={result.training_policy} mae={result.mae:.4f} mape={result.mape:.2f}%"
        )

        return result

    def _validate(self, model: BaseModel, X_val: "np.ndarray", y_val: "np.ndarray") -> Optional[dict]:
        import numpy as np
        try:
            # single-step-ahead comparison only: take the first forecast step
            # for each holdout row and score it against the observed value
            y_pred = np.array([
                float(model.predict(X_val[i:i + 1]).values[0])
                for i in range(len(X_val))
            ])
            mae = float(np.mean(np.abs(y_val - y_pred)))
            mape = float(np.mean(np.abs((y_val - y_pred) / (y_val + 1e-8)))) * 100
            return {"mae": mae, "mape": mape}
        except Exception as e:
            logger.error(f"Validation error: {e}")
            return None

    def _get_or_create_model(self, policy: str) -> BaseModel:
        """
        For full retrain, always create a fresh model instance.
        For finetune, return the existing model from the registry.
        """
        existing = self.registry.get_model()
        if policy == "finetune" and existing is not None:
            return existing

        cls = self.watch_config.model_class
        return cls(
            granularity=self.watch_config.granularity,
            horizon=self.watch_config.horizon,
            lookback=self.watch_config.lookback,
        )

    def _determine_policy(self, drift_severity: str, current_model: Optional[BaseModel]) -> str:
        if current_model is None or not current_model.is_fitted:
            return "full_retrain"
        if drift_severity == DriftSeverity.HIGH:
            return "full_retrain"
        if drift_severity == DriftSeverity.LOW:
            return "finetune"
        # cron-scheduled run with no drift: do a full retrain to keep fresh
        return "full_retrain"

    def _metric_key(self) -> str:
        cfg = self.watch_config
        if not cfg.labels:
            return cfg.metric
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(cfg.labels.items()))
        return f"{cfg.metric}{{{label_str}}}"


================================================
FILE: sentinel/pipeline/versioning.py
================================================
# sentinel/pipeline/versioning.py

import os
import json
import threading
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Optional
from sentinel.utils.logging import get_logger

logger = get_logger(__name__)


@dataclass
class ModelVersion:
    """
    Metadata record for a single trained model version.
    Stored as JSON alongside the serialized model artifact.
    """
    version_id: str
    metric_key: str
    model_class: str
    trained_at: str                   # ISO8601 UTC
    training_policy: str              # "full_retrain" or "finetune"
    drift_score_at_trigger: float
    mae: float
    mape: float
    n_samples: int
    artifact_path: str
    status: str = "active"            # "active", "retired", "rolled_back"
    extra: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "ModelVersion":
        return cls(**d)


class VersionStore:
    """
    Manages model version metadata for a single metric key.
    Persists version records as JSON files in the artifact store.
    Handles pruning old versions beyond max_versions.

    Directory layout:
        artifact_store/
            {metric_key}/
                versions.json          <- version index
                v1/
                    model.joblib
                v2/
                    model.joblib
                ...
    """

    def __init__(self, metric_key: str, artifact_store: str, max_versions: int = 5):
        self.metric_key = metric_key
        self.artifact_store = artifact_store
        self.max_versions = max_versions

        self._lock = threading.Lock()
        self._base_dir = os.path.join(artifact_store, self._sanitize_key(metric_key))
        self._index_path = os.path.join(self._base_dir, "versions.json")
        self._versions: list[ModelVersion] = []

        os.makedirs(self._base_dir, exist_ok=True)
        self._load_index()

    def next_version_id(self) -> str:
        with self._lock:
            # derive from the highest existing numeric id so that pruning old
            # versions never produces a duplicate version_id
            max_n = 0
            for v in self._versions:
                try:
                    max_n = max(max_n, int(v.version_id.lstrip("v")))
                except ValueError:
                    continue
            return f"v{max_n + 1}"

    def artifact_path_for(self, version_id: str) -> str:
        """
        Returns the file path where a model artifact for this version should be saved.
        Creates the directory if it doesn't exist.
        """
        version_dir = os.path.join(self._base_dir, version_id)
        os.makedirs(version_dir, exist_ok=True)
        return os.path.join(version_dir, "model.joblib")

    def register(self, version: ModelVersion) -> None:
        """
        Add a new version to the index and persist.
        Prunes oldest retired versions if over max_versions.
        """
        with self._lock:
            self._versions.append(version)
            self._prune()
            self._save_index()
        logger.info(
            f"[{self.metric_key}] registered version {version.version_id} "
            f"policy={version.training_policy} mae={version.mae:.4f}"
        )

    def get_active(self) -> Optional[ModelVersion]:
        """
        Returns the currently active version, or None if no active version exists.
        """
        with self._lock:
            active = [v for v in self._versions if v.status == "active"]
            return active[-1] if active else None

    def get_all(self) -> list[ModelVersion]:
        with self._lock:
            return list(self._versions)

    def get_by_id(self, version_id: str) -> Optional[ModelVersion]:
        with self._lock:
            for v in self._versions:
                if v.version_id == version_id:
                    return v
            return None

    def promote(self, version_id: str) -> None:
        """
        Set a version as active, retire all others.
        Called after successful training + validation.
        """
        with self._lock:
            for v in self._versions:
                if v.version_id == version_id:
                    v.status = "active"
                elif v.status == "active":
                    v.status = "retired"
            self._save_index()
        logger.info(f"[{self.metric_key}] promoted version {version_id} to active")

    def rollback(self) -> Optional[ModelVersion]:
        """
        Retire the current active version and restore the most recent
        previously active (non-rolled-back) version.
        Returns the restored version, or None if no previous version exists.
        """
        with self._lock:
            active = [v for v in self._versions if v.status == "active"]
            retired = [v for v in self._versions if v.status == "retired"]

            if not active:
                logger.warning(f"[{self.metric_key}] rollback called but no active version")
                return None

            if not retired:
                logger.warning(f"[{self.metric_key}] rollback called but no retired version to restore")
                return None

            current = active[-1]
            current.status = "rolled_back"

            previous = retired[-1]
            previous.status = "active"

            self._save_index()

        logger.info(
            f"[{self.metric_key}] rolled back from {current.version_id} "
            f"to {previous.version_id}"
        )
        return previous

    def _prune(self) -> None:
        """
        Remove oldest retired versions beyond max_versions.
        Active and rolled_back versions are never pruned.
        """
        retired = [v for v in self._versions if v.status == "retired"]
        if len(self._versions) <= self.max_versions:
            return

        to_prune = retired[:len(self._versions) - self.max_versions]
        for v in to_prune:
            self._versions.remove(v)
            artifact = v.artifact_path
            if os.path.exists(artifact):
                try:
                    os.remove(artifact)
                    version_dir = os.path.dirname(artifact)
                    if not os.listdir(version_dir):
                        os.rmdir(version_dir)
                except OSError as e:
                    logger.warning(f"[{self.metric_key}] failed to delete artifact {artifact}: {e}")
            logger.debug(f"[{self.metric_key}] pruned version {v.version_id}")

    def _save_index(self) -> None:
        data = [v.to_dict() for v in self._versions]
        with open(self._index_path, "w") as f:
            json.dump(data, f, indent=2)

    def _load_index(self) -> None:
        if not os.path.exists(self._index_path):
            self._versions = []
            return
        with open(self._index_path, "r") as f:
            data = json.load(f)
        self._versions = [ModelVersion.from_dict(d) for d in data]
        logger.debug(f"[{self.metric_key}] loaded {len(self._versions)} versions from index")

    @staticmethod
    def _sanitize_key(key: str) -> str:
        return key.replace("{", "").replace("}", "").replace(",", "_").replace('"', "").replace("=", "-")
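
# Example (illustrative sketch, not part of the module): the on-disk layout a
# VersionStore produces under ./artifacts for a labelled metric key.
#
#     store = VersionStore('http_requests_total{job="api"}', "./artifacts", max_versions=5)
#     store.next_version_id()        # -> "v1" on an empty index
#     store.artifact_path_for("v1")
#     # -> ./artifacts/http_requests_totaljob-api/v1/model.joblib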


================================================
FILE: sentinel/pipeline/models/__init__.py
================================================
# sentinel/pipeline/models/__init__.py

from sentinel.pipeline.models.base import BaseModel, PredictionResult, TrainingResult
from sentinel.pipeline.models.linear import LinearTrendModel
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel
from sentinel.pipeline.models.arima import ARIMAModel
from sentinel.pipeline.models.sgd import SGDRegressorModel

__all__ = [
    "BaseModel",
    "PredictionResult",
    "TrainingResult",
    "LinearTrendModel",
    "ExponentialSmoothingModel",
    "ARIMAModel",
    "SGDRegressorModel",
]


================================================
FILE: sentinel/pipeline/models/arima.py
================================================
# sentinel/pipeline/models/arima.py

import numpy as np
import joblib
from sentinel.pipeline.models.base import BaseModel, PredictionResult, TrainingResult
from sentinel.utils.time import parse_duration_to_steps


class ARIMAModel(BaseModel):
    """
    ARIMA(p, d, q) implemented without external dependencies.
    Uses simple AR(p) after differencing d times.
    MA(q) terms approximated via residual correction.
    Suitable for stationary or trend-stationary metrics.
    """

    def __init__(
        self,
        granularity: str,
        horizon: str,
        lookback: str,
        p: int = 3,
        d: int = 1,
        q: int = 1,
    ):
        super().__init__(granularity, horizon, lookback)
        self.p = p
        self.d = d
        self.q = q
        self._ar_coeffs: np.ndarray = None
        self._ma_coeffs: np.ndarray = None
        self._diff_init: list = []   # stores values needed to invert differencing
        self._residuals: np.ndarray = None
        self._horizon_steps = parse_duration_to_steps(horizon, granularity)

    def _difference(self, y: np.ndarray, d: int):
        diffs = [y.copy()]
        for _ in range(d):
            diffs.append(np.diff(diffs[-1]))
        return diffs

    def _invert_difference(self, forecast: np.ndarray, diffs: list) -> np.ndarray:
        result = forecast.copy()
        for orig in reversed(diffs[:-1]):
            # prepend the last observed value at this differencing level,
            # cumsum to undo the diff, then drop the anchor value so the
            # output length stays equal to the forecast horizon
            result = np.cumsum(np.hstack([orig[-1], result]))[1:]
        return result

    def _fit_ar(self, y: np.ndarray) -> np.ndarray:
        n = len(y)
        if n <= self.p:
            return np.zeros(self.p)
        X = np.array([y[i:n - self.p + i] for i in range(self.p)]).T
        target = y[self.p:]
        coeffs, _, _, _ = np.linalg.lstsq(X, target, rcond=None)
        return coeffs

    def fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        diffs = self._difference(y, self.d)
        self._diff_init = diffs
        stationary = diffs[-1]

        self._ar_coeffs = self._fit_ar(stationary)

        # compute residuals for MA correction
        n = len(stationary)
        fitted = np.array([
            np.dot(self._ar_coeffs, stationary[i:i + self.p])
            for i in range(n - self.p)
        ])
        self._residuals = stationary[self.p:] - fitted

        # fit MA coefficients on residuals
        if self.q > 0 and len(self._residuals) > self.q:
            self._ma_coeffs = self._fit_ar(self._residuals)[:self.q]
        else:
            self._ma_coeffs = np.zeros(self.q)

        self._is_fitted = True

        mae = float(np.mean(np.abs(self._residuals)))
        mape = float(np.mean(np.abs(self._residuals / (stationary[self.p:] + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="full_retrain",
            extra={"p": self.p, "d": self.d, "q": self.q}
        )

    def partial_fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        if not self._is_fitted:
            return self.fit(X, y)
        # for ARIMA finetune we do a full refit on the new window
        # keeping p, d, q fixed
        return self.fit(X, y)

    def predict(self, X: np.ndarray) -> PredictionResult:
        if not self._is_fitted:
            raise RuntimeError("Model is not fitted yet.")

        diffs = self._diff_init
        stationary = diffs[-1].tolist()
        residuals = self._residuals.tolist() if self._residuals is not None else []

        forecasts = []
        for step in range(self._horizon_steps):
            ar_input = stationary[-self.p:]
            ar_val = np.dot(self._ar_coeffs, ar_input)

            ma_val = 0.0
            if self.q > 0 and len(residuals) >= self.q:
                ma_val = np.dot(self._ma_coeffs, residuals[-self.q:])

            val = ar_val + ma_val
            forecasts.append(val)
            stationary.append(val)
            residuals.append(0.0)  # future residuals unknown, assume zero

        forecast_arr = np.array(forecasts)
        forecast_arr = self._invert_difference(forecast_arr, diffs)

        timestamps = np.arange(self._horizon_steps, dtype=float)

        return PredictionResult(values=forecast_arr, timestamps=timestamps)

    def save(self, path: str) -> None:
        joblib.dump({
            "ar_coeffs": self._ar_coeffs,
            "ma_coeffs": self._ma_coeffs,
            "diff_init": self._diff_init,
            "residuals": self._residuals,
            "p": self.p,
            "d": self.d,
            "q": self.q,
            "horizon_steps": self._horizon_steps,
        }, path)

    def load(self, path: str) -> None:
        state = joblib.load(path)
        self._ar_coeffs = state["ar_coeffs"]
        self._ma_coeffs = state["ma_coeffs"]
        self._diff_init = state["diff_init"]
        self._residuals = state["residuals"]
        self.p = state["p"]
        self.d = state["d"]
        self.q = state["q"]
        self._horizon_steps = state["horizon_steps"]
        self._is_fitted = True
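
# Example (illustrative sketch, not part of the module): fitting the
# AR-after-differencing model on a synthetic linear ramp. With d=1 the ramp
# differences to a constant series, so the multi-step forecast continues the
# trend. The model ignores X and works from y directly; shapes are made up.
#
#     y = np.arange(100, dtype=float)            # values spaced 1m apart
#     model = ARIMAModel(granularity="1m", horizon="5m", lookback="30m", p=3, d=1, q=0)
#     model.fit(np.empty((len(y), 0)), y)
#     model.predict(np.empty((1, 0))).values     # ~[100, 101, 102, 103, 104]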


================================================
FILE: sentinel/pipeline/models/base.py
================================================
# sentinel/pipeline/models/base.py

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
import numpy as np


@dataclass
class PredictionResult:
    values: np.ndarray
    timestamps: np.ndarray
    lower_bound: Optional[np.ndarray] = None
    upper_bound: Optional[np.ndarray] = None
    model_version: Optional[str] = None


@dataclass
class TrainingResult:
    mae: float
    mape: float
    n_samples: int
    training_policy: str  # "full_retrain" or "finetune"
    extra: dict = field(default_factory=dict)


class BaseModel(ABC):

    def __init__(self, granularity: str, horizon: str, lookback: str):
        self.granularity = granularity
        self.horizon = horizon
        self.lookback = lookback
        self._is_fitted = False

    @abstractmethod
    def fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult: ...

    @abstractmethod
    def partial_fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult: ...

    @abstractmethod
    def predict(self, X: np.ndarray) -> PredictionResult: ...

    @abstractmethod
    def save(self, path: str) -> None: ...

    @abstractmethod
    def load(self, path: str) -> None: ...

    @property
    def is_fitted(self) -> bool:
        return self._is_fitted

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}("
            f"granularity={self.granularity}, "
            f"horizon={self.horizon}, "
            f"lookback={self.lookback}, "
            f"fitted={self._is_fitted})"
        )
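
# Example (illustrative sketch, not part of the module): the minimal contract a
# new model must satisfy. NaiveLastValueModel is hypothetical and not shipped
# with sentinel; it simply repeats the last observed target for three steps.
#
#     class NaiveLastValueModel(BaseModel):
#         def fit(self, X, y):
#             self._last = float(y[-1])
#             self._is_fitted = True
#             return TrainingResult(mae=0.0, mape=0.0, n_samples=len(y),
#                                   training_policy="full_retrain")
#         def partial_fit(self, X, y):
#             return self.fit(X, y)
#         def predict(self, X):
#             values = np.full(3, self._last)
#             return PredictionResult(values=values, timestamps=np.arange(3, dtype=float))
#         def save(self, path): ...
#         def load(self, path): ...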


================================================
FILE: sentinel/pipeline/models/linear.py
================================================
# sentinel/pipeline/models/linear.py

import numpy as np
import joblib
from dataclasses import dataclass
from sentinel.pipeline.models.base import BaseModel, PredictionResult, TrainingResult
from sentinel.utils.time import parse_duration_to_steps


class LinearTrendModel(BaseModel):
    """
    Simple linear regression model using least squares.
    Best for metrics with a clear linear trend and no seasonality.
    Fastest model in the pack, good baseline.
    """

    def __init__(self, granularity: str, horizon: str, lookback: str):
        super().__init__(granularity, horizon, lookback)
        self._coefficients: np.ndarray = None
        self._intercept: float = 0.0
        self._horizon_steps = parse_duration_to_steps(horizon, granularity)

    def fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        X_bias = np.hstack([np.ones((X.shape[0], 1)), X])
        coeffs, _, _, _ = np.linalg.lstsq(X_bias, y, rcond=None)
        self._intercept = coeffs[0]
        self._coefficients = coeffs[1:]
        self._is_fitted = True

        y_pred = X_bias @ coeffs
        mae = float(np.mean(np.abs(y - y_pred)))
        mape = float(np.mean(np.abs((y - y_pred) / (y + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="full_retrain"
        )

    def partial_fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        if not self._is_fitted:
            return self.fit(X, y)

        # incremental least squares update using new data only
        X_bias = np.hstack([np.ones((X.shape[0], 1)), X])
        coeffs, _, _, _ = np.linalg.lstsq(X_bias, y, rcond=None)
        # blend old and new coefficients 70/30
        new_coeffs = coeffs[1:]
        self._coefficients = 0.7 * self._coefficients + 0.3 * new_coeffs
        self._intercept = 0.7 * self._intercept + 0.3 * coeffs[0]

        y_pred = X_bias @ np.hstack([self._intercept, self._coefficients])
        mae = float(np.mean(np.abs(y - y_pred)))
        mape = float(np.mean(np.abs((y - y_pred) / (y + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="finetune"
        )

    def predict(self, X: np.ndarray) -> PredictionResult:
        if not self._is_fitted:
            raise RuntimeError("Model is not fitted yet.")

        values = []
        current = X.copy()

        for _ in range(self._horizon_steps):
            val = float(self._intercept + current[0] @ self._coefficients)
            values.append(val)
            # roll the lag window forward and append the newest prediction
            current = np.roll(current, -1)
            current[0, -1] = val

        values = np.array(values)
        timestamps = np.arange(self._horizon_steps, dtype=float)

        return PredictionResult(values=values, timestamps=timestamps)

    def save(self, path: str) -> None:
        joblib.dump({
            "coefficients": self._coefficients,
            "intercept": self._intercept,
            "horizon_steps": self._horizon_steps,
        }, path)

    def load(self, path: str) -> None:
        state = joblib.load(path)
        self._coefficients = state["coefficients"]
        self._intercept = state["intercept"]
        self._horizon_steps = state["horizon_steps"]
        self._is_fitted = True
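
# Example (illustrative sketch, not part of the module): fitting the linear
# trend model on hand-rolled lag features. The model accepts whatever feature
# width it is given; shapes and numbers here are made up.
#
#     series = np.arange(60, dtype=float)
#     n_lags = 5
#     X = np.array([series[i:i + n_lags] for i in range(len(series) - n_lags)])
#     y = series[n_lags:]
#     model = LinearTrendModel(granularity="1m", horizon="5m", lookback="30m")
#     model.fit(X, y)
#     model.predict(series[-n_lags:].reshape(1, -1)).values   # ~[60, 61, 62, 63, 64]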


================================================
FILE: sentinel/pipeline/models/sgd.py
================================================
# sentinel/pipeline/models/sgd.py

import numpy as np
import joblib
from sentinel.pipeline.models.base import BaseModel, PredictionResult, TrainingResult
from sentinel.utils.time import parse_duration_to_steps


class SGDRegressorModel(BaseModel):
    """
    Online learning model using stochastic gradient descent.
    Best for metrics that evolve continuously and benefit from
    incremental updates. Most finetune-friendly model in the pack.
    Implements linear regression with SGD updates manually to
    avoid sklearn dependency at the core.
    """

    def __init__(
        self,
        granularity: str,
        horizon: str,
        lookback: str,
        learning_rate: float = 0.01,
        epochs: int = 10,
        l2: float = 1e-4,
    ):
        super().__init__(granularity, horizon, lookback)
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.l2 = l2
        self._weights: np.ndarray = None
        self._bias: float = 0.0
        self._horizon_steps = parse_duration_to_steps(horizon, granularity)

    def _init_weights(self, n_features: int):
        self._weights = np.zeros(n_features)
        self._bias = 0.0

    def _sgd_update(self, X: np.ndarray, y: np.ndarray):
        n = len(y)
        for _ in range(self.epochs):
            indices = np.random.permutation(n)
            for i in indices:
                xi = X[i]
                yi = y[i]
                pred = np.dot(self._weights, xi) + self._bias
                error = pred - yi
                self._weights -= self.learning_rate * (error * xi + self.l2 * self._weights)
                self._bias -= self.learning_rate * error

    def fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        self._init_weights(X.shape[1])
        self._sgd_update(X, y)
        self._is_fitted = True

        y_pred = X @ self._weights + self._bias
        mae = float(np.mean(np.abs(y - y_pred)))
        mape = float(np.mean(np.abs((y - y_pred) / (y + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="full_retrain",
            extra={"learning_rate": self.learning_rate, "epochs": self.epochs}
        )

    def partial_fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        if not self._is_fitted:
            return self.fit(X, y)

        # continue SGD from current weights without reinitializing
        self._sgd_update(X, y)

        y_pred = X @ self._weights + self._bias
        mae = float(np.mean(np.abs(y - y_pred)))
        mape = float(np.mean(np.abs((y - y_pred) / (y + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="finetune"
        )

    def predict(self, X: np.ndarray) -> PredictionResult:
        if not self._is_fitted:
            raise RuntimeError("Model is not fitted yet.")

        values = []
        current = X.copy()

        for _ in range(self._horizon_steps):
            val = float(np.dot(self._weights, current[0]) + self._bias)
            values.append(val)
            current = np.roll(current, -1)
            current[0, -1] = val

        values = np.array(values)
        timestamps = np.arange(self._horizon_steps, dtype=float)

        return PredictionResult(values=values, timestamps=timestamps)

    def save(self, path: str) -> None:
        joblib.dump({
            "weights": self._weights,
            "bias": self._bias,
            "learning_rate": self.learning_rate,
            "epochs": self.epochs,
            "l2": self.l2,
            "horizon_steps": self._horizon_steps,
        }, path)

    def load(self, path: str) -> None:
        state = joblib.load(path)
        self._weights = state["weights"]
        self._bias = state["bias"]
        self.learning_rate = state["learning_rate"]
        self.epochs = state["epochs"]
        self.l2 = state["l2"]
        self._horizon_steps = state["horizon_steps"]
        self._is_fitted = True
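
# Example (illustrative sketch, not part of the module): the finetune path this
# model is designed for. X_initial/y_initial and X_recent/y_recent are
# hypothetical arrays in the same shape features.py produces.
#
#     model = SGDRegressorModel(granularity="1m", horizon="5m", lookback="30m",
#                               learning_rate=0.01, epochs=10)
#     model.fit(X_initial, y_initial)          # cold start: weights from scratch
#     model.partial_fit(X_recent, y_recent)    # drift trigger: continue from current weights
#     forecast = model.predict(X_recent[-1:]).values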


================================================
FILE: sentinel/pipeline/models/smoothing.py
================================================
# sentinel/pipeline/models/smoothing.py

import numpy as np
import joblib
from sentinel.pipeline.models.base import BaseModel, PredictionResult, TrainingResult
from sentinel.utils.time import parse_duration_to_steps


class ExponentialSmoothingModel(BaseModel):
    """
    Double exponential smoothing (Holt's method).
    Handles level and trend. Good for metrics with a trend
    but no strong seasonality. Lightweight and interpretable.
    """

    def __init__(
        self,
        granularity: str,
        horizon: str,
        lookback: str,
        alpha: float = 0.3,
        beta: float = 0.1,
    ):
        super().__init__(granularity, horizon, lookback)
        self.alpha = alpha  # level smoothing factor
        self.beta = beta    # trend smoothing factor
        self._level: float = 0.0
        self._trend: float = 0.0
        self._horizon_steps = parse_duration_to_steps(horizon, granularity)

    def _holt_fit(self, y: np.ndarray):
        level = y[0]
        trend = y[1] - y[0]
        for val in y[1:]:
            prev_level = level
            level = self.alpha * val + (1 - self.alpha) * (level + trend)
            trend = self.beta * (level - prev_level) + (1 - self.beta) * trend
        self._level = level
        self._trend = trend

    def _compute_fitted_values(self, y: np.ndarray) -> np.ndarray:
        fitted = []
        level = y[0]
        trend = y[1] - y[0]
        for val in y[1:]:
            fitted.append(level + trend)
            prev_level = level
            level = self.alpha * val + (1 - self.alpha) * (level + trend)
            trend = self.beta * (level - prev_level) + (1 - self.beta) * trend
        return np.array(fitted)

    def fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        self._holt_fit(y)
        self._is_fitted = True

        fitted = self._compute_fitted_values(y)
        residuals = y[1:] - fitted
        mae = float(np.mean(np.abs(residuals)))
        mape = float(np.mean(np.abs(residuals / (y[1:] + 1e-8)))) * 100

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="full_retrain",
            extra={"alpha": self.alpha, "beta": self.beta}
        )

    def partial_fit(self, X: np.ndarray, y: np.ndarray) -> TrainingResult:
        if not self._is_fitted:
            return self.fit(X, y)

        # update level and trend on recent data only
        for val in y:
            prev_level = self._level
            self._level = self.alpha * val + (1 - self.alpha) * (self._level + self._trend)
            self._trend = self.beta * (self._level - prev_level) + (1 - self.beta) * self._trend

        # _compute_fitted_values needs at least two points; fall back to zero
        # error metrics for a single-sample update
        if len(y) > 1:
            fitted = self._compute_fitted_values(y)
            residuals = y[1:] - fitted
            mae = float(np.mean(np.abs(residuals)))
            mape = float(np.mean(np.abs(residuals / (y[1:] + 1e-8)))) * 100
        else:
            mae, mape = 0.0, 0.0

        return TrainingResult(
            mae=mae,
            mape=mape,
            n_samples=len(y),
            training_policy="finetune"
        )

    def predict(self, X: np.ndarray) -> PredictionResult:
        if not self._is_fitted:
            raise RuntimeError("Model is not fitted yet.")

        values = []
        for step in range(1, self._horizon_steps + 1):
            values.append(self._level + step * self._trend)

        values = np.array(values)
        timestamps = np.arange(self._horizon_steps, dtype=float)

        return PredictionResult(values=values, timestamps=timestamps)

    def save(self, path: str) -> None:
        joblib.dump({
            "level": self._level,
            "trend": self._trend,
            "alpha": self.alpha,
            "beta": self.beta,
            "horizon_steps": self._horizon_steps,
        }, path)

    def load(self, path: str) -> None:
        state = joblib.load(path)
        self._level = state["level"]
        self._trend = state["trend"]
        self.alpha = state["alpha"]
        self.beta = state["beta"]
        self._horizon_steps = state["horizon_steps"]
        self._is_fitted = True
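
# Example (illustrative sketch, not part of the module): Holt's method on a
# noiseless upward ramp. The model ignores X, so empty feature matrices are
# passed; numbers are made up.
#
#     y = np.linspace(10.0, 20.0, 60)                      # steadily rising metric
#     model = ExponentialSmoothingModel("1m", "5m", "30m", alpha=0.3, beta=0.1)
#     model.fit(np.empty((60, 0)), y)
#     model.predict(np.empty((1, 0))).values               # continues the upward trend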


================================================
FILE: sentinel/utils/__init__.py
================================================
# sentinel/utils/__init__.py

from sentinel.utils.time import (
    parse_duration_to_seconds,
    parse_duration_to_steps,
    steps_to_timedeltas,
    align_timestamp_to_granularity,
)
from sentinel.utils.cron import (
    is_valid_cron,
    get_next_fire_time,
    get_previous_fire_time,
    seconds_until_next_fire,
    seconds_since_last_fire,
    get_fire_times_between,
)
from sentinel.utils.logging import get_logger, configure_logging
from sentinel.utils.validation import (
    ConfigValidationError,
    validate_watch_config,
    validate_sentinel_config,
)

__all__ = [
    "parse_duration_to_seconds",
    "parse_duration_to_steps",
    "steps_to_timedeltas",
    "align_timestamp_to_granularity",
    "is_valid_cron",
    "get_next_fire_time",
    "get_previous_fire_time",
    "seconds_until_next_fire",
    "seconds_since_last_fire",
    "get_fire_times_between",
    "get_logger",
    "configure_logging",
    "ConfigValidationError",
    "validate_watch_config",
    "validate_sentinel_config",
]


================================================
FILE: sentinel/utils/cron.py
================================================
# sentinel/utils/cron.py

from croniter import croniter
from datetime import datetime, timezone


def is_valid_cron(cron_string: str) -> bool:
    """
    Returns True if the cron string is valid, False otherwise.
    """
    return croniter.is_valid(cron_string)


def get_next_fire_time(cron_string: str, base: datetime = None) -> datetime:
    """
    Get the next datetime this cron expression will fire.

    base: datetime to compute from, defaults to now UTC.
    """
    if not is_valid_cron(cron_string):
        raise ValueError(f"Invalid cron string: '{cron_string}'")

    base = base or datetime.now(timezone.utc)
    cron = croniter(cron_string, base)
    return cron.get_next(datetime)


def get_previous_fire_time(cron_string: str, base: datetime = None) -> datetime:
    """
    Get the most recent datetime this cron expression fired before base.

    Useful for computing the cold start window: how long ago did the
    last scheduled training run happen?
    """
    if not is_valid_cron(cron_string):
        raise ValueError(f"Invalid cron string: '{cron_string}'")

    base = base or datetime.now(timezone.utc)
    cron = croniter(cron_string, base)
    return cron.get_prev(datetime)


def seconds_until_next_fire(cron_string: str, base: datetime = None) -> float:
    """
    Returns how many seconds until the next cron fire time.
    """
    base = base or datetime.now(timezone.utc)
    next_fire = get_next_fire_time(cron_string, base)
    return (next_fire - base).total_seconds()


def seconds_since_last_fire(cron_string: str, base: datetime = None) -> float:
    """
    Returns how many seconds have elapsed since the last cron fire time.
    Used by the scheduler to determine if cold start window has elapsed.
    """
    base = base or datetime.now(timezone.utc)
    prev_fire = get_previous_fire_time(cron_string, base)
    return (base - prev_fire).total_seconds()


def get_fire_times_between(
    cron_string: str,
    start: datetime,
    end: datetime,
) -> list[datetime]:
    """
    Returns all fire times strictly after start, up to and including end.
    Useful for backfilling or auditing missed training runs.
    """
    if not is_valid_cron(cron_string):
        raise ValueError(f"Invalid cron string: '{cron_string}'")

    cron = croniter(cron_string, start)
    times = []
    while True:
        fire = cron.get_next(datetime)
        if fire > end:
            break
        times.append(fire)
    return times
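
# Example (illustrative sketch, not part of the module): auditing a 6-hourly
# retrain schedule. Times are arbitrary.
#
#     from datetime import datetime, timezone
#     base = datetime(2024, 1, 1, 7, 30, tzinfo=timezone.utc)
#     get_next_fire_time("0 */6 * * *", base)         # 2024-01-01 12:00 UTC
#     seconds_until_next_fire("0 */6 * * *", base)    # 16200.0
#     get_fire_times_between("0 */6 * * *", base,
#                            datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc))
#     # -> fires at 12:00, 18:00, and 00:00 the next day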


================================================
FILE: sentinel/utils/logging.py
================================================
# sentinel/utils/logging.py

import logging
import sys
from typing import Optional


_SENTINEL_LOGGER_NAME = "sentinel"


def get_logger(name: Optional[str] = None) -> logging.Logger:
    """
    Returns a logger namespaced under 'sentinel'.

    Usage:
        logger = get_logger(__name__)
        logger.info("Scraper started")
    """
    logger_name = f"{_SENTINEL_LOGGER_NAME}.{name}" if name else _SENTINEL_LOGGER_NAME
    return logging.getLogger(logger_name)


def configure_logging(level: str = "INFO") -> None:
    """
    Configure the root sentinel logger with a clean formatter.
    Should be called once at startup in core.py.

    level: "DEBUG", "INFO", "WARNING", "ERROR"
    """
    numeric_level = getattr(logging, level.upper(), logging.INFO)

    logger = logging.getLogger(_SENTINEL_LOGGER_NAME)
    logger.setLevel(numeric_level)

    if logger.handlers:
        return  # already configured, don't add duplicate handlers

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(numeric_level)

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.propagate = False


================================================
FILE: sentinel/utils/time.py
================================================
# sentinel/utils/time.py

import re
from datetime import timedelta


# maps unit suffix to seconds
_UNIT_TO_SECONDS: dict[str, int] = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
}


def parse_duration_to_seconds(duration: str) -> int:
    """
    Parse a human-readable duration string into total seconds.

    Examples:
        "30s" -> 30
        "5m"  -> 300
        "1h"  -> 3600
        "2d"  -> 172800
    """
    duration = duration.strip().lower()
    match = re.fullmatch(r"(\d+)([smhdw])", duration)
    if not match:
        raise ValueError(
            f"Invalid duration format: '{duration}'. "
            f"Expected a number followed by s, m, h, d, or w. e.g. '5m', '1h', '30s'."
        )
    value = int(match.group(1))
    unit = match.group(2)
    return value * _UNIT_TO_SECONDS[unit]


def parse_duration_to_steps(duration: str, granularity: str) -> int:
    """
    Compute how many granularity-sized steps fit in duration.

    Examples:
        parse_duration_to_steps("30m", "1m")  -> 30
        parse_duration_to_steps("1h",  "5m")  -> 12
        parse_duration_to_steps("1h",  "30s") -> 120

    Raises ValueError if duration is not evenly divisible by granularity.
    """
    duration_secs = parse_duration_to_seconds(duration)
    granularity_secs = parse_duration_to_seconds(granularity)

    if granularity_secs == 0:
        raise ValueError("Granularity must be greater than zero.")

    if duration_secs % granularity_secs != 0:
        raise ValueError(
            f"Duration '{duration}' ({duration_secs}s) is not evenly divisible "
            f"by granularity '{granularity}' ({granularity_secs}s)."
        )

    return duration_secs // granularity_secs


def steps_to_timedeltas(
    n_steps: int,
    granularity: str,
    start_offset_seconds: int = 0,
) -> list[timedelta]:
    """
    Generate a list of timedeltas representing future prediction timestamps.

    start_offset_seconds: offset from now to start at, default 0 (immediate next step).

    Example:
        steps_to_timedeltas(3, "5m") ->
            [timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15)]
    """
    granularity_secs = parse_duration_to_seconds(granularity)
    return [
        timedelta(seconds=start_offset_seconds + (i + 1) * granularity_secs)
        for i in range(n_steps)
    ]


def align_timestamp_to_granularity(timestamp: float, granularity: str) -> float:
    """
    Floor a unix timestamp to the nearest granularity boundary.

    Example:
        align_timestamp_to_granularity(1700000123.0, "5m") -> 1700000100.0
    """
    granularity_secs = parse_duration_to_seconds(granularity)
    return float(int(timestamp) // granularity_secs * granularity_secs)
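
# Example (illustrative sketch, not part of the module): how the duration
# helpers compose when stamping a forecast. Values are arbitrary.
#
#     steps = parse_duration_to_steps("15m", "5m")                  # 3
#     offsets = steps_to_timedeltas(steps, "5m")                    # 5m, 10m, 15m ahead
#     start = align_timestamp_to_granularity(1700000123.0, "5m")    # 1700000100.0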


================================================
FILE: sentinel/utils/validation.py
================================================
# sentinel/utils/validation.py

from sentinel.utils.time import parse_duration_to_seconds, parse_duration_to_steps
from sentinel.utils.cron import is_valid_cron


class ConfigValidationError(Exception):
    pass


def validate_duration(value: str, field_name: str) -> None:
    """
    Raises ConfigValidationError if value is not a valid duration string.
    """
    try:
        parse_duration_to_seconds(value)
    except ValueError as e:
        raise ConfigValidationError(f"Invalid duration for '{field_name}': {e}")


def validate_cron(value: str, field_name: str) -> None:
    """
    Raises ConfigValidationError if value is not a valid cron string.
    """
    if not is_valid_cron(value):
        raise ConfigValidationError(
            f"Invalid cron string for '{field_name}': '{value}'. "
            f"Expected standard 5-field cron e.g. '0 */6 * * *'."
        )


def validate_horizon_divisible_by_granularity(horizon: str, granularity: str) -> None:
    """
    Raises ConfigValidationError if horizon is not evenly divisible by granularity.
    """
    try:
        parse_duration_to_steps(horizon, granularity)
    except ValueError as e:
        raise ConfigValidationError(str(e))


def validate_lookback_greater_than_horizon(lookback: str, horizon: str) -> None:
    """
    Raises ConfigValidationError if lookback <= horizon.
    A lookback window must be larger than the prediction horizon.
    """
    lookback_secs = parse_duration_to_seconds(lookback)
    horizon_secs = parse_duration_to_seconds(horizon)
    if lookback_secs <= horizon_secs:
        raise ConfigValidationError(
            f"lookback '{lookback}' must be greater than horizon '{horizon}'."
        )


def validate_thresholds(finetune: float, retrain: float) -> None:
    """
    Raises ConfigValidationError if drift thresholds are misconfigured.
    finetune threshold must be less than retrain threshold.
    Both must be positive.
    """
    if finetune <= 0 or retrain <= 0:
        raise ConfigValidationError(
            "Drift thresholds must be positive floats."
        )
    if finetune >= retrain:
        raise ConfigValidationError(
            f"drift_finetune_threshold ({finetune}) must be less than "
            f"drift_retrain_threshold ({retrain})."
        )


def validate_watch_config(watch) -> None:
    """
    Full validation pass on a WatchConfig instance.
    Raises ConfigValidationError on first failure found.
    """
    if not watch.metric or not isinstance(watch.metric, str):
        raise ConfigValidationError("WatchConfig.metric must be a non-empty string.")

    if watch.model_class is None:
        raise ConfigValidationError(
            f"WatchConfig for '{watch.metric}' has no model_class set."
        )

    validate_cron(watch.cron, "cron")
    validate_duration(watch.granularity, "granularity")
    validate_duration(watch.horizon, "horizon")
    validate_duration(watch.lookback, "lookback")
    validate_horizon_divisible_by_granularity(watch.horizon, watch.granularity)
    validate_lookback_greater_than_horizon(watch.lookback, watch.horizon)
    validate_thresholds(watch.drift_finetune_threshold, watch.drift_retrain_threshold)


def validate_sentinel_config(config) -> None:
    """
    Full validation pass on a SentinelConfig instance.
    """
    if not config.prometheus_url:
        raise ConfigValidationError("SentinelConfig.prometheus_url must not be empty.")

    if not (1 <= config.emitter_port <= 65535):
        raise ConfigValidationError(
            f"emitter_port {config.emitter_port} is not a valid port number."
        )

    if config.max_versions_per_metric < 1:
        raise ConfigValidationError(
            "max_versions_per_metric must be at least 1."
        )

    if not config.watches:
        raise ConfigValidationError(
            "SentinelConfig.watches is empty â€” nothing to watch."
        )

    for watch in config.watches:
        validate_watch_config(watch)
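
# Illustrative usage of the validators above (a sketch, not part of the library
# surface). Duration strings such as "30m" are whatever parse_duration_to_seconds
# accepts elsewhere in this module:
#
#   validate_cron("0 */6 * * *", "cron")                     # passes silently
#   validate_duration("30m", "lookback")                     # passes
#   validate_lookback_greater_than_horizon("30m", "5m")      # passes
#   validate_thresholds(0.1, 0.3)                            # passes: finetune < retrain
#   validate_thresholds(0.3, 0.1)                            # raises ConfigValidationError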


================================================
FILE: tests/__init__.py
================================================
[Empty file]


================================================
FILE: tests/test_emitter/__init__.py
================================================
[Empty file]


================================================
FILE: tests/test_emitter/test_formatter.py
================================================
# tests/test_emitter/test_formatter.py

import pytest
import numpy as np
from sentinel.emitter.formatter import (
    format_prediction,
    build_metric_key,
    FormattedPrediction,
)
from sentinel.pipeline.models.base import PredictionResult
from sentinel.config import WatchConfig
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel


def _make_watch(labels=None):
    return WatchConfig(
        metric="http_request_duration_seconds",
        labels=labels or {"job": "api"},
        model_class=ExponentialSmoothingModel,
        granularity="1m",
        horizon="5m",
        lookback="30m",
    )


def _make_result(n_steps=5, with_bounds=False):
    values = np.linspace(1.0, 2.0, n_steps)
    timestamps = np.array([1700000000.0 + (i + 1) * 60.0 for i in range(n_steps)])
    lower = values - 0.1 if with_bounds else None
    upper = values + 0.1 if with_bounds else None
    return PredictionResult(
        values=values,
        timestamps=timestamps,
        lower_bound=lower,
        upper_bound=upper,
        model_version="v1",
    )


class TestFormatPrediction:

    def test_returns_one_formatted_prediction_by_default(self):
        watch = _make_watch()
        result = _make_result()
        formatted = format_prediction(watch, result, emit_confidence_bounds=False)
        assert len(formatted) == 1

    def test_returns_three_with_confidence_bounds(self):
        watch = _make_watch()
        result = _make_result(with_bounds=True)
        formatted = format_prediction(watch, result, emit_confidence_bounds=True)
        assert len(formatted) == 3

    def test_metric_name_has_suffix(self):
        watch = _make_watch()
        result = _make_result()
        formatted = format_prediction(watch, result)
        assert formatted[0].metric_name == "http_request_duration_seconds_sentinel_predicted"

    def test_steps_count_matches_horizon(self):
        watch = _make_watch()
        result = _make_result(n_steps=5)
        formatted = format_prediction(watch, result)
        assert len(formatted[0].steps) == 5

    def test_steps_are_one_indexed(self):
        watch = _make_watch()
        result = _make_result(n_steps=3)
        formatted = format_prediction(watch, result)
        steps = [s["step"] for s in formatted[0].steps]
        assert steps == [1, 2, 3]

    def test_labels_carry_original_labels(self):
        watch = _make_watch(labels={"job": "api"})
        result = _make_result()
        formatted = format_prediction(watch, result)
        assert formatted[0].labels["job"] == "api"

    def test_labels_include_sentinel_metadata(self):
        watch = _make_watch()
        result = _make_result()
        formatted = format_prediction(watch, result)
        assert "sentinel_horizon" in formatted[0].labels
        assert "sentinel_version" in formatted[0].labels

    def test_sentinel_version_matches_result(self):
        watch = _make_watch()
        result = _make_result()
        result.model_version = "v3"
        formatted = format_prediction(watch, result)
        assert formatted[0].labels["sentinel_version"] == "v3"

    def test_no_bounds_emitted_when_result_has_no_bounds(self):
        watch = _make_watch()
        result = _make_result(with_bounds=False)
        formatted = format_prediction(watch, result, emit_confidence_bounds=True)
        assert len(formatted) == 1

    def test_unknown_version_when_model_version_is_none(self):
        watch = _make_watch()
        result = _make_result()
        result.model_version = None
        formatted = format_prediction(watch, result)
        assert formatted[0].labels["sentinel_version"] == "unknown"


class TestBuildMetricKey:

    def test_no_labels(self):
        assert build_metric_key("my_metric", {}) == "my_metric"

    def test_with_labels(self):
        key = build_metric_key("my_metric", {"job": "api", "env": "prod"})
        assert key == 'my_metric{env="prod",job="api"}'

    def test_labels_sorted(self):
        key1 = build_metric_key("m", {"b": "2", "a": "1"})
        key2 = build_metric_key("m", {"a": "1", "b": "2"})
        assert key1 == key2


================================================
FILE: tests/test_emitter/test_server.py
================================================
# tests/test_emitter/test_server.py

import time
import pytest
import numpy as np
from unittest.mock import MagicMock, patch, PropertyMock
from prometheus_client import CollectorRegistry
from sentinel.emitter.server import MetricEmitter, EmitterServer
from sentinel.pipeline.models.base import PredictionResult
from sentinel.pipeline.drift import DriftMonitor
from sentinel.config import WatchConfig, SentinelConfig
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel


def _make_watch():
    return WatchConfig(
        metric="http_request_duration_seconds",
        labels={"job": "api"},
        model_class=ExponentialSmoothingModel,
        granularity="1m",
        horizon="5m",
        lookback="30m",
    )


def _make_prediction(n_steps=5):
    return PredictionResult(
        values=np.linspace(1.0, 2.0, n_steps),
        timestamps=np.array([time.time() + (i + 1) * 60 for i in range(n_steps)]),
        model_version="v1",
    )


def _make_past_prediction(n_steps=5):
    """Prediction whose timestamps have already elapsed â€” triggers drift recording."""
    return PredictionResult(
        values=np.linspace(1.0, 2.0, n_steps),
        timestamps=np.array([time.time() - (i + 1) * 60 for i in range(n_steps)]),
        model_version="v1",
    )


class TestMetricEmitter:

    def _make_emitter(self, prediction=None, registry=None):
        watch = _make_watch()
        predictor = MagicMock()
        predictor.predict.return_value = prediction or _make_prediction()

        drift_monitor = MagicMock(spec=DriftMonitor)
        scraper = MagicMock()
        scraper.fetch_latest.return_value = (time.time(), 1.5)

        reg = registry or CollectorRegistry()

        return MetricEmitter(
            watch_config=watch,
            predictor=predictor,
            drift_monitor=drift_monitor,
            scraper=scraper,
            emit_confidence_bounds=False,
            registry=reg,
        ), predictor, drift_monitor, scraper

    def test_tick_calls_predictor(self):
        emitter, predictor, _, _ = self._make_emitter()
        emitter.tick()
        predictor.predict.assert_called_once()

    def test_tick_does_nothing_when_prediction_is_none(self):
        emitter, predictor, drift_monitor, _ = self._make_emitter(prediction=None)
        predictor.predict.return_value = None
        emitter.tick()
        # no gauges created, no drift recorded
        assert len(emitter._gauges) == 0
        drift_monitor.record.assert_not_called()

    def test_tick_creates_gauges_for_each_step(self):
        reg = CollectorRegistry()
        emitter, _, _, _ = self._make_emitter(registry=reg)
        emitter.tick()
        # 5 steps = 5 gauges
        assert len(emitter._gauges) == 5

    def test_tick_sets_gauge_values(self):
        reg = CollectorRegistry()
        emitter, _, _, _ = self._make_emitter(registry=reg)
        emitter.tick()
        # gauges created without raising
        assert len(emitter._gauges) > 0

    def test_tick_stores_pending_predictions(self):
        emitter, _, _, _ = self._make_emitter()
        emitter.tick()
        assert len(emitter._pending_predictions) == 5

    def test_tick_feeds_drift_when_predictions_elapsed(self):
        emitter, predictor, drift_monitor, scraper = self._make_emitter()

        # first tick stores pending predictions with past timestamps
        predictor.predict.return_value = _make_past_prediction()
        emitter.tick()

        # second tick should detect elapsed timestamps and feed drift
        predictor.predict.return_value = _make_prediction()
        emitter.tick()

        drift_monitor.record.assert_called()

    def test_drift_not_recorded_when_actual_fetch_fails(self):
        emitter, predictor, drift_monitor, scraper = self._make_emitter()

        predictor.predict.return_value = _make_past_prediction()
        scraper.fetch_latest.return_value = None

        emitter.tick()
        emitter.tick()

        drift_monitor.record.assert_not_called()

    def test_confidence_bounds_not_emitted_by_default(self):
        reg = CollectorRegistry()
        emitter, _, _, _ = self._make_emitter(registry=reg)

        result = _make_prediction()
        result.lower_bound = np.ones(5) * 0.9
        result.upper_bound = np.ones(5) * 1.1
        emitter.predictor.predict.return_value = result

        emitter.tick()
        # only main series gauges; no bound gauges
        assert all("lower" not in k and "upper" not in k for k in emitter._gauges)

    def test_confidence_bounds_emitted_when_enabled(self):
        watch = _make_watch()
        predictor = MagicMock()
        result = _make_prediction()
        result.lower_bound = np.ones(5) * 0.9
        result.upper_bound = np.ones(5) * 1.1
        predictor.predict.return_value = result

        drift_monitor = MagicMock(spec=DriftMonitor)
        scraper = MagicMock()
        reg = CollectorRegistry()

        emitter = MetricEmitter(
            watch_config=watch,
            predictor=predictor,
            drift_monitor=drift_monitor,
            scraper=scraper,
            emit_confidence_bounds=True,
            registry=reg,
        )

        emitter.tick()
        # main + lower + upper = 15 gauges (5 steps each)
        assert len(emitter._gauges) == 15

    def test_second_tick_reuses_existing_gauges(self):
        reg = CollectorRegistry()
        emitter, _, _, _ = self._make_emitter(registry=reg)

        emitter.tick()
        gauge_count_after_first = len(emitter._gauges)

        emitter.tick()
        gauge_count_after_second = len(emitter._gauges)

        assert gauge_count_after_first == gauge_count_after_second
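
# What MetricEmitter.tick() does, as pinned down by the tests above (a reading
# of the tests, not of sentinel/emitter/server.py itself):
#   1. Ask the predictor for a PredictionResult; if it is None, do nothing.
#   2. Set one gauge per horizon step (reusing gauges on later ticks), plus
#      lower/upper bound gauges only when emit_confidence_bounds=True and the
#      result carries bounds.
#   3. Remember each (timestamp, predicted value) as pending; on later ticks,
#      entries whose timestamps have elapsed are compared against
#      scraper.fetch_latest(), and drift_monitor.record(predicted, actual) is
#      called unless that fetch returns None.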


class TestEmitterServer:

    def test_register_adds_emitter(self):
        config = SentinelConfig(emitter_port=19090)
        server = EmitterServer(config)
        emitter = MagicMock()
        server.register(emitter)
        assert emitter in server._emitters

    def test_stop_sets_stop_event(self):
        config = SentinelConfig(emitter_port=19091)
        server = EmitterServer(config)
        server.stop()
        assert server._stop_event.is_set()

    def test_loop_calls_tick_on_all_emitters(self):
        config = SentinelConfig(emitter_port=19092)
        server = EmitterServer(config)

        emitter1 = MagicMock()
        emitter2 = MagicMock()
        server.register(emitter1)
        server.register(emitter2)

        # patch sleep and stop after first pass by setting event inside sleep
        def stop_after_one(*args, **kwargs):
            server._stop_event.set()

        with patch("sentinel.emitter.server.time.sleep", side_effect=stop_after_one):
            server._loop()

        emitter1.tick.assert_called_once()
        emitter2.tick.assert_called_once()

    def test_loop_continues_after_emitter_exception(self):
        config = SentinelConfig(emitter_port=19093)
        server = EmitterServer(config)

        bad_emitter = MagicMock()
        bad_emitter.tick.side_effect = RuntimeError("boom")
        bad_emitter.watch_config = MagicMock()
        bad_emitter.watch_config.metric = "bad_metric"

        good_emitter = MagicMock()
        server.register(bad_emitter)
        server.register(good_emitter)

        def stop_after_one(*args, **kwargs):
            server._stop_event.set()

        with patch("sentinel.emitter.server.time.sleep", side_effect=stop_after_one):
            server._loop()

        good_emitter.tick.assert_called_once()

    def test_start_launches_thread(self):
        config = SentinelConfig(emitter_port=19094)
        server = EmitterServer(config)

        with patch("sentinel.emitter.server.start_http_server"):
            server.start()

        assert server._thread is not None
        assert server._thread.is_alive()
        server.stop()

    def test_start_starts_http_server_on_configured_port(self):
        config = SentinelConfig(emitter_port=19095)
        server = EmitterServer(config)

        with patch("sentinel.emitter.server.start_http_server") as mock_http:
            server.start()
            mock_http.assert_called_once_with(19095)

        server.stop()


================================================
FILE: tests/test_ingestor/__init__.py
================================================
[Empty file]


================================================
FILE: tests/test_ingestor/test_buffer.py
================================================
# tests/test_ingestor/test_buffer.py

import time
import pytest
import numpy as np
from sentinel.ingestor.buffer import MetricBuffer, BufferRegistry


class TestMetricBuffer:

    def _make_buffer(self, lookback="5m", granularity="1m"):
        return MetricBuffer(metric="test_metric", lookback=lookback, granularity=granularity)

    def test_push_single(self):
        buf = self._make_buffer()
        buf.push(1000.0, 42.0)
        assert len(buf) == 1

    def test_push_many(self):
        buf = self._make_buffer()
        samples = [(float(i), float(i * 2)) for i in range(10)]
        buf.push_many(samples)
        assert len(buf) == 5  # maxlen is 5 (5m / 1m)

    def test_evicts_oldest_on_overflow(self):
        buf = self._make_buffer()
        for i in range(10):
            buf.push(float(i), float(i))
        values = buf.get_values()
        assert values[0] == 5.0  # first 5 evicted

    def test_get_values_returns_ndarray(self):
        buf = self._make_buffer()
        buf.push(1.0, 99.0)
        values = buf.get_values()
        assert isinstance(values, np.ndarray)
        assert values[0] == 99.0

    def test_get_timestamps_returns_ndarray(self):
        buf = self._make_buffer()
        buf.push(1234.0, 99.0)
        timestamps = buf.get_timestamps()
        assert isinstance(timestamps, np.ndarray)
        assert timestamps[0] == 1234.0

    def test_get_samples(self):
        buf = self._make_buffer()
        buf.push(1.0, 2.0)
        buf.push(3.0, 4.0)
        samples = buf.get_samples()
        assert samples == [(1.0, 2.0), (3.0, 4.0)]

    def test_get_recent(self):
        buf = self._make_buffer()
        for i in range(5):
            buf.push(float(i), float(i))
        recent = buf.get_recent(3)
        assert list(recent) == [2.0, 3.0, 4.0]

    def test_get_recent_fewer_than_n(self):
        buf = self._make_buffer()
        buf.push(1.0, 10.0)
        recent = buf.get_recent(10)
        assert len(recent) == 1

    def test_is_ready_false_when_not_full(self):
        buf = self._make_buffer()
        buf.push(1.0, 1.0)
        assert buf.is_ready() is False

    def test_is_ready_true_when_full(self):
        buf = self._make_buffer()
        for i in range(5):
            buf.push(float(i), float(i))
        assert buf.is_ready() is True

    def test_fill_fraction(self):
        buf = self._make_buffer()
        buf.push(1.0, 1.0)
        buf.push(2.0, 2.0)
        assert buf.fill_fraction() == pytest.approx(2 / 5)

    def test_fill_fraction_full(self):
        buf = self._make_buffer()
        for i in range(5):
            buf.push(float(i), float(i))
        assert buf.fill_fraction() == pytest.approx(1.0)

    def test_capacity(self):
        buf = self._make_buffer()
        assert buf.capacity() == 5

    def test_clear(self):
        buf = self._make_buffer()
        for i in range(5):
            buf.push(float(i), float(i))
        buf.clear()
        assert len(buf) == 0

    def test_thread_safety(self):
        import threading
        buf = self._make_buffer(lookback="10m", granularity="1m")
        errors = []

        def writer():
            try:
                for i in range(50):
                    buf.push(float(i), float(i))
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=writer) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == []


class TestBufferRegistry:

    def test_register_and_get(self):
        reg = BufferRegistry()
        buf = reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        assert reg.get("m1") is buf

    def test_register_duplicate_returns_existing(self):
        reg = BufferRegistry()
        buf1 = reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        buf2 = reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        assert buf1 is buf2

    def test_get_missing_returns_none(self):
        reg = BufferRegistry()
        assert reg.get("nonexistent") is None

    def test_all_ready_false_when_empty_buffers(self):
        reg = BufferRegistry()
        reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        assert reg.all_ready() is False

    def test_all_ready_true_when_all_full(self):
        reg = BufferRegistry()
        buf = reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        for i in range(5):
            buf.push(float(i), float(i))
        assert reg.all_ready() is True

    def test_keys(self):
        reg = BufferRegistry()
        reg.register(key="m1", metric="m1", lookback="5m", granularity="1m")
        reg.register(key="m2", metric="m2", lookback="5m", granularity="1m")
        assert set(reg.keys()) == {"m1", "m2"}
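
# Capacity arithmetic the buffer tests rely on (a summary of the behaviour
# exercised above, not of sentinel/ingestor/buffer.py):
#   capacity = lookback / granularity, e.g. "5m" / "1m" -> 5 slots
#   pushing beyond capacity evicts the oldest samples first
#   is_ready() is True only once the buffer is full; fill_fraction() = len / capacity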


================================================
FILE: tests/test_ingestor/test_features.py
================================================
# tests/test_ingestor/test_features.py

import pytest
import numpy as np
from sentinel.ingestor.features import (
    build_lag_features,
    build_feature_matrix,
    build_prediction_input,
)


class TestBuildLagFeatures:

    def test_basic_shape(self):
        values = np.arange(10, dtype=float)
        X, y = build_lag_features(values, n_lags=3)
        assert X.shape == (7, 3)
        assert y.shape == (7,)

    def test_first_row(self):
        values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        X, y = build_lag_features(values, n_lags=3)
        assert list(X[0]) == [1.0, 2.0, 3.0]
        assert y[0] == 4.0

    def test_last_row(self):
        values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        X, y = build_lag_features(values, n_lags=3)
        assert list(X[-1]) == [2.0, 3.0, 4.0]
        assert y[-1] == 5.0

    def test_not_enough_values_raises(self):
        values = np.array([1.0, 2.0, 3.0])
        with pytest.raises(ValueError):
            build_lag_features(values, n_lags=3)

    def test_single_lag(self):
        values = np.array([1.0, 2.0, 3.0])
        X, y = build_lag_features(values, n_lags=1)
        assert X.shape == (2, 1)
        assert y.shape == (2,)


class TestBuildFeatureMatrix:

    def _make_values(self, n=50):
        return np.linspace(0, 10, n)

    def _make_timestamps(self, n=50):
        base = 1700000000.0
        return np.array([base + i * 60 for i in range(n)])

    def test_output_shapes_consistent(self):
        values = self._make_values()
        X, y = build_feature_matrix(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=True,
            include_rolling_std=True,
            include_time_features=False,
        )
        assert X.shape[0] == y.shape[0]
        assert X.shape[1] > 5  # lags + rolling mean + rolling std

    def test_with_time_features(self):
        values = self._make_values()
        timestamps = self._make_timestamps()
        X, y = build_feature_matrix(
            values=values,
            lookback="5m",
            granularity="1m",
            include_time_features=True,
            timestamps=timestamps,
        )
        assert X.shape[1] == 5 + 1 + 1 + 4  # lags + mean + std + 4 time features

    def test_without_optional_features(self):
        values = self._make_values()
        X, y = build_feature_matrix(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=False,
            include_rolling_std=False,
            include_time_features=False,
        )
        assert X.shape[1] == 5  # only lag features

    def test_time_features_without_timestamps_skipped(self):
        values = self._make_values()
        X, y = build_feature_matrix(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=False,
            include_rolling_std=False,
            include_time_features=True,
            timestamps=None,
        )
        assert X.shape[1] == 5  # time features skipped gracefully


class TestBuildPredictionInput:

    def _make_values(self, n=50):
        return np.linspace(0, 10, n)

    def _make_timestamps(self, n=50):
        base = 1700000000.0
        return np.array([base + i * 60 for i in range(n)])

    def test_output_shape(self):
        values = self._make_values()
        X = build_prediction_input(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=False,
            include_rolling_std=False,
            include_time_features=False,
        )
        assert X.shape == (1, 5)

    def test_uses_last_n_values(self):
        values = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
        X = build_prediction_input(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=False,
            include_rolling_std=False,
            include_time_features=False,
        )
        assert list(X[0]) == [3.0, 4.0, 5.0, 6.0, 7.0]

    def test_not_enough_values_raises(self):
        values = np.array([1.0, 2.0])
        with pytest.raises(ValueError):
            build_prediction_input(
                values=values,
                lookback="5m",
                granularity="1m",
                include_rolling_mean=False,
                include_rolling_std=False,
                include_time_features=False,
            )

    def test_with_all_features(self):
        values = self._make_values()
        timestamps = self._make_timestamps()
        X = build_prediction_input(
            values=values,
            lookback="5m",
            granularity="1m",
            include_rolling_mean=True,
            include_rolling_std=True,
            include_time_features=True,
            timestamps=timestamps,
        )
        assert X.shape == (1, 5 + 1 + 1 + 4)
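
# Shape bookkeeping behind the assertions above (a sketch of the contract, not
# of sentinel/ingestor/features.py):
#
#   values = np.arange(10.0); n_lags = 3
#   X = np.stack([values[i:i + n_lags] for i in range(values.size - n_lags)])  # (7, 3)
#   y = values[n_lags:]                                                         # (7,)
#
# With lookback "5m" at "1m" granularity there are 5 lag columns; rolling mean,
# rolling std and the time features (4 columns when timestamps are supplied)
# are appended after the lags, which is where 5 + 1 + 1 + 4 comes from.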


================================================
FILE: tests/test_ingestor/test_scraper.py
================================================
# tests/test_ingestor/test_scraper.py

import pytest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from sentinel.ingestor.scraper import PrometheusScraper


def _make_scraper():
    return PrometheusScraper(prometheus_url="http://localhost:9090", timeout=5)


def _mock_range_response(values: list):
    return {
        "status": "success",
        "data": {
            "result": [
                {"values": [[ts, str(val)] for ts, val in values]}
            ]
        }
    }


def _mock_instant_response(ts: float, val: float):
    return {
        "status": "success",
        "data": {
            "result": [
                {"value": [ts, str(val)]}
            ]
        }
    }
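
# These helpers mirror the shapes returned by the Prometheus HTTP API:
# /api/v1/query_range returns data.result[*].values as [timestamp, "value"]
# pairs and /api/v1/query returns data.result[*].value as a single pair, with
# the sample value encoded as a string. Which endpoints PrometheusScraper hits
# is an implementation detail; the tests below only patch scraper._client.get
# and assert on the parsing behaviour.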


class TestBuildSelector:

    def test_no_labels(self):
        scraper = _make_scraper()
        assert scraper._build_selector("my_metric", {}) == "my_metric"

    def test_with_single_label(self):
        scraper = _make_scraper()
        result = scraper._build_selector("my_metric", {"job": "api"})
        assert result == 'my_metric{job="api"}'

    def test_with_multiple_labels_sorted(self):
        scraper = _make_scraper()
        result = scraper._build_selector("my_metric", {"job": "api", "env": "prod"})
        assert result == 'my_metric{env="prod",job="api"}'

    def test_labels_always_sorted(self):
        scraper = _make_scraper()
        r1 = scraper._build_selector("m", {"b": "2", "a": "1"})
        r2 = scraper._build_selector("m", {"a": "1", "b": "2"})
        assert r1 == r2


class TestFetchRange:

    def test_returns_samples_on_success(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = _mock_range_response(
            [(1700000060.0, 1.5), (1700000120.0, 2.0)]
        )
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_range(
                metric="http_requests_total",
                labels={"job": "api"},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        assert len(result) == 2
        assert result[0] == (1700000060.0, 1.5)
        assert result[1] == (1700000120.0, 2.0)

    def test_returns_empty_on_no_results(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "status": "success",
            "data": {"result": []}
        }
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_range(
                metric="my_metric",
                labels={},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        assert result == []

    def test_returns_empty_on_non_success_status(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = {"status": "error", "error": "bad query"}
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_range(
                metric="my_metric",
                labels={},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        assert result == []

    def test_returns_empty_on_http_error(self):
        import httpx
        scraper = _make_scraper()

        with patch.object(scraper._client, "get", side_effect=httpx.RequestError("timeout")):
            result = scraper.fetch_range(
                metric="my_metric",
                labels={},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        assert result == []

    def test_values_are_floats(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = _mock_range_response(
            [(1700000060, 42)]
        )
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_range(
                metric="m",
                labels={},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        ts, val = result[0]
        assert isinstance(ts, float)
        assert isinstance(val, float)

    def test_uses_first_series_only(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "status": "success",
            "data": {
                "result": [
                    {"values": [[1.0, "10"]]},
                    {"values": [[2.0, "20"]]},  # second series, should be ignored
                ]
            }
        }
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_range(
                metric="m", labels={},
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
                granularity="1m",
            )

        assert len(result) == 1
        assert result[0] == (1.0, 10.0)


class TestFetchLatest:

    def test_returns_tuple_on_success(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = _mock_instant_response(1700000060.0, 3.14)
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_latest("my_metric", {})

        assert result == (1700000060.0, 3.14)

    def test_returns_none_on_empty_result(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "status": "success",
            "data": {"result": []}
        }
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_latest("my_metric", {})

        assert result is None

    def test_returns_none_on_request_error(self):
        import httpx
        scraper = _make_scraper()

        with patch.object(scraper._client, "get", side_effect=httpx.RequestError("conn refused")):
            result = scraper.fetch_latest("my_metric", {})

        assert result is None

    def test_returns_none_on_non_success_status(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = {"status": "error"}
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_latest("my_metric", {})

        assert result is None

    def test_values_are_floats(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.json.return_value = _mock_instant_response(1700000000, 99)
        mock_response.raise_for_status = MagicMock()

        with patch.object(scraper._client, "get", return_value=mock_response):
            result = scraper.fetch_latest("my_metric", {})

        ts, val = result
        assert isinstance(ts, float)
        assert isinstance(val, float)


class TestCheckConnectivity:

    def test_returns_true_on_200(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.status_code = 200

        with patch.object(scraper._client, "get", return_value=mock_response):
            assert scraper.check_connectivity() is True

    def test_returns_false_on_non_200(self):
        scraper = _make_scraper()
        mock_response = MagicMock()
        mock_response.status_code = 503

        with patch.object(scraper._client, "get", return_value=mock_response):
            assert scraper.check_connectivity() is False

    def test_returns_false_on_request_error(self):
        import httpx
        scraper = _make_scraper()

        with patch.object(scraper._client, "get", side_effect=httpx.RequestError("refused")):
            assert scraper.check_connectivity() is False


================================================
FILE: tests/test_pipeline/__init__.py
================================================
[Empty file]


================================================
FILE: tests/test_pipeline/test_drift.py
================================================
# tests/test_pipeline/test_drift.py

import pytest
import numpy as np
from sentinel.pipeline.drift import DriftMonitor, DriftSeverity, DriftResult


class TestDriftMonitor:

    def _make_monitor(self, finetune=0.1, retrain=0.3, window=10):
        return DriftMonitor(
            metric="test_metric",
            finetune_threshold=finetune,
            retrain_threshold=retrain,
            window_size=window,
        )

    def test_initial_check_returns_none_severity(self):
        monitor = self._make_monitor()
        result = monitor.check()
        assert result.severity == DriftSeverity.NONE
        assert result.mae == 0.0

    def test_record_single(self):
        monitor = self._make_monitor()
        monitor.record(predicted=1.0, actual=1.05)
        assert monitor.sample_count() == 1

    def test_no_drift_below_threshold(self):
        monitor = self._make_monitor()
        for _ in range(5):
            monitor.record(predicted=1.0, actual=1.01)  # MAE = 0.01
        result = monitor.check()
        assert result.severity == DriftSeverity.NONE

    def test_low_drift_between_thresholds(self):
        monitor = self._make_monitor(finetune=0.1, retrain=0.3)
        for _ in range(5):
            monitor.record(predicted=1.0, actual=1.2)  # MAE = 0.2
        result = monitor.check()
        assert result.severity == DriftSeverity.LOW

    def test_high_drift_above_retrain_threshold(self):
        monitor = self._make_monitor(finetune=0.1, retrain=0.3)
        for _ in range(5):
            monitor.record(predicted=1.0, actual=1.5)  # MAE = 0.5
        result = monitor.check()
        assert result.severity == DriftSeverity.HIGH

    def test_record_many(self):
        monitor = self._make_monitor()
        predicted = np.array([1.0, 2.0, 3.0])
        actual = np.array([1.1, 2.1, 3.1])
        monitor.record_many(predicted, actual)
        assert monitor.sample_count() == 3
        assert monitor.current_mae() == pytest.approx(0.1, abs=1e-6)

    def test_window_evicts_old_samples(self):
        monitor = self._make_monitor(window=3)
        for _ in range(3):
            monitor.record(predicted=1.0, actual=1.5)  # high residuals
        for _ in range(3):
            monitor.record(predicted=1.0, actual=1.01)  # low residuals
        # only last 3 should count
        assert monitor.current_mae() == pytest.approx(0.01, abs=1e-6)

    def test_reset_clears_residuals(self):
        monitor = self._make_monitor()
        for _ in range(5):
            monitor.record(predicted=1.0, actual=2.0)
        monitor.reset()
        assert monitor.sample_count() == 0
        assert monitor.current_mae() == 0.0
        result = monitor.check()
        assert result.severity == DriftSeverity.NONE

    def test_current_mae_empty(self):
        monitor = self._make_monitor()
        assert monitor.current_mae() == 0.0

    def test_drift_result_carries_thresholds(self):
        monitor = self._make_monitor(finetune=0.1, retrain=0.3)
        monitor.record(1.0, 1.2)
        result = monitor.check()
        assert result.threshold_finetune == 0.1
        assert result.threshold_retrain == 0.3

    def test_thread_safety(self):
        import threading
        monitor = self._make_monitor(window=100)
        errors = []

        def recorder():
            try:
                for _ in range(50):
                    monitor.record(1.0, 1.1)
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=recorder) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == []
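
# Threshold arithmetic pinned down above (with finetune=0.1, retrain=0.3):
#   MAE = mean(|predicted - actual|) over the most recent window_size residuals
#   MAE below the finetune threshold          -> DriftSeverity.NONE
#   MAE between finetune and retrain          -> DriftSeverity.LOW
#   MAE above the retrain threshold           -> DriftSeverity.HIGH
# Behaviour exactly at the boundaries is not exercised by these tests.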


================================================
FILE: tests/test_pipeline/test_predictor.py
================================================
# tests/test_pipeline/test_predictor.py

import pytest
import numpy as np
from sentinel.pipeline.versioning import VersionStore
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.trainer import Trainer
from sentinel.pipeline.predictor import Predictor
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.config import WatchConfig
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel
from sentinel.pipeline.drift import DriftSeverity


def _make_watch():
    return WatchConfig(
        metric="test_metric",
        labels={},
        model_class=ExponentialSmoothingModel,
        granularity="1m",
        horizon="5m",
        lookback="10m",       # was 30m
        cron="0 */6 * * *",
    )


def _fill_buffer(buf, n=20):
    for i in range(n):
        buf.push(float(1700000000 + i * 60), float(i) + np.random.normal(0, 0.1))


@pytest.fixture
def trained_setup(tmp_path):
    watch = _make_watch()
    buf = MetricBuffer(metric="test_metric", lookback="10m", granularity="1m")
    _fill_buffer(buf, n=20)
    store = VersionStore(metric_key="test_metric", artifact_store=str(tmp_path), max_versions=5)
    registry = ModelRegistry(metric_key="test_metric", version_store=store)
    trainer = Trainer(watch_config=watch, buffer=buf, registry=registry)
    trainer.run(drift_severity=DriftSeverity.NONE)
    return watch, buf, registry


class TestPredictor:

    def test_returns_none_when_model_not_ready(self, tmp_path):
        watch = _make_watch()
        buf = MetricBuffer(metric="test_metric", lookback="30m", granularity="1m")
        _fill_buffer(buf, n=35)
        store = VersionStore(
            metric_key="test_metric",
            artifact_store=str(tmp_path),
            max_versions=5,
        )
        registry = ModelRegistry(metric_key="test_metric", version_store=store)
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        assert result is None

    def test_returns_prediction_result_after_training(self, trained_setup):
        watch, buf, registry = trained_setup
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        assert result is not None

    def test_prediction_values_length_equals_horizon_steps(self, trained_setup):
        watch, buf, registry = trained_setup
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        # horizon=5m, granularity=1m -> 5 steps
        assert len(result.values) == 5

    def test_prediction_timestamps_in_future(self, trained_setup):
        import time
        watch, buf, registry = trained_setup
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        now = time.time()
        assert all(ts > now for ts in result.timestamps)

    def test_prediction_has_model_version(self, trained_setup):
        watch, buf, registry = trained_setup
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        assert result.model_version is not None

    def test_returns_none_when_buffer_insufficient(self, tmp_path):
        watch = _make_watch()
        buf = MetricBuffer(metric="test_metric", lookback="30m", granularity="1m")
        buf.push(1.0, 1.0)  # only one sample

        store = VersionStore(
            metric_key="test_metric",
            artifact_store=str(tmp_path),
            max_versions=5,
        )
        registry = ModelRegistry(metric_key="test_metric", version_store=store)
        predictor = Predictor(watch_config=watch, buffer=buf, registry=registry)
        result = predictor.predict()
        assert result is None


================================================
FILE: tests/test_pipeline/test_registry.py
================================================
# tests/test_pipeline/test_registry.py

import os
import pytest
import tempfile
import numpy as np
from sentinel.pipeline.versioning import VersionStore, ModelVersion
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel


def _make_version(version_id="v1", mae=0.05):
    return ModelVersion(
        version_id=version_id,
        metric_key="test_metric",
        model_class="ExponentialSmoothingModel",
        trained_at="2024-01-01T00:00:00+00:00",
        training_policy="full_retrain",
        drift_score_at_trigger=0.0,
        mae=mae,
        mape=1.0,
        n_samples=100,
        artifact_path="",
        extra={"granularity": "1m", "horizon": "5m", "lookback": "30m"},
    )


def _make_model():
    return ExponentialSmoothingModel(
        granularity="1m",
        horizon="5m",
        lookback="30m",
    )


def _fit_model(model):
    y = np.linspace(1, 10, 50)
    X = np.zeros((50, 30))
    model.fit(X, y)
    return model


@pytest.fixture
def tmp_store(tmp_path):
    return VersionStore(
        metric_key="test_metric",
        artifact_store=str(tmp_path),
        max_versions=5,
    )


@pytest.fixture
def registry(tmp_store):
    return ModelRegistry(metric_key="test_metric", version_store=tmp_store)


class TestModelRegistry:

    def test_initial_not_ready(self, registry):
        assert registry.is_ready() is False

    def test_get_model_returns_none_initially(self, registry):
        assert registry.get_model() is None

    def test_promote_makes_registry_ready(self, registry):
        model = _fit_model(_make_model())
        version = _make_version()
        registry.promote(model, version)
        assert registry.is_ready() is True

    def test_promote_sets_active_version(self, registry):
        model = _fit_model(_make_model())
        version = _make_version(version_id="v1")
        registry.promote(model, version)
        assert registry.active_version().version_id == "v1"

    def test_promote_saves_artifact(self, registry, tmp_path):
        model = _fit_model(_make_model())
        version = _make_version(version_id="v1")
        registry.promote(model, version)
        assert os.path.exists(version.artifact_path)

    def test_get_model_returns_promoted_model(self, registry):
        model = _fit_model(_make_model())
        version = _make_version()
        registry.promote(model, version)
        assert registry.get_model() is model

    def test_rollback_with_no_previous_returns_false(self, registry):
        model = _fit_model(_make_model())
        version = _make_version(version_id="v1")
        registry.promote(model, version)
        result = registry.rollback()
        assert result is False

    def test_rollback_restores_previous(self, registry):
        model1 = _fit_model(_make_model())
        version1 = _make_version(version_id="v1")
        registry.promote(model1, version1)

        model2 = _fit_model(_make_model())
        version2 = _make_version(version_id="v2")
        registry.promote(model2, version2)

        assert registry.active_version().version_id == "v2"
        result = registry.rollback()
        assert result is True
        assert registry.active_version().version_id == "v1"

    def test_restore_from_disk_no_artifact(self, registry):
        result = registry.restore_from_disk(lambda: _make_model())
        assert result is False

    def test_restore_from_disk_after_promote(self, registry, tmp_store):
        model = _fit_model(_make_model())
        version = _make_version(version_id="v1")
        registry.promote(model, version)

        new_registry = ModelRegistry(
            metric_key="test_metric",
            version_store=tmp_store,
        )
        result = new_registry.restore_from_disk(lambda: _make_model())
        assert result is True
        assert new_registry.is_ready() is True
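
# Lifecycle exercised above: promote() persists the artifact (version.artifact_path
# exists afterwards) and makes that version active; rollback() restores the
# previously active model and returns False when there is nothing to fall back to;
# restore_from_disk() rebuilds the active model in a fresh registry from the
# persisted artifact, given a factory for an unfitted model. A summary of the
# tests, not of sentinel/pipeline/registry.py.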


================================================
FILE: tests/test_pipeline/test_scheduler.py
================================================
# tests/test_pipeline/test_scheduler.py

import time
import threading
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, timezone
from sentinel.pipeline.scheduler import MetricScheduler
from sentinel.pipeline.drift import DriftMonitor, DriftSeverity
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.config import WatchConfig
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel


def _make_watch():
    return WatchConfig(
        metric="test_metric",
        labels={},
        model_class=ExponentialSmoothingModel,
        granularity="1m",
        horizon="5m",
        lookback="5m",
        cron="* * * * *",  # every minute for fast tests
    )


def _make_drift_monitor(severity=DriftSeverity.NONE):
    monitor = MagicMock(spec=DriftMonitor)
    result = MagicMock()
    result.severity = severity
    result.mae = 0.0
    monitor.check.return_value = result
    return monitor


def _make_full_buffer():
    buf = MetricBuffer(metric="test_metric", lookback="5m", granularity="1m")
    for i in range(5):
        buf.push(float(i), float(i))
    return buf


def _make_empty_buffer():
    return MetricBuffer(metric="test_metric", lookback="5m", granularity="1m")


class TestMetricScheduler:

    def test_start_and_stop(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_fn = MagicMock()

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )
        scheduler.start()
        assert scheduler._thread is not None
        assert scheduler._thread.is_alive()
        scheduler.stop()
        scheduler._thread.join(timeout=15)   # generous timeout so the loop thread can exit cleanly
        assert not scheduler._thread.is_alive()

    def test_cold_start_fires_when_buffer_ready(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_calls = []

        def train_fn(severity, drift_score):
            train_calls.append((severity, drift_score))

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        # call tick directly to avoid thread timing
        scheduler._tick()

        # give the dispatched thread time to run
        time.sleep(0.1)

        assert len(train_calls) == 1
        assert train_calls[0][0] == DriftSeverity.NONE
        assert scheduler._cold_start_done is True

    def test_cold_start_does_not_fire_when_buffer_not_ready(self):
        watch = _make_watch()
        buf = _make_empty_buffer()
        monitor = _make_drift_monitor()
        train_fn = MagicMock()

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._tick()
        time.sleep(0.1)

        train_fn.assert_not_called()
        assert scheduler._cold_start_done is False

    def test_cold_start_fires_only_once(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_calls = []

        def train_fn(severity, drift_score):
            train_calls.append((severity, drift_score))

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._tick()
        scheduler._tick()
        scheduler._tick()
        time.sleep(0.2)

        # cold start only fires once; subsequent ticks go to the cron/drift path
        assert len(train_calls) == 1
        assert train_calls[0][0] == DriftSeverity.NONE

    def test_drift_triggers_retrain_after_cold_start(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        train_calls = []

        def train_fn(severity, drift_score):
            train_calls.append((severity, drift_score))

        monitor = _make_drift_monitor(severity=DriftSeverity.HIGH)
        monitor.check.return_value.mae = 0.5

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        # simulate cold start already done
        scheduler._cold_start_done = True
        scheduler._last_cron_fire = datetime.now(timezone.utc)

        scheduler._tick()
        time.sleep(0.1)

        assert len(train_calls) == 1
        assert train_calls[0][0] == DriftSeverity.HIGH

    def test_drift_monitor_reset_called_after_drift_trigger(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        train_fn = MagicMock()

        monitor = _make_drift_monitor(severity=DriftSeverity.LOW)
        monitor.check.return_value.mae = 0.2

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._cold_start_done = True
        scheduler._last_cron_fire = datetime.now(timezone.utc)

        scheduler._tick()
        time.sleep(0.1)

        monitor.reset.assert_called_once()

    def test_cron_not_due_when_just_fired(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_fn = MagicMock()

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._cold_start_done = True
        scheduler._last_cron_fire = datetime.now(timezone.utc)

        assert scheduler._cron_due(datetime.now(timezone.utc)) is False

    def test_cron_due_after_interval_elapsed(self):
        from datetime import timedelta
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_fn = MagicMock()

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._cold_start_done = True
        # set last fire to 2 minutes ago; cron is every minute so it should be due
        scheduler._last_cron_fire = datetime.now(timezone.utc) - timedelta(minutes=2)

        assert scheduler._cron_due(datetime.now(timezone.utc)) is True

    def test_cron_not_due_when_last_fire_is_none(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()
        train_fn = MagicMock()

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        assert scheduler._cron_due(datetime.now(timezone.utc)) is False

    def test_fire_dispatches_in_background_thread(self):
        watch = _make_watch()
        buf = _make_full_buffer()
        monitor = _make_drift_monitor()

        fired_threads = []

        def train_fn(severity, drift_score):
            fired_threads.append(threading.current_thread().name)

        scheduler = MetricScheduler(
            watch_config=watch,
            buffer=buf,
            drift_monitor=monitor,
            train_fn=train_fn,
        )

        scheduler._fire(DriftSeverity.NONE, 0.0)
        time.sleep(0.1)

        assert len(fired_threads) == 1
        # should not have run on the main thread
        assert fired_threads[0] != threading.main_thread().name
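
# Decision order _tick() follows, as pinned down above: (1) cold start fires a
# training run once the buffer is ready, and only once; (2) after cold start,
# LOW or HIGH drift fires a run and resets the drift monitor; (3) otherwise a
# run fires when the cron schedule is due (_cron_due is False when the last
# fire time is unset or too recent). Training is always dispatched on a
# background thread. A reading of the tests, not of the scheduler itself.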


================================================
FILE: tests/test_pipeline/test_trainer.py
================================================
# tests/test_pipeline/test_trainer.py

import pytest
import numpy as np
from sentinel.pipeline.drift import DriftSeverity
from sentinel.pipeline.versioning import VersionStore
from sentinel.pipeline.registry import ModelRegistry
from sentinel.pipeline.trainer import Trainer
from sentinel.ingestor.buffer import MetricBuffer
from sentinel.config import WatchConfig
from sentinel.pipeline.models.smoothing import ExponentialSmoothingModel
from sentinel.pipeline.models.linear import LinearTrendModel


def _make_watch_config(model_class=None):
    return WatchConfig(
        metric="test_metric",
        labels={},
        model_class=model_class or ExponentialSmoothingModel,
        granularity="1m",
        horizon="5m",
        lookback="10m",       # 10 lags, needs 11+ values
        cron="0 */6 * * *",
    )


def _fill_buffer(buf, n=35):
    for i in range(n):
        buf.push(float(1700000000 + i * 60), float(i) + np.random.normal(0, 0.1))


@pytest.fixture
def tmp_registry(tmp_path):
    store = VersionStore(
        metric_key="test_metric",
        artifact_store=str(tmp_path),
        max_versions=5,
    )
    return ModelRegistry(metric_key="test_metric", version_store=store)


@pytest.fixture
def buffer():
    buf = MetricBuffer(metric="test_metric", lookback="10m", granularity="1m")
    for i in range(20):      # 20 > 10, plenty of room
        buf.push(float(1700000000 + i * 60), float(i) + np.random.normal(0, 0.1))
    return buf



class TestTrainer:

    def test_run_cold_start_promotes_model(self, buffer, tmp_registry):
        watch = _make_watch_config()
        trainer = Trainer(watch_config=watch, buffer=buffer, registry=tmp_registry)
        result = trainer.run(drift_severity=DriftSeverity.NONE, drift_score=0.0)
        assert result is not None
        assert tmp_registry.is_ready()

    def test_run_returns_training_result(self, buffer, tmp_registry):
        watch = _make_watch_config()
        trainer = Trainer(watch_config=watch, buffer=buffer, registry=tmp_registry)
        result = trainer.run()
        assert result is not None
        assert result.mae >= 0
        assert result.mape >= 0
        assert result.n_samples > 0

    def test_full_retrain_policy_on_high_drift(self, buffer, tmp_registry):
        watch = _make_watch_config()
        trainer = Trainer(watch_config=watch, buffer=buffer, registry=tmp_registry)
        trainer.run(drift_severity=DriftSeverity.NONE)
        result = trainer.run(drift_severity=DriftSeverity.HIGH, drift_score=0.5)
        assert result is not None
        assert result.training_policy == "full_retrain"

    def test_finetune_policy_on_low_drift(self, buffer, tmp_registry):
        watch = _make_watch_config(model_class=LinearTrendModel)
        trainer = Trainer(watch_config=watch, buffer=buffer, registry=tmp_registry)
        trainer.run(drift_severity=DriftSeverity.NONE)
        result = trainer.run(drift_severity=DriftSeverity.LOW, drift_score=0.15)
        assert result is not None
        assert result.training_policy == "finetune"

    def test_run_with_empty_buffer_returns_none(self, tmp_registry):
        watch = _make_watch_config()
        buf = MetricBuffer(metric="test_metric", lookback="30m", granularity="1m")
        trainer = Trainer(watch_config=watch, buffer=buf, registry=tmp_registry)
        result = trainer.run()
        assert result is None

    def test_version_increments_on_each_run(self, buffer, tmp_registry):
        watch = _make_watch_config()
        trainer = Trainer(watch_config=watch, buffer=buffer, registry=tmp_registry)
        trainer.run()
        trainer.run()
        versions = tmp_registry.version_store.get_all()
        assert len(versions) == 2
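
# Policy selection exercised above: HIGH drift requests a "full_retrain",
# LOW drift requests a "finetune" (the test switches to LinearTrendModel for
# that case), an empty buffer yields None, and every successful run stores a
# new version in the registry's version store. A summary of the tests, not of
# Trainer itself.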


================================================
FILE: tests/test_utils/__init__.py
================================================
[Empty file]


================================================
FILE: tests/test_utils/test_cron.py
================================================
# tests/test_utils/test_cron.py

import pytest
from datetime import datetime, timezone, timedelta
from sentinel.utils.cron import (
    is_valid_cron,
    get_next_fire_time,
    get_previous_fire_time,
    seconds_until_next_fire,
    seconds_since_last_fire,
    get_fire_times_between,
)


class TestIsValidCron:

    def test_valid_every_hour(self):
        assert is_valid_cron("0 * * * *") is True

    def test_valid_every_six_hours(self):
        assert is_valid_cron("0 */6 * * *") is True

    def test_valid_every_minute(self):
        assert is_valid_cron("* * * * *") is True

    def test_invalid_too_few_fields(self):
        assert is_valid_cron("* * * *") is False

    def test_invalid_out_of_range(self):
        assert is_valid_cron("60 * * * *") is False

    def test_invalid_empty_string(self):
        assert is_valid_cron("") is False

    def test_invalid_random_string(self):
        assert is_valid_cron("every six hours") is False


class TestGetNextFireTime:

    def test_next_is_in_future(self):
        base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        next_fire = get_next_fire_time("0 * * * *", base=base)
        assert next_fire > base

    def test_every_hour_increments_by_one_hour(self):
        base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        next_fire = get_next_fire_time("0 * * * *", base=base)
        assert next_fire == datetime(2024, 1, 1, 13, 0, 0, tzinfo=timezone.utc)

    def test_invalid_cron_raises(self):
        with pytest.raises(ValueError):
            get_next_fire_time("not a cron")


class TestGetPreviousFireTime:

    def test_prev_is_in_past(self):
        base = datetime(2024, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
        prev_fire = get_previous_fire_time("0 * * * *", base=base)
        assert prev_fire < base

    def test_every_hour_returns_last_hour(self):
        base = datetime(2024, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
        prev_fire = get_previous_fire_time("0 * * * *", base=base)
        assert prev_fire == datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

    def test_invalid_cron_raises(self):
        with pytest.raises(ValueError):
            get_previous_fire_time("not a cron")


class TestSecondsUntilNextFire:

    def test_returns_positive(self):
        base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        secs = seconds_until_next_fire("0 * * * *", base=base)
        assert secs > 0

    def test_roughly_one_hour(self):
        base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        secs = seconds_until_next_fire("0 * * * *", base=base)
        assert abs(secs - 3600) < 5


class TestSecondsSinceLastFire:

    def test_returns_positive(self):
        base = datetime(2024, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
        secs = seconds_since_last_fire("0 * * * *", base=base)
        assert secs > 0

    def test_roughly_thirty_minutes(self):
        base = datetime(2024, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
        secs = seconds_since_last_fire("0 * * * *", base=base)
        assert abs(secs - 1800) < 5


class TestGetFireTimesBetween:

    def test_correct_count(self):
        start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
        end = datetime(2024, 1, 1, 6, 0, 0, tzinfo=timezone.utc)
        fires = get_fire_times_between("0 * * * *", start=start, end=end)
        assert len(fires) == 6

    def test_all_within_range(self):
        start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
        end = datetime(2024, 1, 1, 3, 0, 0, tzinfo=timezone.utc)
        fires = get_fire_times_between("0 * * * *", start=start, end=end)
        for f in fires:
            assert start < f <= end

    def test_empty_when_no_fires_in_range(self):
        start = datetime(2024, 1, 1, 12, 0, 1, tzinfo=timezone.utc)
        end = datetime(2024, 1, 1, 12, 59, 0, tzinfo=timezone.utc)
        fires = get_fire_times_between("0 * * * *", start=start, end=end)
        assert fires == []

    def test_invalid_cron_raises(self):
        with pytest.raises(ValueError):
            get_fire_times_between(
                "bad cron",
                start=datetime(2024, 1, 1, tzinfo=timezone.utc),
                end=datetime(2024, 1, 2, tzinfo=timezone.utc),
            )
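

# Illustrative sketch only, not the real sentinel.utils.cron implementation:
# assuming the croniter dependency declared in pyproject.toml, the helpers
# exercised above could be built roughly as follows. The _sketch_ names are
# hypothetical; only the ValueError contract mirrors the tests.
from croniter import croniter


def _sketch_is_valid_cron(expr: str) -> bool:
    # croniter.is_valid rejects wrong field counts, out-of-range values, and
    # free-form strings such as "every six hours"; guard the empty string first.
    return bool(expr.strip()) and croniter.is_valid(expr)


def _sketch_get_next_fire_time(expr: str, base: datetime) -> datetime:
    # Mirrors the ValueError behaviour asserted in TestGetNextFireTime.
    if not _sketch_is_valid_cron(expr):
        raise ValueError(f"invalid cron expression: {expr!r}")
    return croniter(expr, base).get_next(datetime)


def _sketch_seconds_until_next_fire(expr: str, base: datetime) -> float:
    return (_sketch_get_next_fire_time(expr, base) - base).total_seconds()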


================================================
FILE: tests/test_utils/test_time.py
================================================
# tests/test_utils/test_time.py

import pytest
from datetime import timedelta
from sentinel.utils.time import (
    parse_duration_to_seconds,
    parse_duration_to_steps,
    steps_to_timedeltas,
    align_timestamp_to_granularity,
)


class TestParseDurationToSeconds:

    def test_seconds(self):
        assert parse_duration_to_seconds("30s") == 30

    def test_minutes(self):
        assert parse_duration_to_seconds("5m") == 300

    def test_hours(self):
        assert parse_duration_to_seconds("1h") == 3600

    def test_days(self):
        assert parse_duration_to_seconds("2d") == 172800

    def test_weeks(self):
        assert parse_duration_to_seconds("1w") == 604800

    def test_strips_whitespace(self):
        assert parse_duration_to_seconds("  5m  ") == 300

    def test_case_insensitive(self):
        assert parse_duration_to_seconds("5M") == 300

    def test_invalid_format_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_seconds("5minutes")

    def test_missing_unit_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_seconds("300")

    def test_missing_value_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_seconds("m")

    def test_empty_string_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_seconds("")


class TestParseDurationToSteps:

    def test_basic(self):
        assert parse_duration_to_steps("30m", "1m") == 30

    def test_hour_to_five_minutes(self):
        assert parse_duration_to_steps("1h", "5m") == 12

    def test_hour_to_thirty_seconds(self):
        assert parse_duration_to_steps("1h", "30s") == 120

    def test_same_duration_and_granularity(self):
        assert parse_duration_to_steps("5m", "5m") == 1

    def test_not_divisible_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_steps("7m", "5m")

    def test_granularity_larger_than_duration_raises(self):
        with pytest.raises(ValueError):
            parse_duration_to_steps("1m", "5m")

    def test_day_to_hours(self):
        assert parse_duration_to_steps("1d", "1h") == 24


class TestStepsToTimedeltas:

    def test_basic(self):
        result = steps_to_timedeltas(3, "5m")
        assert result == [
            timedelta(minutes=5),
            timedelta(minutes=10),
            timedelta(minutes=15),
        ]

    def test_with_offset(self):
        result = steps_to_timedeltas(2, "1h", start_offset_seconds=3600)
        assert result == [
            timedelta(hours=2),
            timedelta(hours=3),
        ]

    def test_single_step(self):
        result = steps_to_timedeltas(1, "1m")
        assert result == [timedelta(minutes=1)]

    def test_seconds_granularity(self):
        result = steps_to_timedeltas(3, "30s")
        assert result == [
            timedelta(seconds=30),
            timedelta(seconds=60),
            timedelta(seconds=90),
        ]
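

# Illustrative sketch only: the offsets asserted above start one step after the
# optional start offset and advance by one granularity per step.
def _sketch_steps_to_timedeltas(
    n_steps: int, granularity: str, start_offset_seconds: int = 0
) -> list[timedelta]:
    step = _sketch_parse_duration_to_seconds(granularity)
    return [
        timedelta(seconds=start_offset_seconds + step * i)
        for i in range(1, n_steps + 1)
    ]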


class TestAlignTimestampToGranularity:

    def test_already_aligned(self):
        ts = 1700000100.0  # already on 1m boundary
        assert align_timestamp_to_granularity(ts, "1m") == 1700000100.0

    def test_floors_to_minute(self):
        ts = 1700000123.0
        assert align_timestamp_to_granularity(ts, "1m") == 1700000100.0

    def test_floors_to_five_minutes(self):
        ts = 1700000423.0  # 23s past a 5m boundary
        result = align_timestamp_to_granularity(ts, "5m")
        assert result % 300 == 0
        assert result <= ts

    def test_floors_to_hour(self):
        ts = 1700003723.0  # 923s past an hour boundary
        result = align_timestamp_to_granularity(ts, "1h")
        assert result % 3600 == 0
        assert result <= ts
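

# Illustrative sketch only: flooring a timestamp onto a granularity boundary,
# as asserted in TestAlignTimestampToGranularity, again via the hypothetical
# parser above. The real sentinel.utils.time module may differ.
def _sketch_align_timestamp_to_granularity(ts: float, granularity: str) -> float:
    step = _sketch_parse_duration_to_seconds(granularity)
    return float(int(ts // step) * step)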

