Queue
- class stabilize.queue.processor.MessageHandler[source]
Bases:
Generic[M]Base class for message handlers.
Each handler processes a specific type of message.
- handle(message)[source]
Handle a message.
- Parameters:
message (M) – The message to handle
- Return type:
None
- property message_type: type[M]
Return the type of message this handler processes.
- class stabilize.queue.processor.QueueProcessor(queue, config=None, store=None, handler_config=None, task_registry=None, bulkhead_manager=None, circuit_factory=None)[source]
Bases:
QueueProcessorMixinProcesses messages from a queue using registered handlers.
The processor polls the queue at regular intervals and dispatches messages to appropriate handlers. Handlers run in a thread pool for concurrent processing.
When
storeandtask_registryare both provided, all 12 default handlers are registered automatically — no manual registration needed.Example
queue = SqliteQueue(“sqlite:///workflow.db”, table_name=”queue_messages”) store = SqliteWorkflowStore(“sqlite:///workflow.db”, create_tables=True) registry = TaskRegistry() processor = QueueProcessor(queue, store=store, task_registry=registry) processor.start()
- Parameters:
queue (Queue)
config (QueueProcessorConfig | None)
store (WorkflowStore | None)
handler_config (HandlerConfig | None)
task_registry (TaskRegistry | None)
bulkhead_manager (TaskBulkheadManager | None)
circuit_factory (WorkflowCircuitFactory | None)
- property active_count: int
Get the number of actively processing messages.
- property is_running: bool
Check if the processor is running.
- property is_stopping: bool
Check if stop has been requested but not yet completed.
- process_all(timeout=60.0)[source]
Process all messages synchronously until queue is empty.
Thread-safe: uses a processing lock to prevent concurrent calls. Also performs periodic DLQ cleanup for expired messages.
- Parameters:
timeout (float) – Maximum time to wait for processing
- Returns:
Number of messages processed
- Return type:
int
- process_one()[source]
Process a single message synchronously.
Useful for testing and debugging.
- Returns:
True if a message was processed, False otherwise
- Return type:
bool
- register_handler(handler)[source]
Register a message handler.
Raises ValueError if a handler for the same message type is already registered. Use
replace_handler()to override an existing handler.- Parameters:
handler (MessageHandler[Any]) – The handler to register
- Raises:
ValueError – If a handler for this message type is already registered.
- Return type:
None
- register_handler_func(message_type, handler_func)[source]
Register a handler function for a message type.
- Parameters:
message_type (type[M]) – The type of message to handle
handler_func (Callable[[M], None]) – Function to call with the message
- Return type:
None
- replace_handler(handler)[source]
Replace an existing handler.
- Parameters:
handler (MessageHandler[Any]) – The new handler to use
- Raises:
ValueError – If no handler is registered for this message type.
- Return type:
None
- request_stop()[source]
Request graceful stop without blocking.
Sets the stopping flag to stop accepting new messages, but doesn’t wait for active tasks to complete. Use the active_count property to monitor progress.
Example
processor.request_stop() while processor.active_count > 0:
time.sleep(0.1)
processor.stop()
- Return type:
None
- class stabilize.queue.processor.QueueProcessorConfig(poll_frequency_ms=50, max_workers=10, retry_delay=datetime.timedelta(seconds=15), stop_on_error=False, enable_deduplication=True)[source]
Bases:
objectConfiguration for the queue processor.
Values can be loaded from environment variables via HandlerConfig. See HandlerConfig documentation for environment variable names.
- Parameters:
poll_frequency_ms (int)
max_workers (int)
retry_delay (timedelta)
stop_on_error (bool)
enable_deduplication (bool)
- enable_deduplication: bool = True
- classmethod from_handler_config(handler_config=None)[source]
Create QueueProcessorConfig from HandlerConfig.
- Parameters:
handler_config (HandlerConfig | None) – HandlerConfig to use. If None, loads from environment.
- Returns:
QueueProcessorConfig with values from HandlerConfig
- Return type:
- max_workers: int = 10
- poll_frequency_ms: int = 50
- retry_delay: timedelta = datetime.timedelta(seconds=15)
- stop_on_error: bool = False
- class stabilize.queue.processor.SynchronousQueueProcessor(queue, store=None, task_registry=None, config=None, handler_config=None, bulkhead_manager=None, circuit_factory=None)[source]
Bases:
QueueProcessorA synchronous queue processor that processes messages immediately.
Useful for testing where you want deterministic execution order.
Accepts the same parameters as
QueueProcessorfor auto-registration.- Parameters:
queue (Queue)
store (WorkflowStore | None)
task_registry (TaskRegistry | None)
config (QueueProcessorConfig | None)
handler_config (HandlerConfig | None)
bulkhead_manager (TaskBulkheadManager | None)
circuit_factory (WorkflowCircuitFactory | None)
Message types for the queue-based execution engine.
This module defines all message types used in the pipeline execution queue.
- class stabilize.queue.messages.Message(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None)[source]
Bases:
objectBase class for all queue messages.
Each message includes metadata for tracking and debugging.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
- message_id: str | None = None
- created_at: datetime
- attempts: int = 0
- max_attempts: int = 10
- last_error: str | None = None
- last_error_type: str | None = None
- class stabilize.queue.messages.WorkflowLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]
Bases:
MessageBase class for execution-level messages.
These messages target a specific pipeline execution.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
- execution_type: str = 'PIPELINE'
- execution_id: str = ''
- class stabilize.queue.messages.StartWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]
Bases:
WorkflowLevelMessage to start a pipeline execution.
Triggers the beginning of pipeline execution, starting initial stages.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
- class stabilize.queue.messages.CompleteWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', retry_count=0)[source]
Bases:
WorkflowLevelMessage to complete a pipeline execution.
Sent when all stages have completed or execution should be finalized.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
retry_count (int)
- retry_count
Number of times this message has been re-queued while waiting for stages to complete. Used to prevent infinite loops.
- Type:
int
- retry_count: int = 0
- class stabilize.queue.messages.CancelWorkflow(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', user='', reason='')[source]
Bases:
WorkflowLevelMessage to cancel a pipeline execution.
Marks the execution as canceled and stops all running stages.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
user (str)
reason (str)
- user: str = ''
- reason: str = ''
- class stabilize.queue.messages.StartWaitingWorkflows(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, pipeline_config_id='', purge_queue=False)[source]
Bases:
MessageMessage to start any queued/waiting executions for a pipeline config.
Sent after an execution completes when concurrent execution limits are enabled.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
pipeline_config_id (str)
purge_queue (bool)
- pipeline_config_id: str = ''
- purge_queue: bool = False
- class stabilize.queue.messages.StageLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
WorkflowLevelBase class for stage-level messages.
These messages target a specific stage within an execution.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- stage_id
The ID of the stage this message targets
- Type:
str
- retry_count
Number of times this message has been re-queued while waiting for upstream stages to complete. Used to prevent infinite loops.
- Type:
int
- stage_id: str = ''
- retry_count: int = 0
- classmethod from_execution_level(msg, stage_id)[source]
Create a stage-level message from an execution-level message.
- Parameters:
msg (WorkflowLevel)
stage_id (str)
- Return type:
- class stabilize.queue.messages.StartStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to start a stage.
Checks if upstream stages are complete, then plans and starts the stage.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.CompleteStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to complete a stage.
Determines stage status, plans after stages, and triggers downstream.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.SkipStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to skip a stage.
Sets stage status to SKIPPED and triggers downstream stages.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.CancelStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to cancel a stage.
Cancels any running tasks and marks stage as canceled.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.RestartStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to restart a stage.
Resets stage status and re-executes from the beginning.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.ResumeStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage to resume a paused stage.
Continues execution from where it was paused.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.ContinueParentStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, phase=SyntheticStageOwner.STAGE_AFTER)[source]
Bases:
StageLevelMessage to continue parent stage after synthetic stage completes.
Sent when a synthetic stage completes to notify its parent.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
phase (SyntheticStageOwner)
- phase: SyntheticStageOwner = 'STAGE_AFTER'
- class stabilize.queue.messages.JumpToStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, target_stage_ref_id='', jump_context=<factory>, jump_outputs=<factory>)[source]
Bases:
StageLevelMessage to jump to a different stage (dynamic routing).
Resets the target stage to NOT_STARTED and starts it with merged context. Used by TaskResult.jump_to() for dynamic flow control.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
target_stage_ref_id (str)
jump_context (dict[str, Any])
jump_outputs (dict[str, Any])
- target_stage_ref_id
The ref_id of the stage to jump to
- Type:
str
- jump_context
Context to merge into target stage
- Type:
dict[str, Any]
- jump_outputs
Outputs to make available to target stage
- Type:
dict[str, Any]
- target_stage_ref_id: str = ''
- jump_context: dict[str, Any]
- jump_outputs: dict[str, Any]
- class stabilize.queue.messages.SignalStage(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, signal_name='', signal_data=<factory>, persistent=False)[source]
Bases:
StageLevelMessage to signal a suspended stage (WCP-23, WCP-24).
For transient triggers (WCP-23), the signal is discarded if the stage is not currently SUSPENDED. For persistent triggers (WCP-24), the signal is buffered if the stage is not ready.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
signal_name (str)
signal_data (dict[str, Any])
persistent (bool)
- signal_name
Name/type of the signal
- Type:
str
- signal_data
Payload data carried by the signal
- Type:
dict[str, Any]
- persistent
If True, buffer signal when stage not ready (WCP-24)
- Type:
bool
- signal_name: str = ''
- signal_data: dict[str, Any]
- persistent: bool = False
- class stabilize.queue.messages.CancelRegion(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', region='')[source]
Bases:
WorkflowLevelMessage to cancel all stages in a named region (WCP-25).
Finds all stages with matching cancel_region and pushes CancelStage for each active one.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
region (str)
- region
The cancel region name to target
- Type:
str
- region: str = ''
- class stabilize.queue.messages.AddMultiInstance(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, instance_context=<factory>)[source]
Bases:
StageLevelMessage to add a new instance to a running multi-instance activity (WCP-15).
Creates a new parallel instance for an MI stage that allows dynamic additions.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
instance_context (dict[str, Any])
- instance_context
Context for the new instance
- Type:
dict[str, Any]
- instance_context: dict[str, Any]
- class stabilize.queue.messages.TaskLevel(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]
Bases:
StageLevelBase class for task-level messages.
These messages target a specific task within a stage.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
- task_id: str = ''
- classmethod from_stage_level(msg, task_id)[source]
Create a task-level message from a stage-level message.
- Parameters:
msg (StageLevel)
task_id (str)
- Return type:
- class stabilize.queue.messages.StartTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]
Bases:
TaskLevelMessage to start a task.
Sets task status to RUNNING and triggers RunTask.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
- class stabilize.queue.messages.RunTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', task_type='')[source]
Bases:
TaskLevelMessage to execute a task.
Runs the task implementation and handles the result.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
task_type (str)
- task_type: str = ''
- class stabilize.queue.messages.CompleteTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', status=WorkflowStatus.SUCCEEDED, original_status=None)[source]
Bases:
TaskLevelMessage to complete a task.
Updates task status and triggers next task or stage completion.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
status (WorkflowStatus)
original_status (WorkflowStatus | None)
- status: WorkflowStatus = ('SUCCEEDED', True, False)
- original_status: WorkflowStatus | None = None
- class stabilize.queue.messages.PauseTask(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]
Bases:
TaskLevelMessage to pause a task.
Used when execution is paused - task will resume when execution resumes.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
- class stabilize.queue.messages.InvalidWorkflowId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='')[source]
Bases:
WorkflowLevelMessage indicating an invalid execution ID was referenced.
Logged and dropped - no further processing.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
- class stabilize.queue.messages.InvalidStageId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0)[source]
Bases:
StageLevelMessage indicating an invalid stage ID was referenced.
Logged and dropped - no further processing.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
- class stabilize.queue.messages.InvalidTaskId(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='')[source]
Bases:
TaskLevelMessage indicating an invalid task ID was referenced.
Logged and dropped - no further processing.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
- class stabilize.queue.messages.InvalidTaskType(message_id=None, created_at=<factory>, attempts=0, max_attempts=10, last_error=None, last_error_type=None, execution_type='PIPELINE', execution_id='', stage_id='', retry_count=0, task_id='', task_type_name='')[source]
Bases:
TaskLevelMessage indicating an unknown task type was referenced.
Logged and dropped - no further processing.
- Parameters:
message_id (str | None)
created_at (datetime)
attempts (int)
max_attempts (int)
last_error (str | None)
last_error_type (str | None)
execution_type (str)
execution_id (str)
stage_id (str)
retry_count (int)
task_id (str)
task_type_name (str)
- task_type_name: str = ''
- stabilize.queue.messages.get_message_type_name(message)[source]
Get the type name for a message.
- Parameters:
message (Message)
- Return type:
str
- stabilize.queue.messages.create_message_from_dict(type_name, data)[source]
Create a message from a dictionary representation.
- Parameters:
type_name (str) – The message type name
data (dict[str, Any]) – Dictionary of message fields
- Returns:
A Message instance
- Raises:
ValueError – If type_name is unknown
- Return type: