Flow Control

Stabilize implements all 43 Workflow Control-Flow Patterns (WCP) catalogued by van der Aalst et al. This guide covers basic patterns first, then advanced patterns introduced in v0.18.

Basic Patterns

Sequence (WCP-1)

Stages execute one after another via requisite_stage_ref_ids:

stages=[
    StageExecution(ref_id="A", ...),
    StageExecution(ref_id="B", requisite_stage_ref_ids={"A"}, ...),
    StageExecution(ref_id="C", requisite_stage_ref_ids={"B"}, ...),
]

Parallel Split / AND-Split (WCP-2)

Multiple stages depending on the same upstream run in parallel automatically:

#      A
#     / \
#    B   C
#     \ /
#      D

stages=[
    StageExecution(ref_id="A", ...),
    StageExecution(ref_id="B", requisite_stage_ref_ids={"A"}, ...),
    StageExecution(ref_id="C", requisite_stage_ref_ids={"A"}, ...),
    StageExecution(ref_id="D", requisite_stage_ref_ids={"B", "C"}, ...),
]

Synchronization / AND-Join (WCP-3)

Stage D above waits for all upstreams to complete before starting. This is the default join_type=JoinType.AND.

Exclusive Choice / XOR-Split (WCP-4)

Use stageEnabled in stage context or TaskResult.jump_to() for dynamic routing:

# Option 1: Conditional stage enablement
StageExecution(
    ref_id="deploy_prod",
    context={"stageEnabled": {"type": "expression", "expression": "env == 'production'"}},
    ...
)

# Option 2: Dynamic routing via jump_to
class RouterTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        if stage.context.get("tests_passed"):
            return TaskResult.jump_to("deploy_stage")
        else:
            return TaskResult.jump_to("fix_stage")

Simple Merge / XOR-Join (WCP-5)

A stage with a single requisite_stage_ref_id that serves as a merge point when only one upstream branch is ever active.

Advanced Branching Patterns

OR-Split / Multi-Choice (WCP-6)

Diverge into multiple branches where one or more are chosen based on conditions. Set split_type=SplitType.OR and provide per-downstream conditions in split_conditions:

from stabilize.models.stage import SplitType

# Triage stage: dispatch police and/or ambulance and/or fire
StageExecution(
    ref_id="triage",
    type="triage",
    name="Triage",
    split_type=SplitType.OR,
    split_conditions={
        "police": "emergency_type == 'crime' or emergency_type == 'accident'",
        "ambulance": "injury_severity > 0",
        "fire": "fire_detected == True",
    },
    ...
),
StageExecution(ref_id="police", requisite_stage_ref_ids={"triage"}, ...),
StageExecution(ref_id="ambulance", requisite_stage_ref_ids={"triage"}, ...),
StageExecution(ref_id="fire", requisite_stage_ref_ids={"triage"}, ...),

Conditions are evaluated using the safe expression evaluator (stabilize.expressions.evaluate_expression). Expressions can reference values from both stage.context and stage.outputs.

Supported expression syntax:

  • Comparisons: ==, !=, <, <=, >, >=, in, not in

  • Boolean: and, or, not

  • Literals: strings, numbers, True, False, None

  • Context lookups: key_name, nested.key, dict["key"]

Branches with conditions that evaluate to False are automatically skipped.

OR-Join / Structured Synchronizing Merge (WCP-7)

Wait only for activated branches from a paired OR-split. Set join_type=JoinType.OR. The OR-split records activated branch ref_ids in the join stage’s context under _activated_branches:

from stabilize.models.stage import JoinType

StageExecution(
    ref_id="merge",
    join_type=JoinType.OR,
    requisite_stage_ref_ids={"police", "ambulance", "fire"},
    ...
)

The merge stage fires as soon as all activated upstreams complete. Skipped branches are ignored.

Multi-Merge (WCP-8)

Each upstream completion independently triggers the downstream stage — no synchronization:

StageExecution(
    ref_id="quality_review",
    join_type=JoinType.MULTI_MERGE,
    requisite_stage_ref_ids={"foundations", "materials", "laborers"},
    ...
)

