Source code for scitex_ml.classification.timeseries._TimeSeriesCalendarSplit

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: "2025-09-22 17:15:00 (ywatanabe)"
# File: _TimeSeriesCalendarSplit.py

import scitex_io

__FILE__ = "_TimeSeriesCalendarSplit.py"

"""
Functionalities:
  - Implements calendar-based time series cross-validation
  - Splits data based on calendar intervals (monthly, weekly, daily)
  - Ensures temporal order preservation with no data leakage
  - Supports flexible interval definitions (D, W, M, Q, Y)
  - Provides visualization with scatter plots showing actual data points
  - Useful for financial data, sales forecasting, seasonal patterns

Dependencies:
  - packages:
    - numpy
    - pandas
    - sklearn
    - matplotlib
    - scitex

IO:
  - input-files:
    - None (generates synthetic calendar-based data for demonstration)
  - output-files:
    - ./calendar_splits_demo.png (visualization with scatter plots)
"""

"""Imports"""
import argparse
import os
import sys
from typing import Iterator, Literal, Optional, Tuple, Union

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import BaseCrossValidator
from sklearn.utils.validation import _num_samples

import scitex_logging as logging

# Import timestamp normalizer (internally uses to_datetime helper)
from ._normalize_timestamp import normalize_timestamp, to_datetime

logger = logging.getLogger(__name__)


