geronimo.batch
Geronimo Batch Processing Module.
The batch module provides the framework for defining and scheduling recurring machine learning workflows, such as batch inference or automated training jobs.
Key components:
- BatchPipeline: Defines a sequence of steps to execute for a batch job.
- Schedule: Specifies when the pipeline should run (e.g., cron expressions).
- Trigger: Defines events that can start a pipeline execution.
This module helps create robust, scheduled ML pipelines that can run on various execution environments.
"""Geronimo Batch Processing Module.

This package supplies the building blocks for defining and scheduling recurring
machine learning workflows, such as batch inference or automated training jobs.

Key components:
- BatchPipeline: Defines a sequence of steps to execute for a batch job.
- Schedule: Specifies when the pipeline should run (e.g., cron expressions).
- Trigger: Defines events that can start a pipeline execution.

Together they help create robust, scheduled ML pipelines that can run on
various execution environments.
"""

from geronimo.batch.pipeline import BatchPipeline
from geronimo.batch.schedule import (
    Schedule,
    Trigger,
)

# Public API re-exported from the submodules above.
__all__ = [
    "BatchPipeline",
    "Schedule",
    "Trigger",
]

# Tells pdoc to parse docstrings in this package as Google style.
__docformat__ = "google"
class BatchPipeline(ABC):
    """Base class for batch ML pipelines.

    Provides a standardized interface for batch jobs with
    schedule/trigger support and integrated artifact management.

    Example:
        ```python
        from geronimo.batch import BatchPipeline, Schedule
        from myproject.models import CreditRiskModel

        class DailyScoringPipeline(BatchPipeline):
            model_class = CreditRiskModel
            schedule = Schedule.daily(hour=6)

            def run(self):
                # Load data
                data = self.model.features.data_source.load()

                # Transform features
                X = self.model.features.transform(data)

                # Predict
                predictions = self.model.predict(X)

                # Save results
                self.save_results(predictions)

        # Execute
        pipeline = DailyScoringPipeline()
        pipeline.initialize()
        pipeline.execute()
        ```
    """

    # Override in subclass
    model_class: Optional[type["Model"]] = None
    schedule: Optional["Schedule"] = None
    trigger: Optional["Trigger"] = None
    artifact_project: Optional[str] = None
    artifact_version: Optional[str] = None

    model: Optional["Model"]
    """The loaded model instance."""

    _is_initialized: bool
    """Internal flag tracking initialization status."""

    def __init__(self):
        """Initialize pipeline."""
        self.model: Optional["Model"] = None
        self._is_initialized: bool = False

    def initialize(
        self,
        project: Optional[str] = None,
        version: Optional[str] = None,
    ) -> None:
        """Initialize the pipeline by loading model artifacts.

        Args:
            project: Artifact project name. Falls back to ``artifact_project``,
                then ``model_class.name``.
            version: Artifact version. Falls back to ``artifact_version``,
                then ``model_class.version``.

        Raises:
            TypeError: If ``model_class`` has not been set on the subclass.
        """
        # Fail fast with an actionable message instead of an opaque
        # AttributeError / "'NoneType' object is not callable".
        if self.model_class is None:
            raise TypeError(
                f"{type(self).__name__}.model_class is not set; "
                "assign a Model subclass before calling initialize()."
            )

        from geronimo.artifacts import ArtifactStore

        project = project or self.artifact_project or self.model_class.name
        version = version or self.artifact_version or self.model_class.version

        store = ArtifactStore.load(project=project, version=version)

        # Load into a local first so a failing load() does not leave a
        # half-loaded model attached to the pipeline.
        model = self.model_class()
        model.load(store)
        self.model = model
        self._is_initialized = True

    def execute(self) -> Any:
        """Execute the pipeline.

        Returns:
            Pipeline result.

        Raises:
            RuntimeError: If ``initialize()`` has not completed successfully.
        """
        if not self._is_initialized:
            raise RuntimeError("Pipeline not initialized. Call initialize() first.")

        return self.run()

    @abstractmethod
    def run(self) -> Any:
        """Main pipeline logic.

        Override this method with your batch processing logic.

        Returns:
            Pipeline result.
        """
        pass

    def save_results(
        self,
        results: Any,
        output_path: Optional[str] = None,
        use_artifact_store: bool = False,
        artifact_name: Optional[str] = None,
    ) -> str:
        """Save pipeline results.

        Args:
            results: Results to save. A ``pandas.DataFrame`` is written as
                Parquet; anything else is written as JSON. Objects exposing
                ``tolist()`` (numpy arrays/scalars, pandas Series) are
                converted to plain Python values; other non-JSON objects are
                stringified.
            output_path: Optional output path (for file-based storage). For
                non-DataFrame results the suffix is replaced with ``.json``.
                Defaults to ``output/<ClassName>_<timestamp>.parquet``.
            use_artifact_store: If True, save via ArtifactStore for consistency
                with other artifacts. Default False for backward
                compatibility.
            artifact_name: Name for artifact when using artifact store.
                Defaults to ``"<ClassName>_results"``.

        Returns:
            Path or URI where results were saved.
        """
        from datetime import datetime
        from pathlib import Path

        # Use ArtifactStore if requested
        if use_artifact_store:
            from geronimo.artifacts import ArtifactStore

            project = self.artifact_project or (
                self.model_class.name if self.model_class else self.__class__.__name__
            )
            version = self.artifact_version or (
                self.model_class.version if self.model_class else "1.0.0"
            )

            store = ArtifactStore(project=project, version=version)
            name = artifact_name or f"{self.__class__.__name__}_results"
            return store.save(name, results, artifact_type="pipeline_result")

        # pandas is only needed for the file-based branch.
        import pandas as pd

        # Default file-based storage (backward compatible)
        if output_path is None:
            # Compact timestamp without ':' (isoformat() has colons, which are
            # invalid in Windows filenames and awkward in shells/object keys).
            stamp = datetime.now().strftime("%Y%m%dT%H%M%S%f")
            output_path = f"output/{self.__class__.__name__}_{stamp}.parquet"

        path = Path(output_path)
        path.parent.mkdir(parents=True, exist_ok=True)

        if isinstance(results, pd.DataFrame):
            results.to_parquet(path)
        else:
            import json

            def _to_jsonable(obj: Any) -> Any:
                # str(ndarray) / str(Series) yields a truncated repr
                # ("[0 1 2 ... 1499]"), silently losing data; tolist() gives
                # plain Python values instead.
                tolist = getattr(obj, "tolist", None)
                if callable(tolist):
                    return tolist()
                return str(obj)

            path = path.with_suffix(".json")
            path.write_text(json.dumps(results, default=_to_jsonable))

        return str(path)

    @property
    def is_initialized(self) -> bool:
        """Check if pipeline is initialized."""
        return self._is_initialized

    def __repr__(self) -> str:
        status = "initialized" if self._is_initialized else "not initialized"
        model_name = self.model_class.__name__ if self.model_class else "None"
        schedule_str = str(self.schedule) if self.schedule else "manual"
        return f"{self.__class__.__name__}(model={model_name}, schedule={schedule_str}, {status})"
