aws_ddk_core.pipelines.DataStage

class aws_ddk_core.pipelines.DataStage(*args: Any, **kwargs)

Class that represents a data stage within a data pipeline.

To create a DataStage, inherit from this class, add infrastructure required by the stage, and implement get_event_pattern and get_targets methods. For example:

class MyStage(DataStage):
    def __init__(
        self,
        scope: Construct,
        id: str,
        environment_id: str,
    ) -> None:
        super().__init__(scope, id)

        # Define stage infrastructure, for example a queue
        self._queue = SQSFactory.queue(
            self,
            id,
            environment_id,
        )

    @property
    def queue(self) -> Queue:
        '''
        Return: Queue
            The SQS queue
        '''
        return self._queue

    def get_event_pattern(self) -> Optional[EventPattern]:
        return EventPattern(
            detail_type=["my-detail-type"],
        )

    def get_targets(self) -> Optional[List[IRuleTarget]]:
        return [SqsQueue(self._queue)]
__init__(scope: constructs.Construct, id: str, name: Optional[str] = None, description: Optional[str] = None, alarms_enabled: Optional[bool] = True) None

Create a stage.

Parameters
  • scope (Construct) – Scope within which this construct is defined

  • id (str) – Identifier of the stage

  • name (Optional[str]) – Name of the stage

  • description (Optional[str]) – Description of the stage

  • alarms_enabled (Optional[bool]) – Enable/Disable all alarms in a DataStage. Default - True

Methods

__init__(scope, id[, name, description, ...])

Create a stage.

add_alarm(alarm_id, alarm_metric[, ...])

Add a CloudWatch alarm for the Data Stage

get_event_pattern()

Get output event pattern of the stage.

get_targets()

Get input targets of the stage.

is_construct(x)

Checks if x is a construct.

to_string()

Returns a string representation of this construct.

Attributes

cloudwatch_alarms

List[Alarm] List of CloudWatch Alarms linked to the stage

node

The tree node.

add_alarm(alarm_id: str, alarm_metric: aws_cdk.aws_cloudwatch.IMetric, alarm_comparison_operator: Optional[aws_cdk.aws_cloudwatch.ComparisonOperator] = ComparisonOperator.GREATER_THAN_THRESHOLD, alarm_evaluation_periods: Optional[int] = 1, alarm_threshold: Optional[int] = 5) aws_ddk_core.pipelines.stage.DataStage

Add a CloudWatch alarm for the Data Stage

Parameters
  • alarm_id (str) – Identifier of the CloudWatch Alarm.

  • alarm_metric (IMetric) – Metric to use for creating the Stage’s CloudWatch Alarm.

  • alarm_comparison_operator (Optional[ComparisonOperator]) – Comparison operator to use for alarm. GREATER_THAN_THRESHOLD by default.

  • alarm_threshold (Optional[int]) – The value against which the specified alarm statistic is compared. 5 by default.

  • alarm_evaluation_periods (Optional[int]) – The number of periods over which data is compared to the specified threshold. 1 by default.

property cloudwatch_alarms: List[Optional[aws_cdk.aws_cloudwatch.IAlarm]]

List[Alarm] List of CloudWatch Alarms linked to the stage

Type

Return