# Source code for stabilize.queue.messages

"""
Message types for the queue-based execution engine.

This module defines all message types used in the pipeline execution queue.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

from stabilize.models.stage import SyntheticStageOwner
from stabilize.models.status import WorkflowStatus


@dataclass
class Message:
    """
    Root type shared by every message placed on the queue.

    Besides whatever payload a subclass adds, each message carries
    bookkeeping fields used for tracking and debugging. All of them are
    excluded from the generated ``repr``.

    Attributes:
        message_id: Identifier of the message; ``None`` by default.
        created_at: Timestamp taken with ``datetime.now`` when the instance
            is constructed (naive, local time).
        attempts: Attempt counter; starts at 0.
        max_attempts: Attempt limit; defaults to 10.
        last_error: ``str()`` of the most recently recorded exception.
        last_error_type: Class name of the most recently recorded exception.
    """

    # Tracking metadata
    message_id: str | None = field(default=None, repr=False)
    created_at: datetime = field(default_factory=datetime.now, repr=False)
    attempts: int = field(default=0, repr=False)
    max_attempts: int = field(default=10, repr=False)

    # Retry diagnostics - context captured from earlier failed attempts
    last_error: str | None = field(default=None, repr=False)
    last_error_type: str | None = field(default=None, repr=False)

    def copy_with_attempts(self, attempts: int) -> Message:
        """
        Return a shallow copy of this message carrying a new attempt count.

        Args:
            attempts: Value stored in the copy's ``attempts`` field.

        Returns:
            A new instance of the same class. The receiver is left untouched;
            because the copy is shallow, mutable field values are shared.
        """
        from copy import copy as shallow_copy

        duplicate = shallow_copy(self)
        duplicate.attempts = attempts
        return duplicate

    def set_error_context(self, error: Exception) -> None:
        """
        Record details of a failed attempt on this message.

        The stored context is meant to survive reschedules so it is visible
        why a message keeps being retried.

        Args:
            error: The exception raised by the failed attempt. Its string
                form and its class name are stored.
        """
        self.last_error, self.last_error_type = str(error), type(error).__name__
# ============================================================================
# Execution-level messages
# ============================================================================
@dataclass
class WorkflowLevel(Message):
    """
    Common parent of all execution-level messages.

    Messages of this kind are addressed to one particular pipeline execution.

    Attributes:
        execution_type: Type label of the execution; defaults to "PIPELINE".
        execution_id: ID of the execution being addressed; defaults to "".
    """

    # Field order is part of the positional constructor signature.
    execution_type: str = "PIPELINE"
    execution_id: str = ""
@dataclass
class StartWorkflow(WorkflowLevel):
    """
    Request to begin a pipeline execution.

    Kicks off the execution by starting its initial stages. Adds no fields
    beyond those inherited from ``WorkflowLevel``.
    """
@dataclass
class CompleteWorkflow(WorkflowLevel):
    """
    Request to finish a pipeline execution.

    Emitted once every stage has completed, or when the execution should be
    finalized.

    Attributes:
        retry_count: How many times this message has been put back on the
            queue while stages were still finishing. Serves as a guard
            against infinite loops. Defaults to 0.
    """

    retry_count: int = 0
@dataclass
class CancelWorkflow(WorkflowLevel):
    """
    Request to cancel a pipeline execution.

    The execution is marked as canceled and every running stage is stopped.

    Attributes:
        user: Defaults to "". Presumably who requested the cancellation -
            confirm against the handler.
        reason: Defaults to "". Presumably why it was requested - confirm
            against the handler.
    """

    user: str = ""
    reason: str = ""
@dataclass
class StartWaitingWorkflows(Message):
    """
    Request to start queued/waiting executions of one pipeline config.

    Emitted after an execution finishes when concurrent execution limits are
    enabled. Derives directly from ``Message`` rather than ``WorkflowLevel``,
    so it carries no execution ID.

    Attributes:
        pipeline_config_id: ID of the pipeline config; defaults to "".
        purge_queue: Defaults to False. Presumably controls whether waiting
            entries are discarded - confirm against the handler.
    """

    pipeline_config_id: str = ""
    purge_queue: bool = False
# ============================================================================
# Stage-level messages
# ============================================================================
@dataclass
class StageLevel(WorkflowLevel):
    """
    Common parent of all stage-level messages.

    Messages of this kind are addressed to one stage inside an execution.

    Attributes:
        stage_id: ID of the stage being addressed; defaults to "".
        retry_count: How many times this message has been put back on the
            queue while upstream stages were still finishing. Serves as a
            guard against infinite loops. Defaults to 0.
    """

    stage_id: str = ""
    retry_count: int = 0

    @classmethod
    def from_execution_level(cls, msg: WorkflowLevel, stage_id: str) -> StageLevel:
        """
        Build a stage-level message out of an execution-level one.

        Only ``execution_type`` and ``execution_id`` are taken from ``msg``;
        every other field of the new message keeps its default.

        Args:
            msg: Execution-level message supplying the execution coordinates.
            stage_id: ID of the stage the new message targets.

        Returns:
            A fresh instance of ``cls``.
        """
        execution_fields = {
            "execution_type": msg.execution_type,
            "execution_id": msg.execution_id,
        }
        return cls(stage_id=stage_id, **execution_fields)
@dataclass
class StartStage(StageLevel):
    """
    Request to start a stage.

    Verifies that upstream stages are complete, then plans and starts the
    stage. Adds no fields beyond those inherited from ``StageLevel``.
    """
@dataclass
class CompleteStage(StageLevel):
    """
    Request to complete a stage.

    Works out the stage status, plans the after stages, and triggers
    downstream stages. Adds no fields beyond those inherited from
    ``StageLevel``.
    """
@dataclass
class SkipStage(StageLevel):
    """
    Request to skip a stage.

    The stage status becomes SKIPPED and downstream stages are triggered.
    Adds no fields beyond those inherited from ``StageLevel``.
    """
@dataclass
class CancelStage(StageLevel):
    """
    Request to cancel a stage.

    Any running tasks are canceled and the stage is marked as canceled.
    Adds no fields beyond those inherited from ``StageLevel``.
    """
@dataclass
class RestartStage(StageLevel):
    """
    Request to restart a stage.

    The stage status is reset and the stage is executed again from the
    beginning. Adds no fields beyond those inherited from ``StageLevel``.
    """
@dataclass
class ResumeStage(StageLevel):
    """
    Request to resume a paused stage.

    Execution carries on from the point where it was paused. Adds no fields
    beyond those inherited from ``StageLevel``.
    """
@dataclass
class ContinueParentStage(StageLevel):
    """
    Request to continue a parent stage once a synthetic stage has finished.

    Emitted when a synthetic stage completes, as a notification to its
    parent.

    Attributes:
        phase: A ``SyntheticStageOwner`` value; defaults to
            ``SyntheticStageOwner.STAGE_AFTER``.
    """

    phase: SyntheticStageOwner = SyntheticStageOwner.STAGE_AFTER
@dataclass
class JumpToStage(StageLevel):
    """
    Request to jump to another stage (dynamic routing).

    The target stage is reset to NOT_STARTED and started with a merged
    context. ``TaskResult.jump_to()`` uses this for dynamic flow control.

    Attributes:
        target_stage_ref_id: ref_id of the stage to jump to; defaults to "".
        jump_context: Context merged into the target stage. Each instance
            gets its own empty dict by default.
        jump_outputs: Outputs exposed to the target stage. Each instance
            gets its own empty dict by default.
    """

    target_stage_ref_id: str = ""
    # default_factory gives every message a separate dict instead of a shared one.
    jump_context: dict[str, Any] = field(default_factory=dict)
    jump_outputs: dict[str, Any] = field(default_factory=dict)
@dataclass
class SignalStage(StageLevel):
    """
    Request to signal a suspended stage (WCP-23, WCP-24).

    Transient triggers (WCP-23): the signal is dropped when the stage is not
    currently SUSPENDED. Persistent triggers (WCP-24): the signal is buffered
    when the stage is not ready.

    Attributes:
        signal_name: Name/type of the signal; defaults to "".
        signal_data: Payload carried by the signal. Each instance gets its
            own empty dict by default.
        persistent: When True, the signal is buffered if the stage is not
            ready (WCP-24). Defaults to False.
    """

    signal_name: str = ""
    signal_data: dict[str, Any] = field(default_factory=dict)
    persistent: bool = False
@dataclass
class CancelRegion(WorkflowLevel):
    """
    Request to cancel every stage in a named region (WCP-25).

    All stages whose cancel_region matches are looked up, and a CancelStage
    is pushed for each one that is active. This is an execution-level
    message: it derives from ``WorkflowLevel`` and has no ``stage_id``.

    Attributes:
        region: Name of the cancel region to target; defaults to "".
    """

    region: str = ""
@dataclass
class AddMultiInstance(StageLevel):
    """
    Request to add an instance to a running multi-instance activity (WCP-15).

    A new parallel instance is created for an MI stage that permits dynamic
    additions.

    Attributes:
        instance_context: Context for the new instance. Each message gets
            its own empty dict by default.
    """

    instance_context: dict[str, Any] = field(default_factory=dict)
# ============================================================================
# Task-level messages
# ============================================================================
@dataclass
class TaskLevel(StageLevel):
    """
    Common parent of all task-level messages.

    Messages of this kind are addressed to one task inside a stage.

    Attributes:
        task_id: ID of the task being addressed; defaults to "".
    """

    task_id: str = ""

    @classmethod
    def from_stage_level(cls, msg: StageLevel, task_id: str) -> TaskLevel:
        """
        Build a task-level message out of a stage-level one.

        Only ``execution_type``, ``execution_id`` and ``stage_id`` are taken
        from ``msg``; every other field of the new message (including
        ``retry_count``) keeps its default.

        Args:
            msg: Stage-level message supplying the execution and stage IDs.
            task_id: ID of the task the new message targets.

        Returns:
            A fresh instance of ``cls``.
        """
        stage_fields = {
            "execution_type": msg.execution_type,
            "execution_id": msg.execution_id,
            "stage_id": msg.stage_id,
        }
        return cls(task_id=task_id, **stage_fields)
@dataclass
class StartTask(TaskLevel):
    """
    Request to start a task.

    The task status is set to RUNNING and a RunTask is triggered. Adds no
    fields beyond those inherited from ``TaskLevel``.
    """
@dataclass
class RunTask(TaskLevel):
    """
    Request to execute a task.

    The task implementation is run and its result is handled.

    Attributes:
        task_type: Defaults to "". Presumably names the task implementation
            to run - confirm against the handler.
    """

    task_type: str = ""
@dataclass
class CompleteTask(TaskLevel):
    """
    Request to complete a task.

    The task status is updated, and then either the next task or stage
    completion is triggered.

    Attributes:
        status: A ``WorkflowStatus`` value; defaults to
            ``WorkflowStatus.SUCCEEDED``.
        original_status: Optional ``WorkflowStatus``; defaults to ``None``.
            NOTE(review): its exact meaning is not visible in this module -
            confirm against the handler.
    """

    status: WorkflowStatus = WorkflowStatus.SUCCEEDED
    original_status: WorkflowStatus | None = None
@dataclass
class PauseTask(TaskLevel):
    """
    Request to pause a task.

    Sent while the execution is paused; the task resumes once the execution
    resumes. Adds no fields beyond those inherited from ``TaskLevel``.
    """
# ============================================================================
# Error messages
# ============================================================================
@dataclass
class InvalidWorkflowId(WorkflowLevel):
    """
    Notice that a referenced execution ID is invalid.

    The message is logged and dropped; nothing else is processed. Adds no
    fields beyond those inherited from ``WorkflowLevel``.
    """
@dataclass
class InvalidStageId(StageLevel):
    """
    Notice that a referenced stage ID is invalid.

    The message is logged and dropped; nothing else is processed. Adds no
    fields beyond those inherited from ``StageLevel``.
    """
@dataclass
class InvalidTaskId(TaskLevel):
    """
    Notice that a referenced task ID is invalid.

    The message is logged and dropped; nothing else is processed. Adds no
    fields beyond those inherited from ``TaskLevel``.
    """
@dataclass
class InvalidTaskType(TaskLevel):
    """
    Notice that a referenced task type is unknown.

    The message is logged and dropped; nothing else is processed.

    Attributes:
        task_type_name: Defaults to "". Presumably the unrecognized type
            name - confirm against the producer.
    """

    task_type_name: str = ""
# ============================================================================
# Message Registry
# ============================================================================

# Lookup table used for deserialization: message type name -> message class.
# Every key is the class's own ``__name__``, so the table is derived from the
# class list; insertion order follows the tuple below. Only concrete message
# types are listed - the *Level base classes are not registered.
MESSAGE_TYPES: dict[str, type[Message]] = {
    message_cls.__name__: message_cls
    for message_cls in (
        # Execution-level
        StartWorkflow,
        CompleteWorkflow,
        CancelWorkflow,
        StartWaitingWorkflows,
        # Stage-level
        StartStage,
        CompleteStage,
        SkipStage,
        CancelStage,
        RestartStage,
        ResumeStage,
        ContinueParentStage,
        JumpToStage,
        SignalStage,
        CancelRegion,
        AddMultiInstance,
        # Task-level
        StartTask,
        RunTask,
        CompleteTask,
        PauseTask,
        # Errors
        InvalidWorkflowId,
        InvalidStageId,
        InvalidTaskId,
        InvalidTaskType,
    )
}
def get_message_type_name(message: Message) -> str:
    """
    Return the type name of a message.

    Args:
        message: The message to inspect.

    Returns:
        The ``__name__`` of the message's class.
    """
    message_cls = message.__class__
    return message_cls.__name__
def create_message_from_dict(type_name: str, data: dict[str, Any]) -> Message:
    """
    Instantiate a message from its dictionary representation.

    The class is looked up in ``MESSAGE_TYPES`` and constructed with the
    entries of ``data`` passed through unchanged as keyword arguments.

    Args:
        type_name: Name of the message type to build.
        data: Mapping of message field names to values.

    Returns:
        A Message instance of the registered class.

    Raises:
        ValueError: If ``type_name`` is not a registered message type.
    """
    message_class = MESSAGE_TYPES.get(type_name)
    if message_class is None:
        raise ValueError(f"Unknown message type: {type_name}")
    return message_class(**data)