Base class for batch ML pipelines.
Provides a standardized interface for batch jobs with schedule/trigger support and integrated artifact management.
Example:
```python
from geronimo.batch import BatchPipeline, Schedule
from myproject.models import CreditRiskModel

class DailyScoringPipeline(BatchPipeline):
    model_class = CreditRiskModel
    schedule = Schedule.daily(hour=6)

    def run(self):
        # Load data
        data = self.model.features.data_source.load()

        # Transform features
        X = self.model.features.transform(data)

        # Predict
        predictions = self.model.predict(X)

        # Save results
        self.save_results(predictions)

# Execute
pipeline = DailyScoringPipeline()
pipeline.initialize()
pipeline.execute()
```
def __init__(self):
    """Create the pipeline in an unloaded state; call initialize() to load the model."""
    # Nothing is loaded yet, so the pipeline starts out not initialized.
    self._is_initialized: bool = False
    self.model: Optional["Model"] = None
Initialize pipeline.
def initialize(
    self,
    project: Optional[str] = None,
    version: Optional[str] = None,
) -> None:
    """Initialize the pipeline by loading model artifacts.

    Args:
        project: Artifact project name. Falls back to ``artifact_project``,
            then ``model_class.name``.
        version: Artifact version. Falls back to ``artifact_version``,
            then ``model_class.version``.

    Raises:
        TypeError: If ``model_class`` has not been set on the subclass.
    """
    # Fail fast with an actionable message instead of an opaque
    # AttributeError / "'NoneType' object is not callable".
    if self.model_class is None:
        raise TypeError(
            f"{type(self).__name__}.model_class is not set; "
            "assign a Model subclass before calling initialize()."
        )

    from geronimo.artifacts import ArtifactStore

    project = project or self.artifact_project or self.model_class.name
    version = version or self.artifact_version or self.model_class.version

    store = ArtifactStore.load(project=project, version=version)

    # Load into a local first so a failing load() does not leave a
    # half-loaded model attached to the pipeline.
    model = self.model_class()
    model.load(store)
    self.model = model
    self._is_initialized = True
