Handlers
Stabilize uses a message-driven architecture where handlers process different message types
to drive workflow execution. All 15 default handlers are auto-registered by QueueProcessor
when store and task_registry are provided to the constructor. Manual registration is
no longer needed for default handlers.
Core Handlers
Base message handler classes.
This module provides the base classes for all message handlers in the pipeline execution engine.
- class stabilize.handlers.base.MessageHandler[source]
Bases:
ABC,Generic[M]Base class for message handlers.
Each handler processes a specific type of message.
- abstract property message_type: type[M]
Return the type of message this handler processes.
- class stabilize.handlers.base.StabilizeHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
MessageHandler[M],ABCBase handler with common utilities.
Provides helper methods for retrieving executions, stages, and tasks, as well as the startNext() implementation.
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- queue
The message queue for pushing messages
- repository
The workflow store for persistence
- retry_delay
Delay before re-queuing messages (from config or override)
- handler_config
Configuration for retry behavior and other settings
- property event_recorder: EventRecorder | None
Get the event recorder, preferring instance over global.
- set_event_context(workflow_id)[source]
Set event context for correlation tracking.
Call this at the start of handler processing to establish correlation context for any events recorded during handling.
- Parameters:
workflow_id (str) – The workflow ID for correlation.
- Return type:
None
- with_execution(message, block)[source]
Execute a block with the execution for a message.
- Parameters:
message (WorkflowLevel) – Message containing execution ID
block (Callable[[Workflow], None]) – Function to call with the execution
- Return type:
None
- with_stage(message, block)[source]
Execute a block with the stage for a message.
- Parameters:
message (StageLevel) – Message containing stage ID
block (Callable[[StageExecution], None]) – Function to call with the stage
- Return type:
None
- with_task(message, block)[source]
Execute a block with the stage and task for a message.
- Parameters:
message (TaskLevel) – Message containing task ID
block (Callable[[StageExecution, TaskExecution], None]) – Function to call with (stage, task)
- Return type:
None
- start_next(stage)[source]
Start the next stage(s) after a stage completes.
This is the critical method for DAG traversal: 1. Find downstream stages (those that depend on this stage) 2. Push StartStage for each downstream stage 3. If this is a synthetic stage, notify parent 4. If no downstream and not synthetic, complete execution
- Parameters:
stage (StageExecution)
- Return type:
None
- retry_on_concurrency_error(func, context='operation')[source]
Execute a function with retry on ConcurrencyError.
Uses resilient-circuit’s RetryWithBackoffPolicy for consistent retry behavior. Configuration comes from handler_config settings. Set concurrency_max_retries to 0 to disable retries entirely.
- Parameters:
func (Callable[[], None]) – The function to execute
context (str) – Description for logging (e.g., “completing task”, “starting stage”)
- Raises:
ConcurrencyError – If max retries exceeded
- Return type:
None
- set_stage_status(stage, new_status)[source]
Set stage status with validation.
Validates the transition is allowed before setting the status.
- Parameters:
stage (StageExecution) – The stage to update
new_status (WorkflowStatus) – The new status to set
- Raises:
InvalidStateTransitionError – If the transition is not allowed
- Return type:
None
- set_task_status(task, new_status)[source]
Set task status with validation.
Validates the transition is allowed before setting the status.
- Parameters:
task (TaskExecution) – The task to update
new_status (WorkflowStatus) – The new status to set
- Raises:
InvalidStateTransitionError – If the transition is not allowed
- Return type:
None
- set_workflow_status(workflow, new_status)[source]
Set workflow status with validation.
Validates the transition is allowed before setting the status.
- Parameters:
workflow (Workflow) – The workflow to update
new_status (WorkflowStatus) – The new status to set
- Raises:
InvalidStateTransitionError – If the transition is not allowed
- Return type:
None
StartWorkflowHandler - handles pipeline execution startup.
This handler is triggered when a new pipeline execution is started. It finds initial stages (those with no dependencies) and queues them for execution.
- class stabilize.handlers.start_workflow.StartWorkflowHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[StartWorkflow]Handler for StartWorkflow messages.
When a pipeline execution starts: 1. Check if execution should be queued (concurrent limits) 2. Find initial stages (no dependencies) 3. Push StartStage for each initial stage 4. Mark execution as RUNNING
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[StartWorkflow]
Return the type of message this handler processes.
- handle(message)[source]
Handle the StartWorkflow message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (StartWorkflow)
- Return type:
None
StartWaitingWorkflowsHandler - starts buffered executions.
This handler is triggered when a pipeline execution completes. It checks if there are any BUFFERED executions for the same pipeline config and starts them if capacity allows.
- class stabilize.handlers.start_waiting_workflows.StartWaitingWorkflowsHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[StartWaitingWorkflows]Handler for StartWaitingWorkflows messages.
Execution flow: 1. Find buffered executions for the pipeline config 2. Check concurrent limits 3. Start executions up to the limit 4. If purge_queue is True, cancel remaining buffered executions
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[StartWaitingWorkflows]
Return the type of message this handler processes.
- handle(message)[source]
Handle the StartWaitingWorkflows message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (StartWaitingWorkflows)
- Return type:
None
- class stabilize.handlers.start_stage.StartStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StartStageConditionsMixin,StartStageOrchestrationMixin,StartStagePlannerMixin,StabilizeHandler[StartStage]Handler for StartStage messages.
Execution flow: 1. Check if any upstream stages failed -> CompleteWorkflow 2. Check if all upstream stages complete
If not: Re-queue with retry delay
If yes: Continue to step 3
Check if stage should be skipped -> SkipStage
Check if start time expired -> SkipStage
Plan the stage (build tasks and before stages)
Start the stage: - If has before stages: StartStage for each - Else if has tasks: StartTask for first task - Else: CompleteStage
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- handle(message)[source]
Handle the StartStage message.
- Parameters:
message (StartStage)
- Return type:
None
- property message_type: type[StartStage]
Return the type of message this handler processes.
SkipStageHandler - handles stage skipping.
This handler is invoked when a stage should be skipped due to conditional execution or start time expiry.
- class stabilize.handlers.skip_stage.SkipStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[SkipStage]Handler for SkipStage messages.
Execution flow: 1. Check if stage is still in a skippable state 2. Set stage status to SKIPPED 3. Set end time 4. Atomically store the stage and trigger downstream stages
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
CancelStageHandler - handles stage cancellation.
This handler is invoked when a stage needs to be canceled, typically due to upstream failure or execution cancellation.
- class stabilize.handlers.cancel_stage.CancelStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[CancelStage]Handler for CancelStage messages.
Execution flow: 1. Check if stage is still in a cancellable state 2. Cancel all running tasks 3. Set stage status to CANCELED 4. Set end time 5. Store the stage 6. Does NOT trigger downstream (cancellation terminates the path)
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[CancelStage]
Return the type of message this handler processes.
- handle(message)[source]
Handle the CancelStage message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (CancelStage)
- Return type:
None
ContinueParentStageHandler - handles synthetic stage completion notification.
This handler is invoked when a synthetic stage (before/after stage) completes and the parent stage needs to be notified to continue processing.
- class stabilize.handlers.continue_parent_stage.ContinueParentStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[ContinueParentStage]Handler for ContinueParentStage messages.
Execution flow based on phase:
For STAGE_BEFORE phase: 1. Check if all before-stages are complete 2. If complete: start the parent’s first task 3. If not: wait (another ContinueParentStage will be sent)
For STAGE_AFTER phase: 1. Check if all after-stages are complete 2. If complete: push CompleteStage for parent 3. If not: wait (another ContinueParentStage will be sent)
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[ContinueParentStage]
Return the type of message this handler processes.
- handle(message)[source]
Handle the ContinueParentStage message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (ContinueParentStage)
- Return type:
None
StartTaskHandler - handles task startup.
This handler prepares a task for execution and triggers RunTask.
- class stabilize.handlers.start_task.StartTaskHandler(queue, repository, task_registry, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[StartTask]Handler for StartTask messages.
Execution flow: 1. Check if task is enabled (SkippableTask)
If not: Push CompleteTask(SKIPPED)
Set task status to RUNNING
Set task start time
Push RunTask
- Parameters:
queue (Queue)
repository (WorkflowStore)
task_registry (TaskRegistry)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
RunTask handler package.
- class stabilize.handlers.run_task.RunTaskHandler(queue, repository, task_registry, retry_delay=None, bulkhead_manager=None, circuit_factory=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[RunTask]Handler for RunTask messages.
This is where tasks are actually executed. The handler: 1. Resolves the task implementation 2. Checks for cancellation/pause 3. Checks for timeout 4. Executes the task 5. Processes the result
- Parameters:
queue (Queue)
repository (WorkflowStore)
task_registry (TaskRegistry)
retry_delay (timedelta | None)
bulkhead_manager (TaskBulkheadManager | None)
circuit_factory (WorkflowCircuitFactory | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
CompleteTaskHandler - handles task completion.
This handler updates task status and triggers either the next task or stage completion.
- class stabilize.handlers.complete_task.CompleteTaskHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[CompleteTask]Handler for CompleteTask messages.
Execution flow: 1. Update task status and end time 2. If there’s a next task: Push StartTask 3. Otherwise: Push CompleteStage
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[CompleteTask]
Return the type of message this handler processes.
- handle(message)[source]
Handle the CompleteTask message.
Uses atomic transactions to ensure stage updates and message pushes are committed together, preventing orphaned states.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (CompleteTask)
- Return type:
None
- class stabilize.handlers.complete_stage.CompleteStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None, task_registry=None)[source]
Bases:
CompleteStagesSplitMixin,CompleteStagePlannerMixin,StabilizeHandler[CompleteStage]Handler for CompleteStage messages.
Execution flow: 1. Check if stage already complete 2. Determine status from synthetic stages and tasks 3. If success: Plan and start after stages 4. If failure: Plan and start on-failure stages 5. Update stage status and end time 6. If status allows continuation: startNext() 7. Otherwise: CancelStage + CompleteWorkflow
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
task_registry (TaskRegistry | None)
- handle(message)[source]
Handle the CompleteStage message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (CompleteStage)
- Return type:
None
- property message_type: type[CompleteStage]
Return the type of message this handler processes.
CompleteWorkflowHandler - handles execution completion.
This handler determines the final execution status based on all top-level stages and marks the execution as complete.
- class stabilize.handlers.complete_workflow.CompleteWorkflowHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[CompleteWorkflow]Handler for CompleteWorkflow messages.
Execution flow: 1. Check if execution already complete 2. Determine final status from top-level stages 3. Update execution status 4. Cancel any running stages if failed 5. Start waiting executions if queue is enabled
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[CompleteWorkflow]
Return the type of message this handler processes.
- handle(message)[source]
Handle the CompleteWorkflow message.
Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings.
- Parameters:
message (CompleteWorkflow)
- Return type:
None
Control-Flow Pattern Handlers
These handlers support advanced workflow control-flow patterns (WCP 6-43).
SignalStageHandler - handles external signals for suspended stages.
Implements: - WCP-23: Transient Trigger - signal is lost if stage not SUSPENDED - WCP-24: Persistent Trigger - signal is buffered if stage not ready
- class stabilize.handlers.signal_stage.SignalStageHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[SignalStage]Handler for SignalStage messages.
Execution flow: 1. Check if stage is SUSPENDED
If SUSPENDED: transition to RUNNING, merge signal data, push StartStage
If not SUSPENDED and transient: discard signal (WCP-23)
If not SUSPENDED and persistent: buffer signal for later (WCP-24)
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[SignalStage]
Return the type of message this handler processes.
- handle(message)[source]
Handle the SignalStage message.
- Parameters:
message (SignalStage)
- Return type:
None
CancelRegionHandler - cancels all stages in a named region.
Implements WCP-25: Cancel Region. Finds all stages with matching cancel_region and pushes CancelStage for each active one.
- class stabilize.handlers.cancel_region.CancelRegionHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[CancelRegion]Handler for CancelRegion messages.
Execution flow: 1. Load the workflow execution 2. Find all stages with matching cancel_region 3. For each active stage, push CancelStage
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[CancelRegion]
Return the type of message this handler processes.
- handle(message)[source]
Handle the CancelRegion message.
- Parameters:
message (CancelRegion)
- Return type:
None
AddMultiInstanceHandler - adds a new instance to a running MI activity.
Implements WCP-15: Multiple Instances without a priori Run-Time Knowledge. Dynamically creates new parallel instances during execution.
- class stabilize.handlers.add_multi_instance.AddMultiInstanceHandler(queue, repository, retry_delay=None, handler_config=None, event_recorder=None)[source]
Bases:
StabilizeHandler[AddMultiInstance]Handler for AddMultiInstance messages.
Execution flow: 1. Load the parent MI stage 2. Verify it allows dynamic instances (mi_config.allow_dynamic) 3. Create a new child stage instance 4. Update the MI join threshold if needed 5. Push StartStage for the new instance
- Parameters:
queue (Queue)
repository (WorkflowStore)
retry_delay (timedelta | None)
handler_config (HandlerConfig | None)
event_recorder (EventRecorder | None)
- property message_type: type[AddMultiInstance]
Return the type of message this handler processes.
- handle(message)[source]
Handle the AddMultiInstance message.
- Parameters:
message (AddMultiInstance)
- Return type:
None