Discriminator / 1-out-of-N Join (WCP-9)

Fire on the first upstream completion, ignore the rest:

# Check breathing and check pulse run in parallel.
# Start triage as soon as the first check completes.
StageExecution(
    ref_id="triage",
    join_type=JoinType.DISCRIMINATOR,
    requisite_stage_ref_ids={"check_breathing", "check_pulse"},
    ...
)

N-of-M Partial Join (WCP-30)

Fire when N of M upstreams complete:

# 5 reviewers assigned, proceed after 3 respond
StageExecution(
    ref_id="proceed",
    join_type=JoinType.N_OF_M,
    join_threshold=3,
    requisite_stage_ref_ids={"r1", "r2", "r3", "r4", "r5"},
    ...
)

State-Based Patterns

Deferred Choice (WCP-16)

A race between branches — the first to start execution wins, others are cancelled. Group competing stages with deferred_choice_group:

# Agent picks up complaint OR manager escalation timer fires
StageExecution(
    ref_id="agent_contact",
    deferred_choice_group="complaint_response",
    ...
),
StageExecution(
    ref_id="escalate_to_manager",
    deferred_choice_group="complaint_response",
    ...
),

When one stage in the group starts running, all siblings in the same group are automatically cancelled.

Milestone Gating (WCP-18)

An activity is only enabled when a milestone stage is in a required status. If the milestone has already passed, the activity is skipped:

# Route change is only allowed while the ticket is RUNNING (not yet issued)
StageExecution(
    ref_id="route_change",
    milestone_ref_id="issue_ticket",
    milestone_status="RUNNING",
    ...
),

Mutual Exclusion / Critical Section (WCP-17, 39, 40)

Prevent concurrent execution of stages sharing a critical resource. Stages with the same mutex_key cannot run simultaneously:

# Two branches both access a shared database
StageExecution(
    ref_id="update_inventory",
    mutex_key="shared_db",
    ...
),
StageExecution(
    ref_id="update_ledger",
    mutex_key="shared_db",
    ...
),

When a mutex-blocked stage tries to start, it is re-queued with a delay until the lock holder completes.

Cancellation Patterns

Cancel Task (WCP-19)

Cancel a running stage via CancelStage message. Tasks have an on_cancel() hook for cleanup.

Cancel Case (WCP-20)

Cancel an entire workflow via CancelWorkflow message. Propagates CancelStage to all active stages.

Cancel Region (WCP-25)

Cancel all stages in a named region at once. Tag stages with cancel_region:

from stabilize.queue.messages import CancelRegion

# Tag stages with a region name
StageExecution(ref_id="access_evidence_1", cancel_region="evidence_access", ...),
StageExecution(ref_id="access_evidence_2", cancel_region="evidence_access", ...),

# Cancel all stages in the region
queue.push(CancelRegion(
    execution_type="workflow",
    execution_id=workflow.id,
    region="evidence_access",
))

Trigger Patterns (Signals)

Stabilize supports external signal-based triggers with two semantics:

TaskResult.suspend()

A task can suspend itself to wait for an external signal:

from stabilize import Task, TaskResult

class ApprovalTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        # Check if we've received the signal
        signal_name = stage.context.get("_signal_name")
        if signal_name == "approved":
            return TaskResult.success(
                outputs={"approved_by": stage.context.get("_signal_data", {}).get("user")}
            )

        # Suspend and wait for signal
        return TaskResult.suspend()

Transient Trigger (WCP-23)

Send a signal to a stage. If the stage is not currently SUSPENDED, the signal is discarded:

from stabilize.queue.messages import SignalStage

queue.push(SignalStage(
    execution_type="workflow",
    execution_id=workflow.id,
    stage_id=stage.id,
    signal_name="approved",
    signal_data={"user": "alice"},
    persistent=False,  # Transient — discarded if not ready
))

Persistent Trigger (WCP-24)