class TimeSeriesCalendarSplit(BaseCrossValidator):
    """
    Calendar-based time series cross-validation splitter.

    Samples are bucketed into calendar periods (e.g. months, weeks, days)
    and folds are built from consecutive periods, so every training period
    precedes every test period of the same fold.

    Only periods that actually contain samples are counted: a calendar
    period with no data does not occupy a slot in the train/gap/test window.

    Parameters
    ----------
    interval : str
        Time interval for splitting (default: 'M'). Options:
        - 'D': Daily
        - 'W': Weekly
        - 'M': Monthly
        - 'Q': Quarterly
        - 'Y': Yearly
        Or any pandas period frequency string
    n_train_intervals : int
        Number of intervals to use for training (default: 12)
    n_test_intervals : int
        Number of intervals to use for testing (default: 1)
    n_val_intervals : int
        Number of intervals to use for validation (default: 0).
        Only used by ``split_with_val``; ``split`` and ``get_n_splits``
        ignore it.
    gap_intervals : int
        Number of intervals to skip before the test window (default: 0)
    step_intervals : int
        Number of intervals to step forward for next fold (default: 1).
        Must be >= 1.
    random_state : int, optional
        Seed used to build the ``rng`` attribute. The splits themselves are
        deterministic and do not consume random numbers.

    Examples
    --------
    >>> from scitex_ml.classification import TimeSeriesCalendarSplit
    >>> import pandas as pd
    >>> import numpy as np
    >>>
    >>> # Create sample data with daily timestamps
    >>> dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
    >>> X = np.random.randn(len(dates), 10)
    >>> y = np.random.randint(0, 2, len(dates))
    >>>
    >>> # Monthly splits: 6 months train, 1 month test
    >>> tscal = TimeSeriesCalendarSplit(interval='M', n_train_intervals=6)
    >>> for train_idx, test_idx in tscal.split(X, y, timestamps=dates):
    ...     print(f"Train: {dates[train_idx[0]]:%Y-%m} to {dates[train_idx[-1]]:%Y-%m}")
    ...     print(f"Test: {dates[test_idx[0]]:%Y-%m} to {dates[test_idx[-1]]:%Y-%m}")
    """

    # Drawing parameters for the two subsets shown by ``plot_splits``.
    # ``colors`` is (color for label == 0, color for any other label).
    _PLOT_STYLES = {
        "train": {
            "edge": "blue",
            "face": "lightblue",
            "colors": ("darkblue", "navy"),
            "marker": "o",
            "range_label": "Train Set (range)",
            "points_label": "Train data points",
        },
        "test": {
            "edge": "red",
            "face": "lightcoral",
            "colors": ("darkred", "firebrick"),
            "marker": "^",
            "range_label": "Test Set (range)",
            "points_label": "Test data points",
        },
    }

    def __init__(
        self,
        interval: str = "M",
        n_train_intervals: int = 12,
        n_test_intervals: int = 1,
        n_val_intervals: int = 0,
        gap_intervals: int = 0,
        step_intervals: int = 1,
        random_state: Optional[int] = None,
    ):
        # Parameters are stored verbatim; they are validated lazily when a
        # split is requested.
        self.interval = interval
        self.n_train_intervals = n_train_intervals
        self.n_test_intervals = n_test_intervals
        self.n_val_intervals = n_val_intervals
        self.gap_intervals = gap_intervals
        self.step_intervals = step_intervals
        self.random_state = random_state
        self.rng = np.random.default_rng(random_state)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_datetime_index(timestamps) -> pd.DatetimeIndex:
        """Return ``timestamps`` as a ``pd.DatetimeIndex``.

        A ``pd.DatetimeIndex`` is returned unchanged. Any other iterable is
        converted element by element with ``to_datetime``; timezone info is
        dropped with ``replace(tzinfo=None)`` (the wall-clock value is kept,
        no conversion to UTC is performed).
        """
        if isinstance(timestamps, pd.DatetimeIndex):
            return timestamps

        naive = []
        for ts in timestamps:
            dt = to_datetime(ts)
            # Remove timezone info for pandas compatibility
            if dt.tzinfo is not None:
                dt = dt.replace(tzinfo=None)
            naive.append(dt)
        return pd.DatetimeIndex(naive)

    def _check_step(self) -> None:
        """Raise ``ValueError`` if ``step_intervals`` cannot advance the window."""
        if self.step_intervals < 1:
            # A non-positive step would never move the fold window forward
            # (endless generator) and breaks the fold-count formula.
            raise ValueError(
                f"step_intervals must be >= 1, got {self.step_intervals}"
            )

    def _interval_table(self, X, timestamps):
        """Build the time-sorted sample table shared by the split methods.

        Returns
        -------
        table : pd.DataFrame
            Columns ``index`` (original sample position), ``timestamp`` and
            ``interval`` (calendar period), sorted by ``timestamp``.
        periods : array-like
            Unique periods of ``table["interval"]`` in chronological order.

        Raises
        ------
        ValueError
            If ``timestamps`` is None, ``step_intervals`` < 1, or the number
            of timestamps differs from the number of samples in ``X``.
        """
        if timestamps is None:
            raise ValueError("timestamps must be provided for calendar-based splitting")
        self._check_step()

        timestamps = self._as_datetime_index(timestamps)
        n_samples = _num_samples(X)
        if len(timestamps) != n_samples:
            raise ValueError(
                f"timestamps has {len(timestamps)} entries but X has "
                f"{n_samples} samples"
            )

        table = pd.DataFrame(
            {"index": np.arange(n_samples), "timestamp": timestamps}
        )
        # Stable sort keeps samples with identical timestamps in input order,
        # so the yielded index arrays are deterministic.
        table = table.sort_values("timestamp", kind="stable")
        table["interval"] = table["timestamp"].dt.to_period(self.interval)
        return table, table["interval"].unique()

    @staticmethod
    def _select(table, periods) -> np.ndarray:
        """Return original sample positions whose period is in ``periods``."""
        return table.loc[table["interval"].isin(periods), "index"].values

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def split(
        self,
        X: np.ndarray,
        y: Optional[np.ndarray] = None,
        timestamps: Optional[Union[np.ndarray, pd.DatetimeIndex]] = None,
        groups: Optional[np.ndarray] = None,
    ) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """
        Generate calendar-based train/test splits.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data
        y : array-like, shape (n_samples,), optional
            Target variable
        timestamps : array-like or pd.DatetimeIndex, shape (n_samples,)
            Timestamps for each sample (required)
        groups : array-like, shape (n_samples,), optional
            Group labels (not used in this splitter)

        Yields
        ------
        train : ndarray
            Training set indices (ordered by timestamp)
        test : ndarray
            Test set indices (ordered by timestamp)

        Raises
        ------
        ValueError
            If ``timestamps`` is None, its length does not match ``X``, or
            ``step_intervals`` < 1. Raised when iteration starts.
        """
        table, periods = self._interval_table(X, timestamps)

        # Number of consecutive periods consumed by one fold
        fold_span = (
            self.n_train_intervals + self.gap_intervals + self.n_test_intervals
        )
        n_periods = len(periods)

        start = 0
        while start + fold_span <= n_periods:
            train_end = start + self.n_train_intervals
            # Test window starts after the gap
            test_start = train_end + self.gap_intervals
            test_end = test_start + self.n_test_intervals

            yield (
                self._select(table, periods[start:train_end]),
                self._select(table, periods[test_start:test_end]),
            )

            # Move to next fold
            start += self.step_intervals

    def split_with_val(
        self,
        X: np.ndarray,
        y: Optional[np.ndarray] = None,
        timestamps: Optional[Union[np.ndarray, pd.DatetimeIndex]] = None,
        groups: Optional[np.ndarray] = None,
    ) -> Iterator[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
        """
        Generate calendar-based train/validation/test splits.

        The validation set comes after training but before test,
        maintaining temporal order: train < val < test. The gap is placed
        between the validation and test windows.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data
        y : array-like, shape (n_samples,), optional
            Target variable
        timestamps : array-like or pd.DatetimeIndex, shape (n_samples,)
            Timestamps for each sample (required)
        groups : array-like, shape (n_samples,), optional
            Group labels (not used in this splitter)

        Yields
        ------
        train : ndarray
            Training set indices
        val : ndarray
            Validation set indices. An empty integer array when
            ``n_val_intervals`` is 0, so it can still be used for indexing.
        test : ndarray
            Test set indices

        Raises
        ------
        ValueError
            If ``timestamps`` is None, its length does not match ``X``, or
            ``step_intervals`` < 1. Raised when iteration starts.
        """
        table, periods = self._interval_table(X, timestamps)

        # Number of consecutive periods consumed by one fold, incl. validation
        fold_span = (
            self.n_train_intervals
            + self.n_val_intervals
            + self.gap_intervals
            + self.n_test_intervals
        )
        n_periods = len(periods)
        has_val = self.n_val_intervals > 0
        # Empty slice of the index column: keeps the integer dtype, unlike
        # np.array([]) which is float64 and cannot be used as an index.
        no_val = table["index"].values[:0]

        start = 0
        while start + fold_span <= n_periods:
            train_end = start + self.n_train_intervals
            # Validation window directly follows training
            val_end = train_end + (self.n_val_intervals if has_val else 0)
            # Test window follows validation and gap
            test_start = val_end + self.gap_intervals
            test_end = test_start + self.n_test_intervals

            train_indices = self._select(table, periods[start:train_end])
            val_indices = (
                self._select(table, periods[train_end:val_end])
                if has_val
                else no_val
            )
            test_indices = self._select(table, periods[test_start:test_end])

            yield train_indices, val_indices, test_indices

            # Move to next fold
            start += self.step_intervals

    def get_n_splits(self, X=None, y=None, timestamps=None, groups=None):
        """
        Calculate number of splits produced by ``split``.

        Parameters
        ----------
        X : array-like, optional
            Not used directly
        y : array-like, optional
            Not used
        timestamps : array-like or pd.DatetimeIndex, optional
            Timestamps to determine number of possible splits
        groups : array-like, optional
            Not used; accepted so the signature mirrors ``split``

        Returns
        -------
        n_splits : int
            Number of splits. Returns -1 if timestamps is None.

        Raises
        ------
        ValueError
            If timestamps are given and ``step_intervals`` < 1.
        """
        if timestamps is None:
            return -1  # Can't determine without timestamps
        self._check_step()

        timestamps = self._as_datetime_index(timestamps)

        # Count unique intervals
        n_periods = len(timestamps.to_period(self.interval).unique())

        # Same window size as ``split`` (validation intervals not included)
        fold_span = (
            self.n_train_intervals + self.gap_intervals + self.n_test_intervals
        )
        if n_periods < fold_span:
            return 0

        # Number of window positions reachable with the configured step
        return (n_periods - fold_span) // self.step_intervals + 1

    @staticmethod
    def _draw_subset(
        ax, timestamps, idx, labels, y_pos, style, with_legend, jitter_strength
    ) -> None:
        """Draw one subset (train or test) of one fold onto ``ax``.

        Adds a translucent rectangle spanning the first to the last timestamp
        of ``idx`` and a jittered scatter of the individual samples.
        ``idx`` is expected in chronological order, as yielded by ``split``.
        """
        if len(idx) == 0:
            # Nothing to draw (e.g. zero train intervals configured)
            return

        start = timestamps[idx[0]]
        end = timestamps[idx[-1]]
        ax.add_patch(
            patches.Rectangle(
                (start, y_pos - 0.3),
                end - start,
                0.6,
                linewidth=1,
                edgecolor=style["edge"],
                facecolor=style["face"],
                alpha=0.3,
                # Only the first fold contributes legend entries
                label=style["range_label"] if with_legend else "",
            )
        )

        # Vertical jitter so overlapping points stay visible
        jitter = np.random.normal(0, jitter_strength, len(idx))

        # Color by class if labels are provided
        zero_color, other_color = style["colors"]
        if labels is not None:
            colors = [zero_color if yi == 0 else other_color for yi in labels[idx]]
        else:
            colors = zero_color

        ax.scatter(
            timestamps[idx],
            y_pos + jitter,
            c=colors,
            s=20,
            alpha=0.6,
            marker=style["marker"],
            label=style["points_label"] if with_legend else "",
            zorder=10,
        )

    def plot_splits(self, X, y=None, timestamps=None, figsize=(12, 6), save_path=None):
        """
        Visualize the train/test splits as timeline rectangles with scatter plots.

        Parameters
        ----------
        X : array-like
            Training data (used to determine data size)
        y : array-like, optional
            Target variable (used for color-coding scatter points);
            indexed positionally
        timestamps : array-like or pd.DatetimeIndex
            Timestamps for each sample
        figsize : tuple, default (12, 6)
            Figure size (width, height)
        save_path : str, optional
            Path to save the plot

        Returns
        -------
        fig : matplotlib.figure.Figure
            The created figure

        Raises
        ------
        ValueError
            If ``timestamps`` is None or no split can be generated.

        Examples
        --------
        >>> splitter = TimeSeriesCalendarSplit(interval='M', n_train_intervals=6)
        >>> fig = splitter.plot_splits(X, timestamps=dates)
        >>> fig.savefig('calendar_splits.png')
        """
        if timestamps is None:
            raise ValueError(
                "timestamps must be provided for calendar split visualization"
            )

        # Normalize once; ``split`` accepts the resulting DatetimeIndex as-is
        timestamps = self._as_datetime_index(timestamps)

        # Get all splits
        splits = list(self.split(X, y, timestamps))
        if not splits:
            raise ValueError(
                "No splits generated. Check data size and splitter parameters."
            )

        # Positional indexing regardless of whether y is a list/Series/array
        labels = None if y is None else np.asarray(y)

        # Create figure
        fig, ax = plt.subplots(figsize=figsize)

        # Jitter strength for scatter plots
        jitter_strength = 0.15

        # Plot each fold on its own row: train first, then test
        for fold, (train_idx, test_idx) in enumerate(splits):
            for subset, idx in (("train", train_idx), ("test", test_idx)):
                self._draw_subset(
                    ax,
                    timestamps,
                    idx,
                    labels,
                    fold,
                    self._PLOT_STYLES[subset],
                    fold == 0,
                    jitter_strength,
                )

        # Format plot
        ax.set_ylim(-0.5, len(splits) - 0.5)
        ax.set_xlim(timestamps.min(), timestamps.max())
        ax.set_xlabel("Time")
        ax.set_ylabel("Fold")
        ax.set_title(
            f"Time Series Calendar Split Visualization\n"
            f"Interval: {self.interval}, Train: {self.n_train_intervals}, "
            f"Test: {self.n_test_intervals}"
        )

        # Set y-ticks
        ax.set_yticks(range(len(splits)))
        ax.set_yticklabels([f"Fold {i}" for i in range(len(splits))])

        # Add legend
        ax.legend(loc="upper right")

        # Format x-axis
        ax.tick_params(axis="x", rotation=45)

        plt.tight_layout()

        if save_path:
            fig.savefig(save_path, dpi=150, bbox_inches="tight")

        return fig
"""Functions & Classes"""


def main(args) -> int:
    """Demonstrate TimeSeriesCalendarSplit functionality.

    Generates synthetic regularly-spaced data, logs the first
    ``args.max_folds`` folds produced by the splitter together with a
    temporal-order check, and saves a visualization of all folds to
    ``./calendar_splits_demo.png``.

    Args:
        args: Parsed command line arguments (see ``parse_args``)

    Returns:
        int: Exit status (0 on success)
    """
    logger.info("Demonstrating TimeSeriesCalendarSplit functionality")

    # Generate test data with calendar-based timestamps
    np.random.seed(42)
    n_samples = args.n_samples

    # Create regularly spaced timestamps starting at the requested date
    start_date = pd.Timestamp(args.start_date)
    timestamps = pd.date_range(start=start_date, periods=n_samples, freq=args.data_freq)

    # Generate features and target
    X = np.random.randn(n_samples, 5)
    y = np.random.randint(0, 2, n_samples)

    logger.info(f"Generated test data: {n_samples} samples")
    logger.info(
        f"Date range: {timestamps[0].strftime('%Y-%m-%d')} to {timestamps[-1].strftime('%Y-%m-%d')}"
    )
    logger.info(f"Data frequency: {args.data_freq}")

    # Create calendar splitter
    splitter = TimeSeriesCalendarSplit(
        interval=args.interval,
        n_train_intervals=args.n_train_intervals,
        n_test_intervals=args.n_test_intervals,
        gap_intervals=args.gap_intervals,
        step_intervals=args.step_intervals,
    )

    logger.info("Calendar split configuration:")
    logger.info(f" Interval: {args.interval}")
    logger.info(f" Train intervals: {args.n_train_intervals}")
    logger.info(f" Test intervals: {args.n_test_intervals}")
    logger.info(f" Gap intervals: {args.gap_intervals}")
    logger.info(f" Step intervals: {args.step_intervals}")

    # Report the first ``max_folds`` folds
    for fold, (train_idx, test_idx) in enumerate(
        splitter.split(X, y, timestamps=timestamps)
    ):
        if fold >= args.max_folds:
            break

        train_start = timestamps[train_idx[0]].strftime("%Y-%m-%d")
        train_end = timestamps[train_idx[-1]].strftime("%Y-%m-%d")
        test_start = timestamps[test_idx[0]].strftime("%Y-%m-%d")
        test_end = timestamps[test_idx[-1]].strftime("%Y-%m-%d")

        logger.info(f"Fold {fold}:")
        logger.info(f" Train: {train_start} to {train_end} ({len(train_idx)} samples)")
        logger.info(f" Test: {test_start} to {test_end} ({len(test_idx)} samples)")

        # Verify temporal order: every train sample precedes every test sample
        train_times = timestamps[train_idx]
        test_times = timestamps[test_idx]
        temporal_ok = train_times.max() < test_times.min()
        status = "✓" if temporal_ok else "✗"
        logger.info(f" Temporal order: {status}")

    # Generate visualization (shows every fold, not only the logged ones)
    logger.info("Generating calendar split visualization")
    fig = splitter.plot_splits(X, y, timestamps)

    # Save using SciTeX framework
    scitex_io.save(fig, "./calendar_splits_demo.png", symlink_from_cwd=True)
    plt.close(fig)

    logger.info("TimeSeriesCalendarSplit demonstration completed successfully")
    return 0


def parse_args(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before.

    Returns:
        argparse.Namespace: Parsed demo configuration
    """
    parser = argparse.ArgumentParser(
        description="Demonstrate TimeSeriesCalendarSplit with calendar-based intervals"
    )
    parser.add_argument(
        "--n-samples",
        type=int,
        default=365,
        help="Number of samples to generate (default: %(default)s)",
    )
    parser.add_argument(
        "--start-date",
        type=str,
        default="2023-01-01",
        help="Start date for time series (default: %(default)s)",
    )
    parser.add_argument(
        "--data-freq",
        type=str,
        default="D",
        help="Frequency of data points (D=daily, H=hourly) (default: %(default)s)",
    )
    parser.add_argument(
        "--interval",
        type=str,
        default="M",
        help="Calendar interval (D=daily, W=weekly, M=monthly) (default: %(default)s)",
    )
    parser.add_argument(
        "--n-train-intervals",
        type=int,
        default=6,
        help="Number of intervals for training (default: %(default)s)",
    )
    parser.add_argument(
        "--n-test-intervals",
        type=int,
        default=1,
        help="Number of intervals for testing (default: %(default)s)",
    )
    parser.add_argument(
        "--gap-intervals",
        type=int,
        default=0,
        help="Gap intervals between train and test (default: %(default)s)",
    )
    parser.add_argument(
        "--step-intervals",
        type=int,
        default=1,
        help="Step intervals between folds (default: %(default)s)",
    )
    parser.add_argument(
        "--max-folds",
        type=int,
        default=3,
        help="Maximum number of folds to demonstrate (default: %(default)s)",
    )
    return parser.parse_args(argv)


def run_main() -> None:
    """Initialize scitex framework, run main function, and cleanup."""
    # The session rebinds these module-level names (stdout/stderr and the
    # plt handle are replaced by the objects returned from session.start).
    global CONFIG, CC, sys, plt, rng

    import sys

    import matplotlib.pyplot as plt
    import scitex as stx

    args = parse_args()

    CONFIG, sys.stdout, sys.stderr, plt, CC, rng = stx.session.start(
        sys,
        plt,
        args=args,
        file=__FILE__,
        sdir_suffix=None,
        verbose=False,
        agg=True,
    )

    exit_status = main(args)

    stx.session.close(
        CONFIG,
        verbose=False,
        notify=False,
        message="",
        exit_status=exit_status,
    )


if __name__ == "__main__":
    run_main()

# EOF