Initialize pipeline by loading model artifacts.
Arguments:
- project: Artifact project name.
- version: Artifact version.
def execute(self) -> Any:
    """Run the pipeline logic, guarding against use before initialize().

    Returns:
        Whatever ``run()`` returns.

    Raises:
        RuntimeError: If the pipeline has not been initialized.
    """
    if self._is_initialized:
        return self.run()
    raise RuntimeError("Pipeline not initialized. Call initialize() first.")
Execute the pipeline.
Returns:
Pipeline result.
@abstractmethod
def run(self) -> Any:
    """Hold the pipeline's batch-processing logic; subclasses must override it.

    Returns:
        Whatever result the concrete pipeline produces.
    """
Main pipeline logic.
Override this method with your batch processing logic.
Returns:
Pipeline result.
def save_results(
    self,
    results: Any,
    output_path: Optional[str] = None,
    use_artifact_store: bool = False,
    artifact_name: Optional[str] = None,
) -> str:
    """Save pipeline results.

    Args:
        results: Results to save. A ``pandas.DataFrame`` is written as
            Parquet; anything else is written as JSON. Objects exposing
            ``tolist()`` (numpy arrays/scalars, pandas Series) are
            converted to plain Python values; other non-JSON objects are
            stringified.
        output_path: Optional output path (for file-based storage). For
            non-DataFrame results the suffix is replaced with ``.json``.
            Defaults to ``output/<ClassName>_<timestamp>.parquet``.
        use_artifact_store: If True, save via ArtifactStore for consistency
            with other artifacts. Default False for backward
            compatibility.
        artifact_name: Name for artifact when using artifact store.
            Defaults to ``"<ClassName>_results"``.

    Returns:
        Path or URI where results were saved.
    """
    from datetime import datetime
    from pathlib import Path

    # Use ArtifactStore if requested
    if use_artifact_store:
        from geronimo.artifacts import ArtifactStore

        project = self.artifact_project or (
            self.model_class.name if self.model_class else self.__class__.__name__
        )
        version = self.artifact_version or (
            self.model_class.version if self.model_class else "1.0.0"
        )

        store = ArtifactStore(project=project, version=version)
        name = artifact_name or f"{self.__class__.__name__}_results"
        return store.save(name, results, artifact_type="pipeline_result")

    # pandas is only needed for the file-based branch.
    import pandas as pd

    # Default file-based storage (backward compatible)
    if output_path is None:
        # Compact timestamp without ':' (isoformat() has colons, which are
        # invalid in Windows filenames and awkward in shells/object keys).
        stamp = datetime.now().strftime("%Y%m%dT%H%M%S%f")
        output_path = f"output/{self.__class__.__name__}_{stamp}.parquet"

    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if isinstance(results, pd.DataFrame):
        results.to_parquet(path)
    else:
        import json

        def _to_jsonable(obj: Any) -> Any:
            # str(ndarray) / str(Series) yields a truncated repr
            # ("[0 1 2 ... 1499]"), silently losing data; tolist() gives
            # plain Python values instead.
            tolist = getattr(obj, "tolist", None)
            if callable(tolist):
                return tolist()
            return str(obj)

        path = path.with_suffix(".json")
        path.write_text(json.dumps(results, default=_to_jsonable))

    return str(path)