Like a transient trigger, but the signal is buffered if the stage is not yet suspended. Buffered signals are automatically consumed when the stage enters SUSPENDED status:

queue.push(SignalStage(
    execution_type="workflow",
    execution_id=workflow.id,
    stage_id=stage.id,
    signal_name="data_ready",
    signal_data={"batch_id": 42},
    persistent=True,  # Buffered until stage is ready
))

Signal data is available in the resumed task via stage.context["_signal_name"] and stage.context["_signal_data"].

Iteration Patterns

Arbitrary Cycles (WCP-10)

TaskResult.jump_to() enables jumping to any stage (forward or backward). The target stage is reset to NOT_STARTED. Max 10 jumps by default to prevent infinite loops.

Structured Loops (WCP-21)

Use LoopBuilder for structured while and repeat-until patterns:

from stabilize.stages.loop_builder import LoopBuilder

# While loop: check condition first, then execute body
stages = LoopBuilder.while_loop(
    condition="iteration_count < max_iterations",
    body_stages=[stage_a, stage_b],
    loop_ref_prefix="retry_loop",
    max_iterations=100,
)

# Repeat-until loop: execute body first, then check condition
stages = LoopBuilder.repeat_until(
    condition="tests_passed == True",
    body_stages=[stage_a, stage_b],
    loop_ref_prefix="test_loop",
)

Recursion / Sub-Workflows (WCP-22)

Use SubWorkflowTask to start a child workflow and wait for its completion:

from stabilize.tasks.sub_workflow import SubWorkflowTask

registry.register("sub_workflow", SubWorkflowTask)

StageExecution(
    ref_id="resolve_sub_defects",
    type="sub_workflow",
    context={
        "_sub_workflow_config": {
            "name": "Resolve Sub-Defect",
            "application": "defect-tracker",
            "stages": [...],
            "context": {"defect_id": "DEF-456"},
        },
    },
    ...
)

Recursion depth is tracked via _recursion_depth (default max: 10).

Multiple Instance Patterns

Use MultiInstanceBuilder to create parallel instances of a stage.

Fixed Count (WCP-12, 13)

from stabilize.stages.multi_instance_builder import MultiInstanceBuilder

parent = StageExecution(ref_id="review", type="review", name="Review", ...)

# WCP-13: 6 reviewers, wait for all
instance_stages = MultiInstanceBuilder.create_fixed(
    parent_stage=parent,
    count=6,
    instance_type="review",
    instance_name_prefix="Reviewer",
    sync_on_complete=True,
)

# WCP-12: Fire and forget (no synchronization)
instance_stages = MultiInstanceBuilder.create_fixed(
    parent_stage=parent,
    count=3,
    sync_on_complete=False,
)

Runtime Count from Context (WCP-14)

parent = StageExecution(
    ref_id="review", type="review", name="Review",
    context={"num_reviewers": 4},
    ...
)

instance_stages = MultiInstanceBuilder.create_from_context(
    parent_stage=parent,
    count_key="num_reviewers",
)

Collection-Based Instances

parent = StageExecution(
    ref_id="process_items", type="process", name="Process",
    context={"items": ["item_a", "item_b", "item_c"]},
    ...
)

instance_stages = MultiInstanceBuilder.create_from_collection(
    parent_stage=parent,
    collection_key="items",
    item_context_key="current_item",
)

Dynamic Instances (WCP-15)

Instances can be added during execution via AddMultiInstance messages:

instance_stages = MultiInstanceBuilder.create_dynamic(
    parent_stage=parent,
    initial_count=2,
)

# Later, during execution:
from stabilize.queue.messages import AddMultiInstance

queue.push(AddMultiInstance(
    execution_type="workflow",
    execution_id=workflow.id,
    stage_id=parent_stage.id,
    instance_context={"item": "new_item"},
))

N-of-M with Multiple Instances (WCP-34)

# Proceed after 3 of 5 reviewers respond
instance_stages = MultiInstanceBuilder.create_fixed(
    parent_stage=parent,
    count=5,
    join_threshold=3,
    cancel_remaining=True,  # Cancel remaining after threshold
)

