Flow Control
Stabilize implements all 43 Workflow Control-Flow Patterns (WCP) catalogued by van der Aalst et al. This guide covers basic patterns first, then advanced patterns introduced in v0.18.
Basic Patterns
Sequence (WCP-1)
Stages execute one after another via requisite_stage_ref_ids:
stages=[
StageExecution(ref_id="A", ...),
StageExecution(ref_id="B", requisite_stage_ref_ids={"A"}, ...),
StageExecution(ref_id="C", requisite_stage_ref_ids={"B"}, ...),
]
Parallel Split / AND-Split (WCP-2)
Multiple stages depending on the same upstream run in parallel automatically:
# A
# / \
# B C
# \ /
# D
stages=[
StageExecution(ref_id="A", ...),
StageExecution(ref_id="B", requisite_stage_ref_ids={"A"}, ...),
StageExecution(ref_id="C", requisite_stage_ref_ids={"A"}, ...),
StageExecution(ref_id="D", requisite_stage_ref_ids={"B", "C"}, ...),
]
Synchronization / AND-Join (WCP-3)
Stage D above waits for all upstreams to complete before starting. This is
the default join_type=JoinType.AND.
Exclusive Choice / XOR-Split (WCP-4)
Use stageEnabled in stage context or TaskResult.jump_to() for dynamic
routing:
# Option 1: Conditional stage enablement
StageExecution(
ref_id="deploy_prod",
context={"stageEnabled": {"type": "expression", "expression": "env == 'production'"}},
...
)
# Option 2: Dynamic routing via jump_to
class RouterTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
if stage.context.get("tests_passed"):
return TaskResult.jump_to("deploy_stage")
else:
return TaskResult.jump_to("fix_stage")
Simple Merge / XOR-Join (WCP-5)
A stage with a single requisite_stage_ref_id that serves as a merge point
when only one upstream branch is ever active.
Advanced Branching Patterns
OR-Split / Multi-Choice (WCP-6)
Diverge into multiple branches where one or more are chosen based on
conditions. Set split_type=SplitType.OR and provide per-downstream
conditions in split_conditions:
from stabilize.models.stage import SplitType
# Triage stage: dispatch police and/or ambulance and/or fire
StageExecution(
ref_id="triage",
type="triage",
name="Triage",
split_type=SplitType.OR,
split_conditions={
"police": "emergency_type == 'crime' or emergency_type == 'accident'",
"ambulance": "injury_severity > 0",
"fire": "fire_detected == True",
},
...
),
StageExecution(ref_id="police", requisite_stage_ref_ids={"triage"}, ...),
StageExecution(ref_id="ambulance", requisite_stage_ref_ids={"triage"}, ...),
StageExecution(ref_id="fire", requisite_stage_ref_ids={"triage"}, ...),
Conditions are evaluated using the safe expression evaluator
(stabilize.expressions.evaluate_expression). Expressions can reference
values from both stage.context and stage.outputs.
Supported expression syntax:
Comparisons:
==,!=,<,<=,>,>=,in,not inBoolean:
and,or,notLiterals: strings, numbers,
True,False,NoneContext lookups:
key_name,nested.key,dict["key"]
Branches with conditions that evaluate to False are automatically skipped.
OR-Join / Structured Synchronizing Merge (WCP-7)
Wait only for activated branches from a paired OR-split. Set
join_type=JoinType.OR. The OR-split records activated branch ref_ids in
the join stage’s context under _activated_branches:
from stabilize.models.stage import JoinType
StageExecution(
ref_id="merge",
join_type=JoinType.OR,
requisite_stage_ref_ids={"police", "ambulance", "fire"},
...
)
The merge stage fires as soon as all activated upstreams complete. Skipped branches are ignored.
Multi-Merge (WCP-8)
Each upstream completion independently triggers the downstream stage — no synchronization:
StageExecution(
ref_id="quality_review",
join_type=JoinType.MULTI_MERGE,
requisite_stage_ref_ids={"foundations", "materials", "laborers"},
...
)
Discriminator / 1-out-of-N Join (WCP-9)
Fire on the first upstream completion, ignore the rest:
# Check breathing and check pulse run in parallel.
# Start triage as soon as the first check completes.
StageExecution(
ref_id="triage",
join_type=JoinType.DISCRIMINATOR,
requisite_stage_ref_ids={"check_breathing", "check_pulse"},
...
)
N-of-M Partial Join (WCP-30)
Fire when N of M upstreams complete:
# 5 reviewers assigned, proceed after 3 respond
StageExecution(
ref_id="proceed",
join_type=JoinType.N_OF_M,
join_threshold=3,
requisite_stage_ref_ids={"r1", "r2", "r3", "r4", "r5"},
...
)
State-Based Patterns
Deferred Choice (WCP-16)
A race between branches — the first to start execution wins, others are
cancelled. Group competing stages with deferred_choice_group:
# Agent picks up complaint OR manager escalation timer fires
StageExecution(
ref_id="agent_contact",
deferred_choice_group="complaint_response",
...
),
StageExecution(
ref_id="escalate_to_manager",
deferred_choice_group="complaint_response",
...
),
When one stage in the group starts running, all siblings in the same group are automatically cancelled.
Milestone Gating (WCP-18)
An activity is only enabled when a milestone stage is in a required status. If the milestone has already passed, the activity is skipped:
# Route change is only allowed while the ticket is RUNNING (not yet issued)
StageExecution(
ref_id="route_change",
milestone_ref_id="issue_ticket",
milestone_status="RUNNING",
...
),
Mutual Exclusion / Critical Section (WCP-17, 39, 40)
Prevent concurrent execution of stages sharing a critical resource. Stages
with the same mutex_key cannot run simultaneously:
# Two branches both access a shared database
StageExecution(
ref_id="update_inventory",
mutex_key="shared_db",
...
),
StageExecution(
ref_id="update_ledger",
mutex_key="shared_db",
...
),
When a mutex-blocked stage tries to start, it is re-queued with a delay until the lock holder completes.
Cancellation Patterns
Cancel Task (WCP-19)
Cancel a running stage via CancelStage message. Tasks have an on_cancel()
hook for cleanup.
Cancel Case (WCP-20)
Cancel an entire workflow via CancelWorkflow message. Propagates
CancelStage to all active stages.
Cancel Region (WCP-25)
Cancel all stages in a named region at once. Tag stages with
cancel_region:
from stabilize.queue.messages import CancelRegion
# Tag stages with a region name
StageExecution(ref_id="access_evidence_1", cancel_region="evidence_access", ...),
StageExecution(ref_id="access_evidence_2", cancel_region="evidence_access", ...),
# Cancel all stages in the region
queue.push(CancelRegion(
execution_type="workflow",
execution_id=workflow.id,
region="evidence_access",
))
Trigger Patterns (Signals)
Stabilize supports external signal-based triggers with two semantics:
TaskResult.suspend()
A task can suspend itself to wait for an external signal:
from stabilize import Task, TaskResult
class ApprovalTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
# Check if we've received the signal
signal_name = stage.context.get("_signal_name")
if signal_name == "approved":
return TaskResult.success(
outputs={"approved_by": stage.context.get("_signal_data", {}).get("user")}
)
# Suspend and wait for signal
return TaskResult.suspend()
Transient Trigger (WCP-23)
Send a signal to a stage. If the stage is not currently SUSPENDED, the
signal is discarded:
from stabilize.queue.messages import SignalStage
queue.push(SignalStage(
execution_type="workflow",
execution_id=workflow.id,
stage_id=stage.id,
signal_name="approved",
signal_data={"user": "alice"},
persistent=False, # Transient — discarded if not ready
))
Persistent Trigger (WCP-24)
Like a transient trigger, but the signal is buffered if the stage is not
yet suspended. Buffered signals are automatically consumed when the stage
enters SUSPENDED status:
queue.push(SignalStage(
execution_type="workflow",
execution_id=workflow.id,
stage_id=stage.id,
signal_name="data_ready",
signal_data={"batch_id": 42},
persistent=True, # Buffered until stage is ready
))
Signal data is available in the resumed task via stage.context["_signal_name"]
and stage.context["_signal_data"].
Iteration Patterns
Arbitrary Cycles (WCP-10)
TaskResult.jump_to() enables jumping to any stage (forward or backward).
The target stage is reset to NOT_STARTED. Max 10 jumps by default to
prevent infinite loops.
Structured Loops (WCP-21)
Use LoopBuilder for structured while and repeat-until patterns:
from stabilize.stages.loop_builder import LoopBuilder
# While loop: check condition first, then execute body
stages = LoopBuilder.while_loop(
condition="iteration_count < max_iterations",
body_stages=[stage_a, stage_b],
loop_ref_prefix="retry_loop",
max_iterations=100,
)
# Repeat-until loop: execute body first, then check condition
stages = LoopBuilder.repeat_until(
condition="tests_passed == True",
body_stages=[stage_a, stage_b],
loop_ref_prefix="test_loop",
)
Recursion / Sub-Workflows (WCP-22)
Use SubWorkflowTask to start a child workflow and wait for its completion:
from stabilize.tasks.sub_workflow import SubWorkflowTask
registry.register("sub_workflow", SubWorkflowTask)
StageExecution(
ref_id="resolve_sub_defects",
type="sub_workflow",
context={
"_sub_workflow_config": {
"name": "Resolve Sub-Defect",
"application": "defect-tracker",
"stages": [...],
"context": {"defect_id": "DEF-456"},
},
},
...
)
Recursion depth is tracked via _recursion_depth (default max: 10).
Multiple Instance Patterns
Use MultiInstanceBuilder to create parallel instances of a stage.
Fixed Count (WCP-12, 13)
from stabilize.stages.multi_instance_builder import MultiInstanceBuilder
parent = StageExecution(ref_id="review", type="review", name="Review", ...)
# WCP-13: 6 reviewers, wait for all
instance_stages = MultiInstanceBuilder.create_fixed(
parent_stage=parent,
count=6,
instance_type="review",
instance_name_prefix="Reviewer",
sync_on_complete=True,
)
# WCP-12: Fire and forget (no synchronization)
instance_stages = MultiInstanceBuilder.create_fixed(
parent_stage=parent,
count=3,
sync_on_complete=False,
)
Runtime Count from Context (WCP-14)
parent = StageExecution(
ref_id="review", type="review", name="Review",
context={"num_reviewers": 4},
...
)
instance_stages = MultiInstanceBuilder.create_from_context(
parent_stage=parent,
count_key="num_reviewers",
)
Collection-Based Instances
parent = StageExecution(
ref_id="process_items", type="process", name="Process",
context={"items": ["item_a", "item_b", "item_c"]},
...
)
instance_stages = MultiInstanceBuilder.create_from_collection(
parent_stage=parent,
collection_key="items",
item_context_key="current_item",
)
Dynamic Instances (WCP-15)
Instances can be added during execution via AddMultiInstance messages:
instance_stages = MultiInstanceBuilder.create_dynamic(
parent_stage=parent,
initial_count=2,
)
# Later, during execution:
from stabilize.queue.messages import AddMultiInstance
queue.push(AddMultiInstance(
execution_type="workflow",
execution_id=workflow.id,
stage_id=parent_stage.id,
instance_context={"item": "new_item"},
))
N-of-M with Multiple Instances (WCP-34)
# Proceed after 3 of 5 reviewers respond
instance_stages = MultiInstanceBuilder.create_fixed(
parent_stage=parent,
count=5,
join_threshold=3,
cancel_remaining=True, # Cancel remaining after threshold
)
Synthetic Stages
Synthetic stages are dynamically injected stages that run before, after, or on failure of a parent stage. They’re used for setup, cleanup, validation, and rollback.
SyntheticStageOwner Enum
from stabilize.models.stage import SyntheticStageOwner
SyntheticStageOwner.STAGE_BEFORE # Runs before parent's tasks
SyntheticStageOwner.STAGE_AFTER # Runs after parent completes
Creating Synthetic Stages
Use StageExecution.create_synthetic() to create synthetic stages:
from stabilize import StageExecution, TaskExecution
from stabilize.models.stage import SyntheticStageOwner
validation = StageExecution.create_synthetic(
type="shell",
name="Validate Configuration",
parent=parent_stage,
owner=SyntheticStageOwner.STAGE_BEFORE,
context={"command": "validate-config.sh"},
tasks=[
TaskExecution.create("Validate", "shell", stage_start=True, stage_end=True)
],
)
StageDefinitionBuilder
Create custom builders to define synthetic stages for your stage types:
from stabilize import StageExecution, TaskExecution
from stabilize.stages.builder import StageDefinitionBuilder
from stabilize.dag.graph import StageGraphBuilder
from stabilize.models.stage import SyntheticStageOwner
class DeployStageBuilder(StageDefinitionBuilder):
@property
def type(self) -> str:
return "deploy"
def build_tasks(self, stage: StageExecution) -> list[TaskExecution]:
return [
TaskExecution.create(
name="Deploy Application",
implementing_class="shell",
stage_start=True,
stage_end=True,
),
]
def before_stages(
self,
stage: StageExecution,
graph: StageGraphBuilder,
) -> None:
"""Add validation stage that runs BEFORE deploy tasks."""
validation = StageExecution.create_synthetic(
type="shell",
name="Validate Configuration",
parent=stage,
owner=SyntheticStageOwner.STAGE_BEFORE,
context={"command": "validate-config.sh"},
tasks=[
TaskExecution.create("Validate", "shell", stage_start=True, stage_end=True)
],
)
graph.add(validation)
def after_stages(
self,
stage: StageExecution,
graph: StageGraphBuilder,
) -> None:
"""Add notification stage that runs AFTER deploy succeeds."""
notify = StageExecution.create_synthetic(
type="http",
name="Send Notification",
parent=stage,
owner=SyntheticStageOwner.STAGE_AFTER,
context={"url": "https://hooks.slack.com/...", "method": "POST"},
tasks=[
TaskExecution.create("Notify", "http", stage_start=True, stage_end=True)
],
)
graph.add(notify)
def on_failure_stages(
self,
stage: StageExecution,
graph: StageGraphBuilder,
) -> None:
"""Add rollback stage that runs ONLY if deploy fails."""
rollback = StageExecution.create_synthetic(
type="shell",
name="Rollback Deployment",
parent=stage,
owner=SyntheticStageOwner.STAGE_AFTER,
context={"command": "rollback.sh"},
tasks=[
TaskExecution.create("Rollback", "shell", stage_start=True, stage_end=True)
],
)
graph.add(rollback)
Registering Builders
from stabilize.stages.builder import register_builder, StageDefinitionBuilderFactory
# Option 1: Use global factory
register_builder(DeployStageBuilder())
# Option 2: Create custom factory
factory = StageDefinitionBuilderFactory()
factory.register(DeployStageBuilder())
Execution Order
before_stages()synthetic stages execute first (in dependency order)Parent stage’s tasks execute
after_stages()synthetic stages execute (on success)on_failure_stages()synthetic stages execute (on failure only)
The ContinueParentStageHandler manages transitions between synthetic stages and notifies the parent when all children complete.
Concurrency Limits
Limit concurrent executions for a specific pipeline configuration.
config = {
"limitConcurrent": True,
"maxConcurrentExecutions": 5,
"keepWaitingPipelines": True
}
If the limit is reached, new executions enter BUFFERED state and are started automatically when slots free up.
Dynamic Routing
Dynamic routing allows tasks to redirect execution to a different stage based on runtime conditions. This enables patterns like retry loops, conditional branching, and error recovery flows.
TaskResult.jump_to()
Use TaskResult.jump_to() to redirect execution to another stage:
from stabilize import Task, TaskResult, StageExecution
class RouterTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
if stage.context.get("tests_passed"):
return TaskResult.success()
else:
# Jump to another stage with context
return TaskResult.jump_to(
"implement_stage",
context={"retry_reason": "tests failed"}
)
The target stage is reset to NOT_STARTED and re-executed with the merged context.
Jump Count Limiting
To prevent infinite loops, jump count is tracked and limited:
Default maximum: 10 jumps per execution
Configurable via
_max_jumpsin execution contextJump history recorded in
_jump_historyfor debugging
# Set custom max jumps
execution = Workflow.create(
...,
context={"_max_jumps": 5}
)
Stateful Retries
TransientError supports preserving state across retry attempts with context_update:
from stabilize import Task, TaskResult, TransientError
class ProgressTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
processed = stage.context.get("processed_items", 0)
try:
new_processed = process_batch(processed)
return TaskResult.success(outputs={"total": new_processed})
except RateLimitError:
# Preserve progress for next retry
raise TransientError(
"Rate limited",
retry_after=30,
context_update={"processed_items": processed + 10}
)
The context_update dict is merged into stage.context before the retry, allowing tasks to resume from where they left off.
StageExecution Control-Flow Fields Reference
All fields added to StageExecution for advanced control-flow patterns:
Field |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
Join semantics: AND, OR, MULTI_MERGE, DISCRIMINATOR, N_OF_M |
|
|
|
For N_OF_M join: how many upstreams needed (0 = all) |
|
|
|
Split semantics: AND (all downstream), OR (conditional) |
|
|
|
Map of downstream_ref_id to condition expression |
|
|
|
Multi-instance configuration |
|
|
|
Group name for deferred choice race |
|
|
|
Ref ID of milestone stage to check |
|
|
|
Required status name of milestone stage |
|
|
|
Named mutex for critical section |
|
|
|
Named region for group cancellation |
Key Files
src/stabilize/stages/builder.py- StageDefinitionBuilder and factorysrc/stabilize/dag/graph.py- StageGraphBuildersrc/stabilize/dag/readiness.py- Readiness evaluation with join type dispatchsrc/stabilize/expressions.py- Safe expression evaluator for conditionssrc/stabilize/models/stage.py- JoinType, SplitType enums, all control-flow fieldssrc/stabilize/models/multi_instance.py- MultiInstanceConfig dataclasssrc/stabilize/stages/multi_instance_builder.py- MultiInstanceBuildersrc/stabilize/stages/loop_builder.py- LoopBuilder for structured loopssrc/stabilize/tasks/sub_workflow.py- SubWorkflowTask for recursionsrc/stabilize/tasks/result.py- TaskResult with jump_to() and suspend()src/stabilize/handlers/signal_stage.py- SignalStageHandler for triggerssrc/stabilize/handlers/cancel_region.py- CancelRegionHandlersrc/stabilize/handlers/add_multi_instance.py- AddMultiInstanceHandlersrc/stabilize/handlers/continue_parent_stage.py- ContinueParentStageHandlersrc/stabilize/handlers/jump_to_stage.py- JumpToStageHandler for dynamic routingsrc/stabilize/errors.py- TransientError with context_update parameter