Save pipeline results.
Arguments:
- results: Results to save (DataFrame, dict, etc.).
- output_path: Optional output path (for file-based storage).
- use_artifact_store: If True, save via ArtifactStore for consistency with other artifacts. Default False for backward compatibility.
- artifact_name: Name for the artifact when using the artifact store. Defaults to `<PipelineClassName>_results`.
Returns:
Path or URI where results were saved.
class Schedule:
    """Cron-based schedule for batch pipelines.

    Example:
        ```python
        from geronimo.batch import Schedule

        # Daily at 6 AM
        daily = Schedule.cron("0 6 * * *")

        # Every hour
        hourly = Schedule.cron("0 * * * *")

        # Weekly on Sunday
        weekly = Schedule.weekly(day=0, hour=0)
        ```
    """

    cron_expression: str
    """The cron expression defining the schedule."""

    description: Optional[str]
    """Human-readable description of the schedule."""

    def __init__(self, cron_expression: str, description: Optional[str] = None):
        """Initialize schedule.

        Args:
            cron_expression: Standard cron expression (min hour day month weekday).
                Stored verbatim; not validated here.
            description: Optional human-readable description.
        """
        self.cron_expression = cron_expression
        self.description = description

    @classmethod
    def cron(cls, expression: str) -> "Schedule":
        """Create from cron expression.

        Args:
            expression: Cron expression, used verbatim.

        Returns:
            Schedule instance with no description.
        """
        return cls(cron_expression=expression)

    @classmethod
    def daily(cls, hour: int = 0, minute: int = 0) -> "Schedule":
        """Create a schedule that fires once a day at ``hour:minute``.

        Args:
            hour: Hour (0-23).
            minute: Minute (0-59).

        Returns:
            Schedule instance with cron ``"<minute> <hour> * * *"``.

        Raises:
            ValueError: If ``hour`` or ``minute`` is not an integer in range.
        """
        # Validate eagerly: an out-of-range value would otherwise yield a cron
        # string that is only rejected (or misread) later by the scheduler.
        if not isinstance(hour, int) or not 0 <= hour <= 23:
            raise ValueError(f"hour must be an integer in 0-23, got {hour!r}")
        if not isinstance(minute, int) or not 0 <= minute <= 59:
            raise ValueError(f"minute must be an integer in 0-59, got {minute!r}")
        return cls(
            cron_expression=f"{minute} {hour} * * *",
            description=f"Daily at {hour:02d}:{minute:02d}",
        )

    @classmethod
    def weekly(cls, day: int = 0, hour: int = 0, minute: int = 0) -> "Schedule":
        """Create a schedule that fires once a week on ``day`` at ``hour:minute``.

        Args:
            day: Day of week (0=Sunday, 6=Saturday).
            hour: Hour (0-23).
            minute: Minute (0-59). Defaults to 0 (top of the hour).

        Returns:
            Schedule instance with cron ``"<minute> <hour> * * <day>"``.

        Raises:
            ValueError: If ``day``, ``hour`` or ``minute`` is not an integer in range.
        """
        days = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
        # Reject negatives explicitly: days[-1] would otherwise silently
        # describe "Saturday" while the cron field holds an invalid "-1".
        if not isinstance(day, int) or not 0 <= day <= 6:
            raise ValueError(f"day must be an integer in 0-6 (0=Sunday), got {day!r}")
        if not isinstance(hour, int) or not 0 <= hour <= 23:
            raise ValueError(f"hour must be an integer in 0-23, got {hour!r}")
        if not isinstance(minute, int) or not 0 <= minute <= 59:
            raise ValueError(f"minute must be an integer in 0-59, got {minute!r}")
        return cls(
            cron_expression=f"{minute} {hour} * * {day}",
            description=f"Weekly on {days[day]} at {hour:02d}:{minute:02d}",
        )

    def __repr__(self) -> str:
        return f"Schedule({self.cron_expression})"