Synthetic Stages

Synthetic stages are dynamically injected stages that run before, after, or on failure of a parent stage. They’re used for setup, cleanup, validation, and rollback.

SyntheticStageOwner Enum

from stabilize.models.stage import SyntheticStageOwner

SyntheticStageOwner.STAGE_BEFORE  # Runs before parent's tasks
SyntheticStageOwner.STAGE_AFTER   # Runs after parent completes

Creating Synthetic Stages

Use StageExecution.create_synthetic() to create synthetic stages:

from stabilize import StageExecution, TaskExecution
from stabilize.models.stage import SyntheticStageOwner

validation = StageExecution.create_synthetic(
    type="shell",
    name="Validate Configuration",
    parent=parent_stage,
    owner=SyntheticStageOwner.STAGE_BEFORE,
    context={"command": "validate-config.sh"},
    tasks=[
        TaskExecution.create("Validate", "shell", stage_start=True, stage_end=True)
    ],
)

StageDefinitionBuilder

Create custom builders to define synthetic stages for your stage types:

from stabilize import StageExecution, TaskExecution
from stabilize.stages.builder import StageDefinitionBuilder
from stabilize.dag.graph import StageGraphBuilder
from stabilize.models.stage import SyntheticStageOwner

class DeployStageBuilder(StageDefinitionBuilder):
    @property
    def type(self) -> str:
        return "deploy"

    def build_tasks(self, stage: StageExecution) -> list[TaskExecution]:
        return [
            TaskExecution.create(
                name="Deploy Application",
                implementing_class="shell",
                stage_start=True,
                stage_end=True,
            ),
        ]

    def before_stages(
        self,
        stage: StageExecution,
        graph: StageGraphBuilder,
    ) -> None:
        """Add validation stage that runs BEFORE deploy tasks."""
        validation = StageExecution.create_synthetic(
            type="shell",
            name="Validate Configuration",
            parent=stage,
            owner=SyntheticStageOwner.STAGE_BEFORE,
            context={"command": "validate-config.sh"},
            tasks=[
                TaskExecution.create("Validate", "shell", stage_start=True, stage_end=True)
            ],
        )
        graph.add(validation)

    def after_stages(
        self,
        stage: StageExecution,
        graph: StageGraphBuilder,
    ) -> None:
        """Add notification stage that runs AFTER deploy succeeds."""
        notify = StageExecution.create_synthetic(
            type="http",
            name="Send Notification",
            parent=stage,
            owner=SyntheticStageOwner.STAGE_AFTER,
            context={"url": "https://hooks.slack.com/...", "method": "POST"},
            tasks=[
                TaskExecution.create("Notify", "http", stage_start=True, stage_end=True)
            ],
        )
        graph.add(notify)

    def on_failure_stages(
        self,
        stage: StageExecution,
        graph: StageGraphBuilder,
    ) -> None:
        """Add rollback stage that runs ONLY if deploy fails."""
        rollback = StageExecution.create_synthetic(
            type="shell",
            name="Rollback Deployment",
            parent=stage,
            owner=SyntheticStageOwner.STAGE_AFTER,
            context={"command": "rollback.sh"},
            tasks=[
                TaskExecution.create("Rollback", "shell", stage_start=True, stage_end=True)
            ],
        )
        graph.add(rollback)

Registering Builders

from stabilize.stages.builder import register_builder, StageDefinitionBuilderFactory

# Option 1: Use global factory
register_builder(DeployStageBuilder())

# Option 2: Create custom factory
factory = StageDefinitionBuilderFactory()
factory.register(DeployStageBuilder())

Execution Order

  1. before_stages() synthetic stages execute first (in dependency order)

  2. Parent stage’s tasks execute

  3. after_stages() synthetic stages execute (on success)

  4. on_failure_stages() synthetic stages execute (on failure only)

The ContinueParentStageHandler manages transitions between synthetic stages and notifies the parent when all children complete.

Concurrency Limits

Limit concurrent executions for a specific pipeline configuration.

