geronimo.batch

Geronimo Batch Processing Module.

The batch module provides the framework for defining and scheduling recurring machine learning workflows, such as batch inference or automated training jobs.

Key components:

  • BatchPipeline: Defines a sequence of steps to execute for a batch job.
  • Schedule: Specifies when the pipeline should run (e.g., cron expressions).
  • Trigger: Defines events that can start a pipeline execution.

This module helps create robust, scheduled ML pipelines that can run on various execution environments.

 1"""Geronimo Batch Processing Module.
 2
 3The batch module provides the framework for defining and scheduling recurring
 4machine learning workflows, such as batch inference or automated training jobs.
 5
 6Key components:
 7- BatchPipeline: Defines a sequence of steps to execute for a batch job.
 8- Schedule: Specifies when the pipeline should run (e.g., cron expressions).
 9- Trigger: Defines events that can start a pipeline execution.
10
11This module helps create robust, scheduled ML pipelines that can run on various
12execution environments.
13"""
14
15from geronimo.batch.pipeline import BatchPipeline
16from geronimo.batch.schedule import Schedule, Trigger
17
18__all__ = ["BatchPipeline", "Schedule", "Trigger"]
19
20__docformat__ = "google"
class BatchPipeline(abc.ABC):
 12class BatchPipeline(ABC):
 13    """Base class for batch ML pipelines.
 14
 15    Provides a standardized interface for batch jobs with
 16    schedule/trigger support and integrated artifact management.
 17
 18    Example:
 19        ```python
 20        from geronimo.batch import BatchPipeline, Schedule
 21        from myproject.models import CreditRiskModel
 22
 23        class DailyScoringPipeline(BatchPipeline):
 24            model_class = CreditRiskModel
 25            schedule = Schedule.daily(hour=6)
 26
 27            def run(self):
 28                # Load data
 29                data = self.model.features.data_source.load()
 30
 31                # Transform features
 32                X = self.model.features.transform(data)
 33
 34                # Predict
 35                predictions = self.model.predict(X)
 36
 37                # Save results
 38                self.save_results(predictions)
 39
 40        # Execute
 41        pipeline = DailyScoringPipeline()
 42        pipeline.initialize()
 43        pipeline.execute()
 44        ```
 45    """
 46
 47    # Override in subclass
 48    model_class: type["Model"] = None
 49    schedule: Optional["Schedule"] = None
 50    trigger: Optional["Trigger"] = None
 51    artifact_project: Optional[str] = None
 52    artifact_version: Optional[str] = None
 53
 54    model: Optional["Model"]
 55    """The loaded model instance."""
 56
 57    _is_initialized: bool
 58    """Internal flag tracking initialization status."""
 59
 60    def __init__(self):
 61        """Initialize pipeline."""
 62        self.model: Optional["Model"] = None
 63        self._is_initialized: bool = False
 64
 65    def initialize(
 66        self,
 67        project: Optional[str] = None,
 68        version: Optional[str] = None,
 69    ) -> None:
 70        """Initialize pipeline by loading model artifacts.
 71
 72        Args:
 73            project: Artifact project name.
 74            version: Artifact version.
 75        """
 76        from geronimo.artifacts import ArtifactStore
 77
 78        project = project or self.artifact_project or self.model_class.name
 79        version = version or self.artifact_version or self.model_class.version
 80
 81        store = ArtifactStore.load(project=project, version=version)
 82
 83        self.model = self.model_class()
 84        self.model.load(store)
 85        self._is_initialized = True
 86
 87    def execute(self) -> Any:
 88        """Execute the pipeline.
 89
 90        Returns:
 91            Pipeline result.
 92        """
 93        if not self._is_initialized:
 94            raise RuntimeError("Pipeline not initialized. Call initialize() first.")
 95
 96        return self.run()
 97
 98    @abstractmethod
 99    def run(self) -> Any:
100        """Main pipeline logic.
101
102        Override this method with your batch processing logic.
103
104        Returns:
105            Pipeline result.
106        """
107        pass
108
109    def save_results(
110        self,
111        results: Any,
112        output_path: Optional[str] = None,
113        use_artifact_store: bool = False,
114        artifact_name: Optional[str] = None,
115    ) -> str:
116        """Save pipeline results.
117
118        Args:
119            results: Results to save (DataFrame, dict, etc.).
120            output_path: Optional output path (for file-based storage).
121            use_artifact_store: If True, save via ArtifactStore for consistency
122                               with other artifacts. Default False for backward
123                               compatibility.
124            artifact_name: Name for artifact when using artifact store.
125                          Defaults to pipeline class name.
126
127        Returns:
128            Path or URI where results were saved.
129        """
130        import pandas as pd
131        from datetime import datetime
132        from pathlib import Path
133
134        # Use ArtifactStore if requested
135        if use_artifact_store:
136            from geronimo.artifacts import ArtifactStore
137            
138            project = self.artifact_project or (
139                self.model_class.name if self.model_class else self.__class__.__name__
140            )
141            version = self.artifact_version or (
142                self.model_class.version if self.model_class else "1.0.0"
143            )
144            
145            store = ArtifactStore(project=project, version=version)
146            name = artifact_name or f"{self.__class__.__name__}_results"
147            return store.save(name, results, artifact_type="pipeline_result")
148
149        # Default file-based storage (backward compatible)
150        if output_path is None:
151            output_path = f"output/{self.__class__.__name__}_{datetime.now().isoformat()}.parquet"
152
153        path = Path(output_path)
154        path.parent.mkdir(parents=True, exist_ok=True)
155
156        if isinstance(results, pd.DataFrame):
157            results.to_parquet(path)
158        else:
159            import json
160
161            path = path.with_suffix(".json")
162            path.write_text(json.dumps(results, default=str))
163
164        return str(path)
165
166    @property
167    def is_initialized(self) -> bool:
168        """Check if pipeline is initialized."""
169        return self._is_initialized
170
171    def __repr__(self) -> str:
172        status = "initialized" if self._is_initialized else "not initialized"
173        model_name = self.model_class.__name__ if self.model_class else "None"
174        schedule_str = str(self.schedule) if self.schedule else "manual"
175        return f"{self.__class__.__name__}(model={model_name}, schedule={schedule_str}, {status})"

Base class for batch ML pipelines.

Provides a standardized interface for batch jobs with schedule/trigger support and integrated artifact management.

Example:

```python
from geronimo.batch import BatchPipeline, Schedule
from myproject.models import CreditRiskModel

class DailyScoringPipeline(BatchPipeline):
    model_class = CreditRiskModel
    schedule = Schedule.daily(hour=6)

    def run(self):
        # Load data
        data = self.model.features.data_source.load()

        # Transform features
        X = self.model.features.transform(data)

        # Predict
        predictions = self.model.predict(X)

        # Save results
        self.save_results(predictions)

# Execute
pipeline = DailyScoringPipeline()
pipeline.initialize()
pipeline.execute()
```
BatchPipeline()
60    def __init__(self):
61        """Initialize pipeline."""
62        self.model: Optional["Model"] = None
63        self._is_initialized: bool = False

Initialize pipeline. The new instance has no model loaded; call `initialize()` before `execute()`.

model_class: type[geronimo.models.Model] = None
schedule: Optional[Schedule] = None
trigger: Optional[Trigger] = None
artifact_project: Optional[str] = None
artifact_version: Optional[str] = None
model: Optional[geronimo.models.Model]

The loaded model instance.

def initialize(self, project: Optional[str] = None, version: Optional[str] = None) -> None:
65    def initialize(
66        self,
67        project: Optional[str] = None,
68        version: Optional[str] = None,
69    ) -> None:
70        """Initialize pipeline by loading model artifacts.
71
72        Args:
73            project: Artifact project name.
74            version: Artifact version.
75        """
76        from geronimo.artifacts import ArtifactStore
77
78        project = project or self.artifact_project or self.model_class.name
79        version = version or self.artifact_version or self.model_class.version
80
81        store = ArtifactStore.load(project=project, version=version)
82
83        self.model = self.model_class()
84        self.model.load(store)
85        self._is_initialized = True

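Initialize the pipeline by loading model artifacts. An ArtifactStore is loaded for the resolved project and version, `model_class` is instantiated, and the model is loaded from that store.

Arguments:
  • project: Artifact project name. Falls back to `artifact_project`, then to `model_class.name`.
  • version: Artifact version. Falls back to `artifact_version`, then to `model_class.version`.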
def execute(self) -> Any:
87    def execute(self) -> Any:
88        """Execute the pipeline.
89
90        Returns:
91            Pipeline result.
92        """
93        if not self._is_initialized:
94            raise RuntimeError("Pipeline not initialized. Call initialize() first.")
95
96        return self.run()

Execute the pipeline by calling `run()`. Raises `RuntimeError` if `initialize()` has not been called first.

Returns:

Pipeline result.

@abstractmethod
def run(self) -> Any:
 98    @abstractmethod
 99    def run(self) -> Any:
100        """Main pipeline logic.
101
102        Override this method with your batch processing logic.
103
104        Returns:
105            Pipeline result.
106        """
107        pass

Main pipeline logic.

Override this method with your batch processing logic.

Returns:

Pipeline result.

def save_results(self, results: Any, output_path: Optional[str] = None, use_artifact_store: bool = False, artifact_name: Optional[str] = None) -> str:
109    def save_results(
110        self,
111        results: Any,
112        output_path: Optional[str] = None,
113        use_artifact_store: bool = False,
114        artifact_name: Optional[str] = None,
115    ) -> str:
116        """Save pipeline results.
117
118        Args:
119            results: Results to save (DataFrame, dict, etc.).
120            output_path: Optional output path (for file-based storage).
121            use_artifact_store: If True, save via ArtifactStore for consistency
122                               with other artifacts. Default False for backward
123                               compatibility.
124            artifact_name: Name for artifact when using artifact store.
125                          Defaults to pipeline class name.
126
127        Returns:
128            Path or URI where results were saved.
129        """
130        import pandas as pd
131        from datetime import datetime
132        from pathlib import Path
133
134        # Use ArtifactStore if requested
135        if use_artifact_store:
136            from geronimo.artifacts import ArtifactStore
137            
138            project = self.artifact_project or (
139                self.model_class.name if self.model_class else self.__class__.__name__
140            )
141            version = self.artifact_version or (
142                self.model_class.version if self.model_class else "1.0.0"
143            )
144            
145            store = ArtifactStore(project=project, version=version)
146            name = artifact_name or f"{self.__class__.__name__}_results"
147            return store.save(name, results, artifact_type="pipeline_result")
148
149        # Default file-based storage (backward compatible)
150        if output_path is None:
151            output_path = f"output/{self.__class__.__name__}_{datetime.now().isoformat()}.parquet"
152
153        path = Path(output_path)
154        path.parent.mkdir(parents=True, exist_ok=True)
155
156        if isinstance(results, pd.DataFrame):
157            results.to_parquet(path)
158        else:
159            import json
160
161            path = path.with_suffix(".json")
162            path.write_text(json.dumps(results, default=str))
163
164        return str(path)

Save pipeline results.

Arguments:
  • results: Results to save (DataFrame, dict, etc.).
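  • output_path: Optional output path (for file-based storage). Defaults to `output/<PipelineClassName>_<local ISO-8601 timestamp>.parquet`. DataFrames are written as Parquet. Any other result is serialized to JSON, and the path's suffix is replaced with `.json`.
  • use_artifact_store: If True, save via ArtifactStore for consistency with other artifacts; `output_path` is ignored in this case. Default False for backward compatibility.
  • artifact_name: Name for the artifact when using the artifact store. Defaults to `<PipelineClassName>_results`.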
Returns:

Path or URI where results were saved.

is_initialized: bool
166    @property
167    def is_initialized(self) -> bool:
168        """Check if pipeline is initialized."""
169        return self._is_initialized

Check if pipeline is initialized.

class Schedule:
 8class Schedule:
 9    """Cron-based schedule for batch pipelines.
10
11    Example:
12        ```python
13        from geronimo.batch import Schedule
14
15        # Daily at 6 AM
16        daily = Schedule.cron("0 6 * * *")
17
18        # Every hour
19        hourly = Schedule.cron("0 * * * *")
20
21        # Weekly on Sunday
22        weekly = Schedule.weekly(day=0, hour=0)
23        ```
24    """
25
26
27
28    cron_expression: str
29    """The cron expression defining the schedule."""
30
31    description: Optional[str]
32    """Human-readable description of the schedule."""
33
34    def __init__(self, cron_expression: str, description: Optional[str] = None):
35        """Initialize schedule.
36
37        Args:
38            cron_expression: Standard cron expression (min hour day month weekday).
39            description: Optional human-readable description.
40        """
41        self.cron_expression = cron_expression
42        self.description = description
43
44    @classmethod
45    def cron(cls, expression: str) -> "Schedule":
46        """Create from cron expression.
47
48        Args:
49            expression: Cron expression.
50
51        Returns:
52            Schedule instance.
53        """
54        return cls(cron_expression=expression)
55
56    @classmethod
57    def daily(cls, hour: int = 0, minute: int = 0) -> "Schedule":
58        """Create daily schedule.
59
60        Args:
61            hour: Hour (0-23).
62            minute: Minute (0-59).
63
64        Returns:
65            Schedule instance.
66        """
67        return cls(
68            cron_expression=f"{minute} {hour} * * *",
69            description=f"Daily at {hour:02d}:{minute:02d}",
70        )
71
72    @classmethod
73    def weekly(cls, day: int = 0, hour: int = 0) -> "Schedule":
74        """Create weekly schedule.
75
76        Args:
77            day: Day of week (0=Sunday, 6=Saturday).
78            hour: Hour (0-23).
79
80        Returns:
81            Schedule instance.
82        """
83        days = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
84        return cls(
85            cron_expression=f"0 {hour} * * {day}",
86            description=f"Weekly on {days[day]} at {hour:02d}:00",
87        )
88
89    def __repr__(self) -> str:
90        return f"Schedule({self.cron_expression})"

Cron-based schedule for batch pipelines.

Example:

```python
from geronimo.batch import Schedule

# Daily at 6 AM
daily = Schedule.cron("0 6 * * *")

# Every hour
hourly = Schedule.cron("0 * * * *")

# Weekly on Sunday
weekly = Schedule.weekly(day=0, hour=0)
```
Schedule(cron_expression: str, description: Optional[str] = None)
34    def __init__(self, cron_expression: str, description: Optional[str] = None):
35        """Initialize schedule.
36
37        Args:
38            cron_expression: Standard cron expression (min hour day month weekday).
39            description: Optional human-readable description.
40        """
41        self.cron_expression = cron_expression
42        self.description = description

Initialize schedule.

Arguments:
  • cron_expression: Standard cron expression (min hour day month weekday).
  • description: Optional human-readable description.
cron_expression: str

The cron expression defining the schedule.

description: Optional[str]

Human-readable description of the schedule.

@classmethod
def cron(cls, expression: str) -> Schedule:
44    @classmethod
45    def cron(cls, expression: str) -> "Schedule":
46        """Create from cron expression.
47
48        Args:
49            expression: Cron expression.
50
51        Returns:
52            Schedule instance.
53        """
54        return cls(cron_expression=expression)

Create from cron expression.

Arguments:
  • expression: Cron expression.
Returns:

Schedule instance.

@classmethod
def daily(cls, hour: int = 0, minute: int = 0) -> Schedule:
56    @classmethod
57    def daily(cls, hour: int = 0, minute: int = 0) -> "Schedule":
58        """Create daily schedule.
59
60        Args:
61            hour: Hour (0-23).
62            minute: Minute (0-59).
63
64        Returns:
65            Schedule instance.
66        """
67        return cls(
68            cron_expression=f"{minute} {hour} * * *",
69            description=f"Daily at {hour:02d}:{minute:02d}",
70        )

Create daily schedule.

Arguments:
  • hour: Hour (0-23).
  • minute: Minute (0-59).
Returns:

Schedule instance.

@classmethod
def weekly(cls, day: int = 0, hour: int = 0) -> Schedule:
72    @classmethod
73    def weekly(cls, day: int = 0, hour: int = 0) -> "Schedule":
74        """Create weekly schedule.
75
76        Args:
77            day: Day of week (0=Sunday, 6=Saturday).
78            hour: Hour (0-23).
79
80        Returns:
81            Schedule instance.
82        """
83        days = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
84        return cls(
85            cron_expression=f"0 {hour} * * {day}",
86            description=f"Weekly on {days[day]} at {hour:02d}:00",
87        )

Create a weekly schedule. The job runs at minute 0 of the given hour; there is no minute argument.

Arguments:
  • day: Day of week (0=Sunday, 6=Saturday).
  • hour: Hour (0-23).
Returns:

Schedule instance.

class Trigger:
101class Trigger:
102    """Event-based trigger for batch pipelines.
103
104    Example:
105        ```python
106        from geronimo.batch import Trigger
107
108        # Trigger on S3 upload
109        s3_trigger = Trigger.s3_upload(bucket="data-bucket", prefix="input/")
110
111        # Manual trigger only
112        manual = Trigger.manual()
113        ```
114    """
115
116
117
118    trigger_type: TriggerType
119    """The type of event that triggers execution."""
120
121    config: dict
122    """Configuration specific to the trigger type."""
123
124    def __init__(
125        self,
126        trigger_type: TriggerType,
127        config: Optional[dict] = None,
128    ):
129        """Initialize trigger.
130
131        Args:
132            trigger_type: Type of trigger.
133            config: Trigger-specific configuration.
134        """
135        self.trigger_type = trigger_type
136        self.config = config or {}
137
138    @classmethod
139    def s3_upload(cls, bucket: str, prefix: Optional[str] = None) -> "Trigger":
140        """Trigger on S3 object upload.
141
142        Args:
143            bucket: S3 bucket name.
144            prefix: Optional key prefix filter.
145
146        Returns:
147            Trigger instance.
148        """
149        return cls(
150            trigger_type=TriggerType.S3_UPLOAD,
151            config={"bucket": bucket, "prefix": prefix},
152        )
153
154    @classmethod
155    def sns_message(cls, topic_arn: str) -> "Trigger":
156        """Trigger on SNS message.
157
158        Args:
159            topic_arn: SNS topic ARN.
160
161        Returns:
162            Trigger instance.
163        """
164        return cls(
165            trigger_type=TriggerType.SNS_MESSAGE,
166            config={"topic_arn": topic_arn},
167        )
168
169    @classmethod
170    def manual(cls) -> "Trigger":
171        """Manual trigger only.
172
173        Returns:
174            Trigger instance.
175        """
176        return cls(trigger_type=TriggerType.MANUAL)
177
178    def __repr__(self) -> str:
179        return f"Trigger({self.trigger_type.value})"

Event-based trigger for batch pipelines.

Example:

```python
from geronimo.batch import Trigger

# Trigger on S3 upload
s3_trigger = Trigger.s3_upload(bucket="data-bucket", prefix="input/")

# Manual trigger only
manual = Trigger.manual()
```
Trigger(trigger_type: geronimo.batch.schedule.TriggerType, config: Optional[dict] = None)
124    def __init__(
125        self,
126        trigger_type: TriggerType,
127        config: Optional[dict] = None,
128    ):
129        """Initialize trigger.
130
131        Args:
132            trigger_type: Type of trigger.
133            config: Trigger-specific configuration.
134        """
135        self.trigger_type = trigger_type
136        self.config = config or {}

Initialize trigger.

Arguments:
  • trigger_type: Type of trigger.
  • config: Trigger-specific configuration.
trigger_type: geronimo.batch.schedule.TriggerType

The type of event that triggers execution.

config: dict

Configuration specific to the trigger type.

@classmethod
def s3_upload(cls, bucket: str, prefix: Optional[str] = None) -> Trigger:
138    @classmethod
139    def s3_upload(cls, bucket: str, prefix: Optional[str] = None) -> "Trigger":
140        """Trigger on S3 object upload.
141
142        Args:
143            bucket: S3 bucket name.
144            prefix: Optional key prefix filter.
145
146        Returns:
147            Trigger instance.
148        """
149        return cls(
150            trigger_type=TriggerType.S3_UPLOAD,
151            config={"bucket": bucket, "prefix": prefix},
152        )

Trigger on S3 object upload.

Arguments:
  • bucket: S3 bucket name.
  • prefix: Optional key prefix filter.
Returns:

Trigger instance.

@classmethod
def sns_message(cls, topic_arn: str) -> Trigger:
154    @classmethod
155    def sns_message(cls, topic_arn: str) -> "Trigger":
156        """Trigger on SNS message.
157
158        Args:
159            topic_arn: SNS topic ARN.
160
161        Returns:
162            Trigger instance.
163        """
164        return cls(
165            trigger_type=TriggerType.SNS_MESSAGE,
166            config={"topic_arn": topic_arn},
167        )

Trigger on SNS message.

Arguments:
  • topic_arn: SNS topic ARN.
Returns:

Trigger instance.

@classmethod
def manual(cls) -> Trigger:
169    @classmethod
170    def manual(cls) -> "Trigger":
171        """Manual trigger only.
172
173        Returns:
174            Trigger instance.
175        """
176        return cls(trigger_type=TriggerType.MANUAL)

Manual trigger only.

Returns:

Trigger instance.