Cron-based schedule for batch pipelines.
Example:
```python
from geronimo.batch import Schedule

# Daily at 6 AM
daily = Schedule.cron("0 6 * * *")

# Every hour
hourly = Schedule.cron("0 * * * *")

# Weekly on Sunday
weekly = Schedule.weekly(day=0, hour=0)
```
def __init__(self, cron_expression: str, description: Optional[str] = None):
    """Store a cron expression together with an optional human-readable label.

    Args:
        cron_expression: Cron string (min hour day month weekday), kept as given.
        description: Optional label; ``None`` when omitted.
    """
    self.description = description
    self.cron_expression = cron_expression
Initialize schedule.
Arguments:
- cron_expression: Standard cron expression (min hour day month weekday).
- description: Optional human-readable description.
@classmethod
def cron(cls, expression: str) -> "Schedule":
    """Build a schedule directly from a raw cron expression.

    Args:
        expression: Cron expression, used verbatim.

    Returns:
        A new instance of ``cls`` with no description attached.
    """
    schedule = cls(cron_expression=expression)
    return schedule
Create from cron expression.
Arguments:
- expression: Cron expression.
Returns:
Schedule instance.
@classmethod
def daily(cls, hour: int = 0, minute: int = 0) -> "Schedule":
    """Create a schedule that fires once a day at ``hour:minute``.

    Args:
        hour: Hour (0-23).
        minute: Minute (0-59).

    Returns:
        Schedule instance with cron ``"<minute> <hour> * * *"``.

    Raises:
        ValueError: If ``hour`` or ``minute`` is not an integer in range.
    """
    # Validate eagerly: an out-of-range value would otherwise yield a cron
    # string that is only rejected (or misread) later by the scheduler.
    if not isinstance(hour, int) or not 0 <= hour <= 23:
        raise ValueError(f"hour must be an integer in 0-23, got {hour!r}")
    if not isinstance(minute, int) or not 0 <= minute <= 59:
        raise ValueError(f"minute must be an integer in 0-59, got {minute!r}")
    return cls(
        cron_expression=f"{minute} {hour} * * *",
        description=f"Daily at {hour:02d}:{minute:02d}",
    )
Create daily schedule.
Arguments:
- hour: Hour (0-23).
- minute: Minute (0-59).
Returns:
Schedule instance.
@classmethod
def weekly(cls, day: int = 0, hour: int = 0, minute: int = 0) -> "Schedule":
    """Create a schedule that fires once a week on ``day`` at ``hour:minute``.

    Args:
        day: Day of week (0=Sunday, 6=Saturday).
        hour: Hour (0-23).
        minute: Minute (0-59). Defaults to 0 (top of the hour).

    Returns:
        Schedule instance with cron ``"<minute> <hour> * * <day>"``.

    Raises:
        ValueError: If ``day``, ``hour`` or ``minute`` is not an integer in range.
    """
    days = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
    # Reject negatives explicitly: days[-1] would otherwise silently
    # describe "Saturday" while the cron field holds an invalid "-1".
    if not isinstance(day, int) or not 0 <= day <= 6:
        raise ValueError(f"day must be an integer in 0-6 (0=Sunday), got {day!r}")
    if not isinstance(hour, int) or not 0 <= hour <= 23:
        raise ValueError(f"hour must be an integer in 0-23, got {hour!r}")
    if not isinstance(minute, int) or not 0 <= minute <= 59:
        raise ValueError(f"minute must be an integer in 0-59, got {minute!r}")
    return cls(
        cron_expression=f"{minute} {hour} * * {day}",
        description=f"Weekly on {days[day]} at {hour:02d}:{minute:02d}",
    )