config = {
    "limitConcurrent": True,
    "maxConcurrentExecutions": 5,
    "keepWaitingPipelines": True
}

If the limit is reached, new executions enter BUFFERED state and are started automatically when slots free up.

Dynamic Routing

Dynamic routing allows tasks to redirect execution to a different stage based on runtime conditions. This enables patterns like retry loops, conditional branching, and error recovery flows.

TaskResult.jump_to()

Use TaskResult.jump_to() to redirect execution to another stage:

from stabilize import Task, TaskResult, StageExecution

class RouterTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        if stage.context.get("tests_passed"):
            return TaskResult.success()
        else:
            # Jump to another stage with context
            return TaskResult.jump_to(
                "implement_stage",
                context={"retry_reason": "tests failed"}
            )

The target stage is reset to NOT_STARTED and re-executed with the merged context.

Jump Count Limiting

To prevent infinite loops, jump count is tracked and limited:

  • Default maximum: 10 jumps per execution

  • Configurable via _max_jumps in execution context

  • Jump history recorded in _jump_history for debugging

# Set custom max jumps
execution = Workflow.create(
    ...,
    context={"_max_jumps": 5}
)

Stateful Retries

TransientError supports preserving state across retry attempts with context_update:

from stabilize import Task, TaskResult, TransientError

class ProgressTask(Task):
    def execute(self, stage: StageExecution) -> TaskResult:
        processed = stage.context.get("processed_items", 0)
        try:
            new_processed = process_batch(processed)
            return TaskResult.success(outputs={"total": new_processed})
        except RateLimitError:
            # Preserve progress for next retry
            raise TransientError(
                "Rate limited",
                retry_after=30,
                context_update={"processed_items": processed + 10}
            )

The context_update dict is merged into stage.context before the retry, allowing tasks to resume from where they left off.

StageExecution Control-Flow Fields Reference

All fields added to StageExecution for advanced control-flow patterns:

Field

Type

Default

Description

join_type

JoinType

AND

Join semantics: AND, OR, MULTI_MERGE, DISCRIMINATOR, N_OF_M

join_threshold

int

0

For N_OF_M join: how many upstreams needed (0 = all)

split_type

SplitType

AND

Split semantics: AND (all downstream), OR (conditional)

split_conditions

dict[str, str]

{}

Map of downstream_ref_id to condition expression

mi_config

MultiInstanceConfig

None

Multi-instance configuration

deferred_choice_group

str

None

Group name for deferred choice race

milestone_ref_id

str

None

Ref ID of milestone stage to check

milestone_status

str

None

Required status name of milestone stage

mutex_key

str

None

Named mutex for critical section

cancel_region

str

None

Named region for group cancellation

Key Files

  • src/stabilize/stages/builder.py - StageDefinitionBuilder and factory

  • src/stabilize/dag/graph.py - StageGraphBuilder

  • src/stabilize/dag/readiness.py - Readiness evaluation with join type dispatch

  • src/stabilize/expressions.py - Safe expression evaluator for conditions

  • src/stabilize/models/stage.py - JoinType, SplitType enums, all control-flow fields

  • src/stabilize/models/multi_instance.py - MultiInstanceConfig dataclass

  • src/stabilize/stages/multi_instance_builder.py - MultiInstanceBuilder

  • src/stabilize/stages/loop_builder.py - LoopBuilder for structured loops

  • src/stabilize/tasks/sub_workflow.py - SubWorkflowTask for recursion

  • src/stabilize/tasks/result.py - TaskResult with jump_to() and suspend()

  • src/stabilize/handlers/signal_stage.py - SignalStageHandler for triggers

  • src/stabilize/handlers/cancel_region.py - CancelRegionHandler

  • src/stabilize/handlers/add_multi_instance.py - AddMultiInstanceHandler

  • src/stabilize/handlers/continue_parent_stage.py - ContinueParentStageHandler

  • src/stabilize/handlers/jump_to_stage.py - JumpToStageHandler for dynamic routing

  • src/stabilize/errors.py - TransientError with context_update parameter