Models
Core Models
Workflow model.
A pipeline execution represents a running instance of a pipeline, containing all stages and their runtime state. The execution tracks: - Overall status - All stages (including synthetic stages) - Trigger information - Timing data
- class stabilize.models.workflow.WorkflowType(value)[source]
Bases:
EnumType of execution.
PIPELINE: A full pipeline execution ORCHESTRATION: An ad-hoc orchestration (single stage)
- PIPELINE = 'PIPELINE'
- ORCHESTRATION = 'ORCHESTRATION'
- class stabilize.models.workflow.Trigger(type='manual', user='anonymous', parameters=<factory>, artifacts=<factory>, payload=<factory>)[source]
Bases:
objectTrigger information for a pipeline execution.
Contains details about what triggered the pipeline (manual, webhook, cron, etc.) and any parameters passed to the execution.
- Parameters:
type (str)
user (str)
parameters (dict[str, Any])
artifacts (list[dict[str, Any]])
payload (dict[str, Any])
- type: str = 'manual'
- user: str = 'anonymous'
- parameters: dict[str, Any]
- artifacts: list[dict[str, Any]]
- payload: dict[str, Any]
- class stabilize.models.workflow.PausedDetails(paused_by='', pause_time=None, resume_time=None, paused_ms=0)[source]
Bases:
objectDetails about a paused execution.
- Parameters:
paused_by (str)
pause_time (int | None)
resume_time (int | None)
paused_ms (int)
- paused_by: str = ''
- pause_time: int | None = None
- resume_time: int | None = None
- paused_ms: int = 0
- property is_paused: bool
Check if currently paused.
- class stabilize.models.workflow.Workflow(id=<factory>, type=WorkflowType.PIPELINE, application='', name='', status=WorkflowStatus.NOT_STARTED, stages=<factory>, context=<factory>, trigger=<factory>, start_time=None, end_time=None, start_time_expiry=None, is_canceled=False, canceled_by=None, cancellation_reason=None, paused=None, pipeline_config_id=None, is_limit_concurrent=False, max_concurrent_executions=0, keep_waiting_pipelines=False, origin='unknown', config_version=None)[source]
Bases:
objectRepresents a pipeline execution.
This is the top-level container for all execution state. It holds all stages and tracks the overall execution status.
- Parameters:
id (str)
type (WorkflowType)
application (str)
name (str)
status (WorkflowStatus)
stages (list[StageExecution])
context (dict[str, Any])
trigger (Trigger)
start_time (int | None)
end_time (int | None)
start_time_expiry (int | None)
is_canceled (bool)
canceled_by (str | None)
cancellation_reason (str | None)
paused (PausedDetails | None)
pipeline_config_id (str | None)
is_limit_concurrent (bool)
max_concurrent_executions (int)
keep_waiting_pipelines (bool)
origin (str)
config_version (str | None)
- id
Unique identifier (ULID)
- Type:
str
- type
PIPELINE or ORCHESTRATION
- application
Application name this pipeline belongs to
- Type:
str
- name
Pipeline name
- Type:
str
- status
Current execution status
- stages
All stages in this execution (including synthetic)
- Type:
- trigger
Trigger information
- start_time
Epoch milliseconds when execution started
- Type:
int | None
- end_time
Epoch milliseconds when execution completed
- Type:
int | None
- start_time_expiry
If not started by this time, cancel
- Type:
int | None
- is_canceled
Whether execution has been canceled
- Type:
bool
- canceled_by
User who canceled the execution
- Type:
str | None
- cancellation_reason
Reason for cancellation
- Type:
str | None
- paused
Pause details if execution is paused
- Type:
- pipeline_config_id
ID of the pipeline configuration
- Type:
str | None
- is_limit_concurrent
Whether to limit concurrent executions
- Type:
bool
- max_concurrent_executions
Max concurrent executions allowed
- Type:
int
- keep_waiting_pipelines
Keep queued pipelines on cancel
- Type:
bool
- origin
Origin of the execution (e.g., “api”, “deck”)
- Type:
str
- id: str
- type: WorkflowType = 'PIPELINE'
- application: str = ''
- name: str = ''
- status: WorkflowStatus = ('NOT_STARTED', False, False)
- stages: list[StageExecution]
- context: dict[str, Any]
- start_time: int | None = None
- end_time: int | None = None
- start_time_expiry: int | None = None
- is_canceled: bool = False
- canceled_by: str | None = None
- cancellation_reason: str | None = None
- paused: PausedDetails | None = None
- pipeline_config_id: str | None = None
- is_limit_concurrent: bool = False
- max_concurrent_executions: int = 0
- keep_waiting_pipelines: bool = False
- origin: str = 'unknown'
- config_version: str | None = None
- add_stage(stage)[source]
Add a stage to this execution.
- Parameters:
stage (StageExecution)
- Return type:
None
- remove_stage(stage_id)[source]
Remove a stage from this execution.
- Parameters:
stage_id (str)
- Return type:
None
- stage_by_id(stage_id)[source]
Get a stage by its ID.
- Raises:
ValueError – If stage not found
- Parameters:
stage_id (str)
- Return type:
- stage_by_ref_id(ref_id)[source]
Get a stage by its reference ID.
- Parameters:
ref_id (str)
- Return type:
StageExecution | None
- initial_stages()[source]
Get all initial stages (no dependencies, not synthetic).
These are the stages that can start immediately when execution begins.
- Return type:
list[StageExecution]
- top_level_stages()[source]
Get all top-level stages (not synthetic).
- Return type:
list[StageExecution]
- get_context()[source]
Get aggregated context from all stages.
Returns merged outputs from all stages in topological order. Collections are concatenated, latest value wins for non-collections.
- Return type:
dict[str, Any]
- update_status(status)[source]
Update the execution status.
- Parameters:
status (WorkflowStatus)
- Return type:
None
- cancel(user, reason)[source]
Mark this execution as canceled.
- Parameters:
user (str)
reason (str)
- Return type:
None
- paused_duration_relative_to(instant_ms)[source]
Get paused duration relative to a given instant.
Returns 0 if not paused or pause was before the instant.
- Parameters:
instant_ms (int)
- Return type:
int
- classmethod create(application, name, stages, trigger=None, pipeline_config_id=None, context=None)[source]
Factory method to create a new pipeline execution.
- Parameters:
application (str) – Application name
name (str) – Pipeline name
stages (list[StageExecution]) – List of stages
trigger (Trigger | None) – Optional trigger info
pipeline_config_id (str | None) – Optional config ID
context (dict[str, Any] | None) – Optional execution-level context (for jump tracking, etc.)
- Returns:
A new Workflow instance
- Return type:
- classmethod create_orchestration(application, name, stages)[source]
Factory method to create an orchestration (ad-hoc execution).
- Parameters:
application (str) – Application name
name (str) – Orchestration name
stages (list[StageExecution]) – List of stages
- Returns:
A new Workflow with type ORCHESTRATION
- Return type:
- class stabilize.models.stage.JoinType(value)[source]
Bases:
EnumJoin semantics for stages with multiple upstream dependencies.
AND: Wait for ALL upstreams (default, WCP-3). OR: Wait only for activated branches from a paired OR-split (WCP-7). MULTI_MERGE: Fire once per upstream completion, no sync (WCP-8). DISCRIMINATOR: Fire on first upstream completion, ignore rest (WCP-9). N_OF_M: Fire when N of M upstreams complete (WCP-30).
- AND = 'AND'
- OR = 'OR'
- MULTI_MERGE = 'MULTI_MERGE'
- DISCRIMINATOR = 'DISCRIMINATOR'
- N_OF_M = 'N_OF_M'
- class stabilize.models.stage.SplitType(value)[source]
Bases:
EnumSplit semantics for stages with multiple downstream branches.
AND: Activate ALL downstream stages (default, WCP-2). OR: Evaluate conditions per downstream, activate matching ones (WCP-6).
- AND = 'AND'
- OR = 'OR'
- class stabilize.models.stage.StageExecution(id=<factory>, ref_id='', type='', name='', status=WorkflowStatus.NOT_STARTED, context=<factory>, outputs=<factory>, tasks=<factory>, requisite_stage_ref_ids=<factory>, parent_stage_id=None, synthetic_stage_owner=None, start_time=None, end_time=None, start_time_expiry=None, scheduled_time=None, version=0, cleanup_on_failure=False, finalizer_names=<factory>, join_type=JoinType.AND, join_threshold=0, split_type=SplitType.AND, split_conditions=<factory>, mi_config=None, deferred_choice_group=None, milestone_ref_id=None, milestone_status=None, mutex_key=None, cancel_region=None, _execution=None)[source]
Bases:
StageNavigationMixinRepresents a stage execution within a pipeline.
The DAG structure is encoded in requisite_stage_ref_ids: - Empty set = initial stage (no dependencies) - Single ref_id = sequential dependency - Multiple ref_ids = join point (waits for all)
- Parameters:
id (str)
ref_id (str)
type (str)
name (str)
status (WorkflowStatus)
context (dict[str, Any])
outputs (dict[str, Any])
tasks (list[TaskExecution])
requisite_stage_ref_ids (set[str])
parent_stage_id (str | None)
synthetic_stage_owner (SyntheticStageOwner | None)
start_time (int | None)
end_time (int | None)
start_time_expiry (int | None)
scheduled_time (int | None)
version (int)
cleanup_on_failure (bool)
finalizer_names (list[str])
join_type (JoinType)
join_threshold (int)
split_type (SplitType)
split_conditions (dict[str, str])
mi_config (MultiInstanceConfig | None)
deferred_choice_group (str | None)
milestone_ref_id (str | None)
milestone_status (str | None)
mutex_key (str | None)
cancel_region (str | None)
_execution (weakref.ReferenceType[Workflow] | Workflow | None)
- id
Unique identifier (ULID)
- Type:
str
- ref_id
Reference identifier used for DAG relationships
- Type:
str
- type
Stage type (e.g., “deploy”, “bake”, “wait”)
- Type:
str
- name
Human-readable stage name
- Type:
str
- status
Current execution status
- Type:
- context
Input parameters and runtime state (stage-scoped)
- Type:
dict[str, Any]
- outputs
Values available to downstream stages (pipeline-scoped)
- Type:
dict[str, Any]
- tasks
List of tasks to execute in this stage
- Type:
list[TaskExecution]
- requisite_stage_ref_ids
Set of ref_ids this stage depends on (DAG edges)
- Type:
set[str]
- parent_stage_id
Parent stage ID for synthetic stages
- Type:
str | None
- synthetic_stage_owner
STAGE_BEFORE or STAGE_AFTER for synthetic stages
- Type:
SyntheticStageOwner | None
- start_time
Epoch milliseconds when stage started
- Type:
int | None
- end_time
Epoch milliseconds when stage completed
- Type:
int | None
- start_time_expiry
If stage not started by this time, skip it
- Type:
int | None
- scheduled_time
When stage is scheduled to execute
- Type:
int | None
- property allow_sibling_stages_to_continue_on_failure: bool
Check if sibling stages can continue on this stage’s failure.
- cancel_region: str | None = None
- cleanup_on_failure: bool = False
- property continue_pipeline_on_failure: bool
Check if pipeline should continue on stage failure.
- classmethod create(type, name, ref_id, context=None, requisite_stage_ref_ids=None)[source]
Factory method to create a new stage execution.
- Parameters:
type (str) – Stage type
name (str) – Human-readable name
ref_id (str) – Reference ID for DAG relationships
context (dict[str, Any] | None) – Initial context/parameters
requisite_stage_ref_ids (set[str] | None) – Dependencies (empty = initial stage)
- Returns:
A new StageExecution instance
- Return type:
- classmethod create_synthetic(type, name, parent, owner, context=None)[source]
Factory method to create a synthetic stage.
- Parameters:
type (str) – Stage type
name (str) – Human-readable name
parent (StageExecution) – Parent stage
owner (SyntheticStageOwner) – STAGE_BEFORE or STAGE_AFTER
context (dict[str, Any] | None) – Initial context/parameters
- Returns:
A new synthetic StageExecution
- Return type:
- deferred_choice_group: str | None = None
- determine_status()[source]
Determine the stage status based on before-stages, tasks, and after-stages.
Status priority (highest to lowest): 1. TERMINAL/STOPPED/CANCELED - halt conditions 2. PAUSED/BUFFERED/SUSPENDED - waiting conditions 3. RUNNING - in progress 4. FAILED_CONTINUE - completed with non-fatal failure 5. SUCCEEDED/SKIPPED - completed successfully
After-stages ARE included in status determination to ensure the stage doesn’t report SUCCEEDED while after-stages are still running.
- Return type:
- end_time: int | None = None
- failure_status(default=WorkflowStatus.TERMINAL)[source]
Get the appropriate failure status based on stage configuration.
- Parameters:
default (WorkflowStatus)
- Return type:
- first_task()[source]
Get the first task in this stage.
- Return type:
TaskExecution | None
- join_threshold: int = 0
- mi_config: MultiInstanceConfig | None = None
- milestone_ref_id: str | None = None
- milestone_status: str | None = None
- mutex_key: str | None = None
- name: str = ''
- next_task(task)[source]
Get the task that follows the given task.
- Parameters:
task (TaskExecution)
- Return type:
TaskExecution | None
- parent_stage_id: str | None = None
- property phase_version: tuple[str, int]
Return (status_name, version) for optimistic locking with phase check.
This combines the current status and version number into a single tuple for phase-aware optimistic locking. When updating a stage, you can pass this to ensure both the version AND the expected status match before the update proceeds.
- Returns:
Tuple of (status_name, version) for comparison.
Example
expected = stage.phase_version # … some operation … store.update_status(stage, expected_phase=expected[0])
- ref_id: str = ''
- scheduled_time: int | None = None
- set_execution_strong(value)[source]
Set the parent pipeline execution as a strong reference.
- Parameters:
value (Workflow)
- Return type:
None
- start_time: int | None = None
- start_time_expiry: int | None = None
- state_snapshot()[source]
Return frozen copy of current state for read-only use.
Creates an immutable snapshot of the current stage state that can be safely passed to external systems, cached, or logged without risk of mutation.
- Returns:
Frozen StageStateSnapshot with current state.
- Return type:
StageStateSnapshot
- status: WorkflowStatus = ('NOT_STARTED', False, False)
- synthetic_stage_owner: SyntheticStageOwner | None = None
- type: str = ''
- version: int = 0
- id: str
- context: dict[str, Any]
- outputs: dict[str, Any]
- tasks: list[TaskExecution]
- requisite_stage_ref_ids: set[str]
- finalizer_names: list[str]
- split_conditions: dict[str, str]
- class stabilize.models.stage.SyntheticStageOwner(value)[source]
Bases:
EnumIndicates the relationship of a synthetic stage to its parent.
STAGE_BEFORE: Runs before the parent’s tasks STAGE_AFTER: Runs after the parent completes
- STAGE_BEFORE = 'STAGE_BEFORE'
- STAGE_AFTER = 'STAGE_AFTER'
TaskExecution model.
A task is the smallest unit of work within a stage. Each stage contains one or more tasks that execute sequentially.
- class stabilize.models.task.TaskExecution(id=<factory>, name='', implementing_class='', status=WorkflowStatus.NOT_STARTED, start_time=None, end_time=None, stage_start=False, stage_end=False, loop_start=False, loop_end=False, task_exception_details=<factory>, version=0, _stage=None)[source]
Bases:
objectRepresents a single task within a stage execution.
Tasks are the atomic units of work in a pipeline. They execute sequentially within a stage, and each task produces a result that can include: - Status updates - Context modifications (stage-scoped) - Output values (pipeline-scoped)
- Parameters:
id (str)
name (str)
implementing_class (str)
status (WorkflowStatus)
start_time (int | None)
end_time (int | None)
stage_start (bool)
stage_end (bool)
loop_start (bool)
loop_end (bool)
task_exception_details (dict[str, Any])
version (int)
_stage (weakref.ReferenceType[StageExecution] | StageExecution | None)
- id
Unique identifier for this task execution
- Type:
str
- name
Human-readable name for this task
- Type:
str
- implementing_class
Fully qualified class name of the task implementation
- Type:
str
- status
Current execution status
- Type:
- start_time
Epoch milliseconds when task started
- Type:
int | None
- end_time
Epoch milliseconds when task completed
- Type:
int | None
- stage_start
True if this is the first task in the stage
- Type:
bool
- stage_end
True if this is the last task in the stage
- Type:
bool
- loop_start
True if this task starts a loop
- Type:
bool
- loop_end
True if this task ends a loop
- Type:
bool
- task_exception_details
Exception information if task failed
- Type:
dict[str, Any]
- id: str
- name: str = ''
- implementing_class: str = ''
- status: WorkflowStatus = ('NOT_STARTED', False, False)
- start_time: int | None = None
- end_time: int | None = None
- stage_start: bool = False
- stage_end: bool = False
- loop_start: bool = False
- loop_end: bool = False
- task_exception_details: dict[str, Any]
- version: int = 0
- property stage: StageExecution | None
Get the parent stage for this task.
- set_stage_strong(value)[source]
Set the parent stage for this task as a strong reference.
- Parameters:
value (StageExecution)
- Return type:
None
- has_stage()[source]
Check if this task is attached to a stage.
Returns True if the task has a valid stage reference that hasn’t been garbage collected. This is useful for safely checking before accessing stage data when using weak references.
- Returns:
True if stage reference is valid, False otherwise.
- Return type:
bool
- property is_stage_start: bool
Check if this task starts the stage.
- property is_stage_end: bool
Check if this task ends the stage.
- property is_loop_start: bool
Check if this task starts a loop.
- property is_loop_end: bool
Check if this task ends a loop.
- set_exception_details(exception)[source]
Store exception details for this task.
- Parameters:
exception (dict[str, Any])
- Return type:
None
- property phase_version: tuple[str, int]
Return (status_name, version) for optimistic locking with phase check.
This combines the current status and version number into a single tuple for phase-aware optimistic locking.
- Returns:
Tuple of (status_name, version) for comparison.
- state_snapshot()[source]
Return frozen copy of current state for read-only use.
Creates an immutable snapshot of the current task state that can be safely passed to external systems, cached, or logged without risk of mutation.
- Returns:
Frozen TaskStateSnapshot with current state.
- Return type:
TaskStateSnapshot
- classmethod create(name, implementing_class, stage_start=False, stage_end=False)[source]
Factory method to create a new task execution.
- Parameters:
name (str) – Human-readable task name
implementing_class (str) – Class name or callable reference for task
stage_start (bool) – Whether this is the first task
stage_end (bool) – Whether this is the last task
- Returns:
A new TaskExecution instance
- Return type:
WorkflowStatus enum.
This enum represents all possible states for executions, stages, and tasks. Each status has two boolean properties: - complete: Whether the entity has finished its work (successfully or not) - halt: Whether downstream execution should be blocked
- class stabilize.models.status.WorkflowStatus(value)[source]
Bases:
EnumExecution status enum.
Each value is a tuple of (name, complete, halt).
- NOT_STARTED = ('NOT_STARTED', False, False)
- RUNNING = ('RUNNING', False, False)
- PAUSED = ('PAUSED', False, False)
- SUSPENDED = ('SUSPENDED', False, False)
- SUCCEEDED = ('SUCCEEDED', True, False)
- FAILED_CONTINUE = ('FAILED_CONTINUE', True, False)
- TERMINAL = ('TERMINAL', True, True)
- CANCELED = ('CANCELED', True, True)
- REDIRECT = ('REDIRECT', False, False)
- STOPPED = ('STOPPED', True, True)
- SKIPPED = ('SKIPPED', True, False)
- BUFFERED = ('BUFFERED', False, False)
- property is_complete: bool
Indicates that the task/stage/pipeline has finished its work.
Returns True for: CANCELED, SUCCEEDED, STOPPED, SKIPPED, TERMINAL, FAILED_CONTINUE
- property is_halt: bool
Indicates an abnormal completion - nothing downstream should run.
Returns True for: TERMINAL, CANCELED, STOPPED
- property is_successful: bool
Check if this status represents a successful completion.
- property is_failure: bool
Check if this status represents a failure.
- property is_skipped: bool
Check if this status is SKIPPED.
- property is_dirty: bool
True if entity is mid-execution (RUNNING, REDIRECT).
Dirty states indicate the entity is actively being processed and may have uncommitted changes. Useful for: - Determining if optimistic locking should be used - Identifying stages that need recovery after crash - Phase-aware state transitions
- exception stabilize.models.status.InvalidStateTransitionError(current, target, entity_type='entity', entity_id=None)[source]
Bases:
ExceptionRaised when an invalid state transition is attempted.
- Parameters:
current (WorkflowStatus)
target (WorkflowStatus)
entity_type (str)
entity_id (str | None)
- Return type:
None
- stabilize.models.status.can_transition(current, target)[source]
Check if a state transition is valid.
- Parameters:
current (WorkflowStatus) – The current workflow status
target (WorkflowStatus) – The desired target status
- Returns:
True if the transition is valid, False otherwise
- Return type:
bool
- stabilize.models.status.validate_transition(current, target, entity_type='entity', entity_id=None)[source]
Validate and raise if state transition is invalid.
- Parameters:
current (WorkflowStatus) – The current workflow status
target (WorkflowStatus) – The desired target status
entity_type (str) – Type of entity (workflow, stage, task) for error message
entity_id (str | None) – Optional ID of the entity for error message
- Raises:
InvalidStateTransitionError – If the transition is not valid
- Return type:
None
Control-Flow Pattern Models
Multi-instance configuration for WCP-12 through WCP-15.
Defines how a stage spawns and synchronizes multiple parallel instances.
- class stabilize.models.multi_instance.MultiInstanceConfig(count=0, count_from_context='', sync_on_complete=True, allow_dynamic=False, collection_from_context='', join_threshold=0, cancel_remaining=False)[source]
Bases:
objectConfiguration for multi-instance stage execution.
Supports several patterns: - WCP-12: count > 0, sync_on_complete=False (fire and forget) - WCP-13: count > 0, sync_on_complete=True (design-time known count) - WCP-14: count_from_context set, sync_on_complete=True (runtime known count) - WCP-15: allow_dynamic=True (count not known until last instance)
- Parameters:
count (int)
count_from_context (str)
sync_on_complete (bool)
allow_dynamic (bool)
collection_from_context (str)
join_threshold (int)
cancel_remaining (bool)
- count
Fixed number of instances (0 means use count_from_context).
- Type:
int
- count_from_context
Context key holding the instance count at runtime.
- Type:
str
- sync_on_complete
Whether to wait for all instances before continuing.
- Type:
bool
- allow_dynamic
Whether new instances can be added during execution.
- Type:
bool
- collection_from_context
Context key holding items to iterate over.
- Type:
str
- join_threshold
If set, continue after this many complete (N-of-M).
- Type:
int
- cancel_remaining
Cancel remaining instances after threshold reached.
- Type:
bool
- count: int = 0
- count_from_context: str = ''
- sync_on_complete: bool = True
- allow_dynamic: bool = False
- collection_from_context: str = ''
- join_threshold: int = 0
- cancel_remaining: bool = False
Safe expression evaluator for workflow control-flow conditions.
Used by OR-split (WCP-6) to evaluate per-downstream conditions, and by other patterns that need runtime condition evaluation.
Only supports a safe subset of operations - no arbitrary code execution. Expressions can reference context values using dot notation or bracket notation.
- exception stabilize.expressions.ExpressionError[source]
Bases:
ExceptionRaised when an expression cannot be evaluated.
- stabilize.expressions.evaluate_expression(expression, context)[source]
Evaluate a simple expression against a context dictionary.
Supports: - Boolean literals: true, false, True, False - String/number/None literals - Context lookups: context_key, context[“key”] - Comparisons: ==, !=, <, <=, >, >=, in, not in - Boolean operators: and, or, not - Attribute access for nested dicts: a.b.c
Does NOT support: - Function calls - Imports - Assignments - Arbitrary code
- Parameters:
expression (str) – The expression string to evaluate
context (dict[str, Any]) – Dictionary of values available to the expression
- Returns:
The result of evaluating the expression
- Raises:
ExpressionError – If the expression is invalid or uses unsupported features
- Return type:
Any