Create weekly schedule.
Arguments:
- day: Day of week (0=Sunday, 6=Saturday).
- hour: Hour (0-23).
Returns:
Schedule instance.
class Trigger:
    """Event-based trigger for batch pipelines.

    Each trigger pairs a ``TriggerType`` with a plain-dict configuration
    whose keys depend on the type.

    Example:
        ```python
        from geronimo.batch import Trigger

        # Trigger on S3 upload
        s3_trigger = Trigger.s3_upload(bucket="data-bucket", prefix="input/")

        # Manual trigger only
        manual = Trigger.manual()
        ```
    """

    trigger_type: TriggerType
    """The type of event that triggers execution."""

    config: dict
    """Configuration specific to the trigger type."""

    def __init__(self, trigger_type: TriggerType, config: Optional[dict] = None):
        """Store the trigger kind and its configuration.

        Args:
            trigger_type: Kind of event that starts the pipeline.
            config: Type-specific settings; a falsy value becomes a fresh ``{}``.
        """
        self.trigger_type = trigger_type
        self.config = config if config else {}

    @classmethod
    def s3_upload(cls, bucket: str, prefix: Optional[str] = None) -> "Trigger":
        """Build a trigger for S3 object uploads.

        Args:
            bucket: S3 bucket name.
            prefix: Optional key-prefix filter; stored as ``None`` when omitted.

        Returns:
            Trigger of type ``TriggerType.S3_UPLOAD`` with ``bucket``/``prefix`` config.
        """
        s3_config = {"bucket": bucket, "prefix": prefix}
        return cls(trigger_type=TriggerType.S3_UPLOAD, config=s3_config)

    @classmethod
    def sns_message(cls, topic_arn: str) -> "Trigger":
        """Build a trigger for SNS messages.

        Args:
            topic_arn: ARN of the SNS topic.

        Returns:
            Trigger of type ``TriggerType.SNS_MESSAGE`` with a ``topic_arn`` config.
        """
        sns_config = {"topic_arn": topic_arn}
        return cls(trigger_type=TriggerType.SNS_MESSAGE, config=sns_config)

    @classmethod
    def manual(cls) -> "Trigger":
        """Build a trigger that only fires when started manually.

        Returns:
            Trigger of type ``TriggerType.MANUAL`` with an empty config.
        """
        manual_type = TriggerType.MANUAL
        return cls(trigger_type=manual_type)

    def __repr__(self) -> str:
        kind = self.trigger_type.value
        return f"Trigger({kind})"
Event-based trigger for batch pipelines.
Example:
```python
from geronimo.batch import Trigger

# Trigger on S3 upload
s3_trigger = Trigger.s3_upload(bucket="data-bucket", prefix="input/")

# Manual trigger only
manual = Trigger.manual()
```
def __init__(self, trigger_type: TriggerType, config: Optional[dict] = None):
    """Store the trigger kind and its configuration.

    Args:
        trigger_type: Kind of event that starts the pipeline.
        config: Type-specific settings; a falsy value becomes a fresh ``{}``.
    """
    self.trigger_type = trigger_type
    self.config = config if config else {}
Initialize trigger.
Arguments:
- trigger_type: Type of trigger.
- config: Trigger-specific configuration.
@classmethod
def s3_upload(cls, bucket: str, prefix: Optional[str] = None) -> "Trigger":
    """Build a trigger for S3 object uploads.

    Args:
        bucket: S3 bucket name.
        prefix: Optional key-prefix filter; stored as ``None`` when omitted.

    Returns:
        Trigger of type ``TriggerType.S3_UPLOAD`` with ``bucket``/``prefix`` config.
    """
    s3_config = {"bucket": bucket, "prefix": prefix}
    return cls(trigger_type=TriggerType.S3_UPLOAD, config=s3_config)
Trigger on S3 object upload.
Arguments:
- bucket: S3 bucket name.
- prefix: Optional key prefix filter.
Returns:
Trigger instance.
@classmethod
def sns_message(cls, topic_arn: str) -> "Trigger":
    """Build a trigger for SNS messages.

    Args:
        topic_arn: ARN of the SNS topic.

    Returns:
        Trigger of type ``TriggerType.SNS_MESSAGE`` with a ``topic_arn`` config.
    """
    sns_config = {"topic_arn": topic_arn}
    return cls(trigger_type=TriggerType.SNS_MESSAGE, config=sns_config)
Trigger on SNS message.
Arguments:
- topic_arn: SNS topic ARN.
Returns:
Trigger instance.