Source code for stabilize.models.stage.stage

"""
StageExecution model.

A stage represents a logical unit of work in a pipeline. Stages can have:
- Prerequisites (other stages that must complete first)
- Tasks (sequential work units)
- Synthetic stages (before/after stages injected by builders)

The DAG structure is represented via requisite_stage_ref_ids, which contains
the ref_ids of all stages this stage depends on.
"""

from __future__ import annotations

import weakref
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from stabilize.models.stage.enums import (
    JoinType,
    SplitType,
    SyntheticStageOwner,
    _generate_stage_id,
)
from stabilize.models.stage.navigation import StageNavigationMixin
from stabilize.models.status import WorkflowStatus
from stabilize.models.task import TaskExecution

if TYPE_CHECKING:
    from stabilize.models.multi_instance import MultiInstanceConfig
    from stabilize.models.snapshot import StageStateSnapshot
    from stabilize.models.workflow import Workflow


[docs] @dataclass class StageExecution(StageNavigationMixin): """ Represents a stage execution within a pipeline. The DAG structure is encoded in requisite_stage_ref_ids: - Empty set = initial stage (no dependencies) - Single ref_id = sequential dependency - Multiple ref_ids = join point (waits for all) Attributes: id: Unique identifier (ULID) ref_id: Reference identifier used for DAG relationships type: Stage type (e.g., "deploy", "bake", "wait") name: Human-readable stage name status: Current execution status context: Input parameters and runtime state (stage-scoped) outputs: Values available to downstream stages (pipeline-scoped) tasks: List of tasks to execute in this stage requisite_stage_ref_ids: Set of ref_ids this stage depends on (DAG edges) parent_stage_id: Parent stage ID for synthetic stages synthetic_stage_owner: STAGE_BEFORE or STAGE_AFTER for synthetic stages start_time: Epoch milliseconds when stage started end_time: Epoch milliseconds when stage completed start_time_expiry: If stage not started by this time, skip it scheduled_time: When stage is scheduled to execute """ id: str = field(default_factory=_generate_stage_id) ref_id: str = "" type: str = "" name: str = "" status: WorkflowStatus = WorkflowStatus.NOT_STARTED context: dict[str, Any] = field(default_factory=dict) outputs: dict[str, Any] = field(default_factory=dict) tasks: list[TaskExecution] = field(default_factory=list) requisite_stage_ref_ids: set[str] = field(default_factory=set) parent_stage_id: str | None = None synthetic_stage_owner: SyntheticStageOwner | None = None start_time: int | None = None end_time: int | None = None start_time_expiry: int | None = None scheduled_time: int | None = None version: int = 0 # Finalizer support: cleanup on failure and registered finalizer names cleanup_on_failure: bool = False finalizer_names: list[str] = field(default_factory=list) # ========== Advanced Control-Flow Pattern Fields ========== # Join semantics (WCP-7,8,9,28-33,37,38) join_type: JoinType = JoinType.AND join_threshold: int = 0 # For N_OF_M join: how many upstreams needed # Split semantics (WCP-6) split_type: SplitType = SplitType.AND split_conditions: dict[str, str] = field(default_factory=dict) # downstream_ref_id -> condition expr # Multi-instance configuration (WCP-12-15, 26, 27, 34-36) mi_config: MultiInstanceConfig | None = None # Deferred choice group (WCP-16) deferred_choice_group: str | None = None # Milestone gating (WCP-18) milestone_ref_id: str | None = None milestone_status: str | None = None # Required status name of milestone stage # Mutual exclusion / critical section (WCP-17, 39, 40) mutex_key: str | None = None # Cancel region (WCP-25) cancel_region: str | None = None # Back-reference to parent execution (set after construction) # Can be weakref (default) or strong ref (for standalone stages) _execution: weakref.ReferenceType[Workflow] | Workflow | None = field(default=None, repr=False) def __post_init__(self) -> None: if self.join_type == JoinType.N_OF_M and self.join_threshold < 0: raise ValueError(f"join_threshold must be >= 0 for N_OF_M join, got {self.join_threshold}") # ========== Phase-Version State Tracking ========== @property def phase_version(self) -> tuple[str, int]: """Return (status_name, version) for optimistic locking with phase check. This combines the current status and version number into a single tuple for phase-aware optimistic locking. When updating a stage, you can pass this to ensure both the version AND the expected status match before the update proceeds. Returns: Tuple of (status_name, version) for comparison. Example: expected = stage.phase_version # ... some operation ... store.update_status(stage, expected_phase=expected[0]) """ return (self.status.name, self.version)
[docs] def state_snapshot(self) -> StageStateSnapshot: """Return frozen copy of current state for read-only use. Creates an immutable snapshot of the current stage state that can be safely passed to external systems, cached, or logged without risk of mutation. Returns: Frozen StageStateSnapshot with current state. """ from stabilize.models.snapshot import StageStateSnapshot, _freeze_dict return StageStateSnapshot( id=self.id, ref_id=self.ref_id, status=self.status, version=self.version, context=_freeze_dict(self.context), outputs=_freeze_dict(self.outputs), start_time=self.start_time, end_time=self.end_time, parent_stage_id=self.parent_stage_id, )
@property def execution(self) -> Workflow: """Get the parent pipeline execution.""" if self._execution is None: raise ValueError("Stage is not attached to an execution") if isinstance(self._execution, weakref.ReferenceType): exe = self._execution() if exe is None: raise ValueError("Execution has been garbage collected") return exe # Strong reference return self._execution @execution.setter def execution(self, value: Workflow) -> None: """Set the parent pipeline execution (weakref by default).""" self._execution = weakref.ref(value)
[docs] def set_execution_strong(self, value: Workflow) -> None: """Set the parent pipeline execution as a strong reference.""" self._execution = value
[docs] def has_execution(self) -> bool: """Check if this stage is attached to an execution.""" if self._execution is None: return False if isinstance(self._execution, weakref.ReferenceType): return self._execution() is not None return True
[docs] def cleanup(self) -> None: """Explicitly break circular references.""" self._execution = None for task in self.tasks: task.cleanup()
# ========== Task Methods ==========
[docs] def first_task(self) -> TaskExecution | None: """Get the first task in this stage.""" return self.tasks[0] if self.tasks else None
[docs] def next_task(self, task: TaskExecution) -> TaskExecution | None: """Get the task that follows the given task.""" if task.is_stage_end: return None try: index = self.tasks.index(task) return self.tasks[index + 1] except (ValueError, IndexError): return None
[docs] def has_tasks(self) -> bool: """Check if this stage has any tasks.""" return len(self.tasks) > 0
# ========== Status Methods ==========
[docs] def determine_status(self) -> WorkflowStatus: """Determine the stage status based on before-stages, tasks, and after-stages. Status priority (highest to lowest): 1. TERMINAL/STOPPED/CANCELED - halt conditions 2. PAUSED/BUFFERED/SUSPENDED - waiting conditions 3. RUNNING - in progress 4. FAILED_CONTINUE - completed with non-fatal failure 5. SUCCEEDED/SKIPPED - completed successfully After-stages ARE included in status determination to ensure the stage doesn't report SUCCEEDED while after-stages are still running. """ # Collect statuses from all components before_stage_statuses = [s.status for s in self.before_stages()] task_statuses = [t.status for t in self.tasks] after_stage_statuses = [s.status for s in self.after_stages()] # Core statuses (before-stages + tasks) determine the main outcome core_statuses = before_stage_statuses + task_statuses if not core_statuses: # No tasks and no before-stages: if the stage is already RUNNING # (claimed by StartStageHandler), treat as a no-op success rather # than NOT_STARTED which would incorrectly terminate the workflow. if self.status == WorkflowStatus.RUNNING: if after_stage_statuses: if any(s in {WorkflowStatus.NOT_STARTED, WorkflowStatus.RUNNING} for s in after_stage_statuses): return WorkflowStatus.RUNNING if WorkflowStatus.TERMINAL in after_stage_statuses: return WorkflowStatus.TERMINAL return WorkflowStatus.SUCCEEDED return WorkflowStatus.NOT_STARTED # Check halt conditions first (highest priority) if WorkflowStatus.TERMINAL in core_statuses: return self.failure_status() if WorkflowStatus.STOPPED in core_statuses: return WorkflowStatus.STOPPED if WorkflowStatus.CANCELED in core_statuses: return WorkflowStatus.CANCELED # Check waiting conditions if WorkflowStatus.PAUSED in core_statuses: return WorkflowStatus.PAUSED if WorkflowStatus.BUFFERED in core_statuses: return WorkflowStatus.BUFFERED if WorkflowStatus.SUSPENDED in core_statuses: return WorkflowStatus.SUSPENDED # Check if core work is still in progress incomplete_statuses = {WorkflowStatus.NOT_STARTED, WorkflowStatus.RUNNING} if any(s in incomplete_statuses for s in core_statuses): return WorkflowStatus.RUNNING # Core work is complete - check if it succeeded or failed with continue core_has_failure = WorkflowStatus.FAILED_CONTINUE in core_statuses core_all_done = all( s in {WorkflowStatus.SUCCEEDED, WorkflowStatus.SKIPPED, WorkflowStatus.FAILED_CONTINUE} for s in core_statuses ) if not core_all_done: # Unexpected status in core - treat as running return WorkflowStatus.RUNNING # Core work is done - now check after-stages if after_stage_statuses: # Check if after-stages have halted if WorkflowStatus.TERMINAL in after_stage_statuses: return WorkflowStatus.TERMINAL if WorkflowStatus.STOPPED in after_stage_statuses: return WorkflowStatus.STOPPED if WorkflowStatus.CANCELED in after_stage_statuses: return WorkflowStatus.CANCELED # Check if after-stages are still in progress if any(s in incomplete_statuses for s in after_stage_statuses): return WorkflowStatus.RUNNING # All work complete - return final status if core_has_failure: return WorkflowStatus.FAILED_CONTINUE return WorkflowStatus.SUCCEEDED
[docs] def failure_status(self, default: WorkflowStatus = WorkflowStatus.TERMINAL) -> WorkflowStatus: """Get the appropriate failure status based on stage configuration.""" if self.continue_pipeline_on_failure: return WorkflowStatus.FAILED_CONTINUE if self.should_fail_pipeline(): return default return WorkflowStatus.STOPPED
@property def continue_pipeline_on_failure(self) -> bool: """Check if pipeline should continue on stage failure.""" return bool(self.context.get("continuePipelineOnFailure", False))
[docs] def should_fail_pipeline(self) -> bool: """Check if stage failure should fail the pipeline.""" return bool(self.context.get("failPipeline", True))
@property def allow_sibling_stages_to_continue_on_failure(self) -> bool: """Check if sibling stages can continue on this stage's failure.""" return bool(self.context.get("allowSiblingStagesToContinueOnFailure", False)) # ========== Factory Methods ==========
[docs] @classmethod def create( cls, type: str, name: str, ref_id: str, context: dict[str, Any] | None = None, requisite_stage_ref_ids: set[str] | None = None, ) -> StageExecution: """ Factory method to create a new stage execution. Args: type: Stage type name: Human-readable name ref_id: Reference ID for DAG relationships context: Initial context/parameters requisite_stage_ref_ids: Dependencies (empty = initial stage) Returns: A new StageExecution instance """ return cls( type=type, name=name, ref_id=ref_id, context=context or {}, requisite_stage_ref_ids=requisite_stage_ref_ids or set(), )
[docs] @classmethod def create_synthetic( cls, type: str, name: str, parent: StageExecution, owner: SyntheticStageOwner, context: dict[str, Any] | None = None, ) -> StageExecution: """ Factory method to create a synthetic stage. Args: type: Stage type name: Human-readable name parent: Parent stage owner: STAGE_BEFORE or STAGE_AFTER context: Initial context/parameters Returns: A new synthetic StageExecution """ from ulid import ULID stage = cls( type=type, name=name, ref_id=str(ULID()), # Synthetic stages get unique ref_ids context=context or {}, parent_stage_id=parent.id, synthetic_stage_owner=owner, ) if parent.has_execution(): stage.execution = parent.execution return stage