"""
Message types for the queue-based execution engine.
This module defines all message types used in the pipeline execution queue.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from stabilize.models.stage import SyntheticStageOwner
from stabilize.models.status import WorkflowStatus
[docs]
@dataclass
class Message:
"""
Base class for all queue messages.
Each message includes metadata for tracking and debugging.
"""
# Message metadata
message_id: str | None = field(default=None, repr=False)
created_at: datetime = field(default_factory=datetime.now, repr=False)
attempts: int = field(default=0, repr=False)
max_attempts: int = field(default=10, repr=False)
# Error tracking for retries - stores context from previous failures
last_error: str | None = field(default=None, repr=False)
last_error_type: str | None = field(default=None, repr=False)
[docs]
def copy_with_attempts(self, attempts: int) -> Message:
"""Create a copy with updated attempt count."""
import copy
new_msg = copy.copy(self)
new_msg.attempts = attempts
return new_msg
[docs]
def set_error_context(self, error: Exception) -> None:
"""Store error context from a failed attempt.
This context is preserved across reschedules to help debugging
and provide visibility into why messages are being retried.
"""
self.last_error = str(error)
self.last_error_type = type(error).__name__
# ============================================================================
# Execution-level messages
# ============================================================================
[docs]
@dataclass
class WorkflowLevel(Message):
"""
Base class for execution-level messages.
These messages target a specific pipeline execution.
"""
execution_type: str = "PIPELINE"
execution_id: str = ""
[docs]
@dataclass
class StartWorkflow(WorkflowLevel):
"""
Message to start a pipeline execution.
Triggers the beginning of pipeline execution, starting initial stages.
"""
pass
[docs]
@dataclass
class CompleteWorkflow(WorkflowLevel):
"""
Message to complete a pipeline execution.
Sent when all stages have completed or execution should be finalized.
Attributes:
retry_count: Number of times this message has been re-queued while
waiting for stages to complete. Used to prevent infinite loops.
"""
retry_count: int = 0
[docs]
@dataclass
class CancelWorkflow(WorkflowLevel):
"""
Message to cancel a pipeline execution.
Marks the execution as canceled and stops all running stages.
"""
user: str = ""
reason: str = ""
[docs]
@dataclass
class StartWaitingWorkflows(Message):
"""
Message to start any queued/waiting executions for a pipeline config.
Sent after an execution completes when concurrent execution limits are enabled.
"""
pipeline_config_id: str = ""
purge_queue: bool = False
# ============================================================================
# Stage-level messages
# ============================================================================
[docs]
@dataclass
class StageLevel(WorkflowLevel):
"""
Base class for stage-level messages.
These messages target a specific stage within an execution.
Attributes:
stage_id: The ID of the stage this message targets
retry_count: Number of times this message has been re-queued while
waiting for upstream stages to complete. Used to prevent infinite loops.
"""
stage_id: str = ""
retry_count: int = 0
[docs]
@classmethod
def from_execution_level(cls, msg: WorkflowLevel, stage_id: str) -> StageLevel:
"""Create a stage-level message from an execution-level message."""
return cls(
execution_type=msg.execution_type,
execution_id=msg.execution_id,
stage_id=stage_id,
)
[docs]
@dataclass
class StartStage(StageLevel):
"""
Message to start a stage.
Checks if upstream stages are complete, then plans and starts the stage.
"""
pass
[docs]
@dataclass
class CompleteStage(StageLevel):
"""
Message to complete a stage.
Determines stage status, plans after stages, and triggers downstream.
"""
pass
[docs]
@dataclass
class SkipStage(StageLevel):
"""
Message to skip a stage.
Sets stage status to SKIPPED and triggers downstream stages.
"""
pass
[docs]
@dataclass
class CancelStage(StageLevel):
"""
Message to cancel a stage.
Cancels any running tasks and marks stage as canceled.
"""
pass
[docs]
@dataclass
class RestartStage(StageLevel):
"""
Message to restart a stage.
Resets stage status and re-executes from the beginning.
"""
pass
[docs]
@dataclass
class ResumeStage(StageLevel):
"""
Message to resume a paused stage.
Continues execution from where it was paused.
"""
pass
[docs]
@dataclass
class ContinueParentStage(StageLevel):
"""
Message to continue parent stage after synthetic stage completes.
Sent when a synthetic stage completes to notify its parent.
"""
phase: SyntheticStageOwner = SyntheticStageOwner.STAGE_AFTER
[docs]
@dataclass
class JumpToStage(StageLevel):
"""
Message to jump to a different stage (dynamic routing).
Resets the target stage to NOT_STARTED and starts it with merged context.
Used by TaskResult.jump_to() for dynamic flow control.
Attributes:
target_stage_ref_id: The ref_id of the stage to jump to
jump_context: Context to merge into target stage
jump_outputs: Outputs to make available to target stage
"""
target_stage_ref_id: str = ""
jump_context: dict[str, Any] = field(default_factory=dict)
jump_outputs: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class SignalStage(StageLevel):
"""
Message to signal a suspended stage (WCP-23, WCP-24).
For transient triggers (WCP-23), the signal is discarded if the stage
is not currently SUSPENDED. For persistent triggers (WCP-24), the
signal is buffered if the stage is not ready.
Attributes:
signal_name: Name/type of the signal
signal_data: Payload data carried by the signal
persistent: If True, buffer signal when stage not ready (WCP-24)
"""
signal_name: str = ""
signal_data: dict[str, Any] = field(default_factory=dict)
persistent: bool = False
[docs]
@dataclass
class CancelRegion(WorkflowLevel):
"""
Message to cancel all stages in a named region (WCP-25).
Finds all stages with matching cancel_region and pushes
CancelStage for each active one.
Attributes:
region: The cancel region name to target
"""
region: str = ""
[docs]
@dataclass
class AddMultiInstance(StageLevel):
"""
Message to add a new instance to a running multi-instance activity (WCP-15).
Creates a new parallel instance for an MI stage that allows dynamic additions.
Attributes:
instance_context: Context for the new instance
"""
instance_context: dict[str, Any] = field(default_factory=dict)
# ============================================================================
# Task-level messages
# ============================================================================
[docs]
@dataclass
class TaskLevel(StageLevel):
"""
Base class for task-level messages.
These messages target a specific task within a stage.
"""
task_id: str = ""
[docs]
@classmethod
def from_stage_level(cls, msg: StageLevel, task_id: str) -> TaskLevel:
"""Create a task-level message from a stage-level message."""
return cls(
execution_type=msg.execution_type,
execution_id=msg.execution_id,
stage_id=msg.stage_id,
task_id=task_id,
)
[docs]
@dataclass
class StartTask(TaskLevel):
"""
Message to start a task.
Sets task status to RUNNING and triggers RunTask.
"""
pass
[docs]
@dataclass
class RunTask(TaskLevel):
"""
Message to execute a task.
Runs the task implementation and handles the result.
"""
task_type: str = ""
[docs]
@dataclass
class CompleteTask(TaskLevel):
"""
Message to complete a task.
Updates task status and triggers next task or stage completion.
"""
status: WorkflowStatus = WorkflowStatus.SUCCEEDED
original_status: WorkflowStatus | None = None
[docs]
@dataclass
class PauseTask(TaskLevel):
"""
Message to pause a task.
Used when execution is paused - task will resume when execution resumes.
"""
pass
# ============================================================================
# Error messages
# ============================================================================
[docs]
@dataclass
class InvalidWorkflowId(WorkflowLevel):
"""
Message indicating an invalid execution ID was referenced.
Logged and dropped - no further processing.
"""
pass
[docs]
@dataclass
class InvalidStageId(StageLevel):
"""
Message indicating an invalid stage ID was referenced.
Logged and dropped - no further processing.
"""
pass
[docs]
@dataclass
class InvalidTaskId(TaskLevel):
"""
Message indicating an invalid task ID was referenced.
Logged and dropped - no further processing.
"""
pass
[docs]
@dataclass
class InvalidTaskType(TaskLevel):
"""
Message indicating an unknown task type was referenced.
Logged and dropped - no further processing.
"""
task_type_name: str = ""
# ============================================================================
# Message Registry
# ============================================================================
# Map of message type names to classes for deserialization
MESSAGE_TYPES: dict[str, type[Message]] = {
"StartWorkflow": StartWorkflow,
"CompleteWorkflow": CompleteWorkflow,
"CancelWorkflow": CancelWorkflow,
"StartWaitingWorkflows": StartWaitingWorkflows,
"StartStage": StartStage,
"CompleteStage": CompleteStage,
"SkipStage": SkipStage,
"CancelStage": CancelStage,
"RestartStage": RestartStage,
"ResumeStage": ResumeStage,
"ContinueParentStage": ContinueParentStage,
"JumpToStage": JumpToStage,
"SignalStage": SignalStage,
"CancelRegion": CancelRegion,
"AddMultiInstance": AddMultiInstance,
"StartTask": StartTask,
"RunTask": RunTask,
"CompleteTask": CompleteTask,
"PauseTask": PauseTask,
"InvalidWorkflowId": InvalidWorkflowId,
"InvalidStageId": InvalidStageId,
"InvalidTaskId": InvalidTaskId,
"InvalidTaskType": InvalidTaskType,
}
[docs]
def get_message_type_name(message: Message) -> str:
"""Get the type name for a message."""
return message.__class__.__name__
[docs]
def create_message_from_dict(type_name: str, data: dict[str, Any]) -> Message:
"""
Create a message from a dictionary representation.
Args:
type_name: The message type name
data: Dictionary of message fields
Returns:
A Message instance
Raises:
ValueError: If type_name is unknown
"""
if type_name not in MESSAGE_TYPES:
raise ValueError(f"Unknown message type: {type_name}")
message_class = MESSAGE_TYPES[type_name]
return message_class(**data)