Source code for stabilize.models.status

"""
WorkflowStatus enum.

This enum represents all possible states for executions, stages, and tasks.
Each status exposes two boolean properties:
- is_complete: Whether the entity has finished its work (successfully or not)
- is_halt: Whether downstream execution should be blocked
"""

from enum import Enum


class WorkflowStatus(Enum):
    """
    Lifecycle status shared by executions, stages, and tasks.

    Every member's value is a ``(name, complete, halt)`` tuple which the
    enum machinery unpacks into ``__init__``:

    - ``name``: the label returned by ``str()``.
    - ``complete``: surfaced through ``is_complete``.
    - ``halt``: surfaced through ``is_halt``.
    """

    # Work has not begun yet.
    NOT_STARTED = ("NOT_STARTED", False, False)
    # Work is in progress; the task may be re-executed to carry on.
    RUNNING = ("RUNNING", False, False)
    # Work is on hold and can be resumed later.
    PAUSED = ("PAUSED", False, False)
    # Task is done, but the pipeline waits for a trigger before moving on.
    SUSPENDED = ("SUSPENDED", False, False)
    # Finished successfully; the pipeline may move on.
    SUCCEEDED = ("SUCCEEDED", True, False)
    # Finished with a failure, yet the next task may still run.
    FAILED_CONTINUE = ("FAILED_CONTINUE", True, False)
    # Failed for good; the pipeline goes no further.
    TERMINAL = ("TERMINAL", True, True)
    # Canceled; the pipeline goes no further.
    CANCELED = ("CANCELED", True, True)
    # Step finished and signals that a decision path should be taken.
    REDIRECT = ("REDIRECT", False, False)
    # Stopped; the pipeline goes no further.
    STOPPED = ("STOPPED", True, True)
    # Skipped; the pipeline carries on with the next task.
    SKIPPED = ("SKIPPED", True, False)
    # Not started, and has to move to NOT_STARTED first.
    BUFFERED = ("BUFFERED", False, False)

    def __init__(self, name: str, complete: bool, halt: bool) -> None:
        # Called once per member with the unpacked value tuple.
        self._name, self._complete, self._halt = name, complete, halt

    @property
    def is_complete(self) -> bool:
        """
        Whether the task/stage/pipeline is done with its work.

        True for: CANCELED, SUCCEEDED, STOPPED, SKIPPED, TERMINAL,
        FAILED_CONTINUE.
        """
        return self._complete

    @property
    def is_halt(self) -> bool:
        """
        Whether this is an abnormal end after which nothing downstream runs.

        True for: TERMINAL, CANCELED, STOPPED.
        """
        return self._halt

    @property
    def is_successful(self) -> bool:
        """Whether this status counts as a successful completion."""
        return self in _SUCCESSFUL_STATUSES

    @property
    def is_failure(self) -> bool:
        """Whether this status counts as a failure."""
        return self in _FAILURE_STATUSES

    @property
    def is_skipped(self) -> bool:
        """Whether this status is SKIPPED."""
        return self is WorkflowStatus.SKIPPED

    @property
    def is_dirty(self) -> bool:
        """Whether the entity is mid-execution (RUNNING or REDIRECT).

        A dirty entity is being actively processed and may hold
        uncommitted changes. Handy when:
        - deciding whether optimistic locking should be used
        - finding stages that need recovery after a crash
        - making phase-aware state transitions
        """
        return self is WorkflowStatus.RUNNING or self is WorkflowStatus.REDIRECT

    def __str__(self) -> str:
        # The label passed as the first element of the member's value.
        return self._name

    def __repr__(self) -> str:
        return "WorkflowStatus." + self.name
# Precomputed status groups for fast membership checks
# (they mirror Orca's ImmutableSets).

# Statuses whose work is finished, successfully or not.
COMPLETED_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (
        WorkflowStatus.SUCCEEDED,
        WorkflowStatus.FAILED_CONTINUE,
        WorkflowStatus.TERMINAL,
        WorkflowStatus.CANCELED,
        WorkflowStatus.STOPPED,
        WorkflowStatus.SKIPPED,
    )
)

# Backing set for WorkflowStatus.is_successful.
_SUCCESSFUL_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (WorkflowStatus.SUCCEEDED, WorkflowStatus.SKIPPED)
)

# Backing set for WorkflowStatus.is_failure.
_FAILURE_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (
        WorkflowStatus.TERMINAL,
        WorkflowStatus.STOPPED,
        WorkflowStatus.FAILED_CONTINUE,
    )
)

# Statuses after which downstream stages are allowed to continue.
CONTINUABLE_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (
        WorkflowStatus.SUCCEEDED,
        WorkflowStatus.FAILED_CONTINUE,
        WorkflowStatus.SKIPPED,
        WorkflowStatus.REDIRECT,
    )
)

# Halt conditions: downstream execution must be blocked.
HALT_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (
        WorkflowStatus.TERMINAL,
        WorkflowStatus.CANCELED,
        WorkflowStatus.STOPPED,
    )
)

# Statuses of entities that are still actively processing.
ACTIVE_STATUSES: frozenset[WorkflowStatus] = frozenset(
    (
        WorkflowStatus.NOT_STARTED,
        WorkflowStatus.RUNNING,
        WorkflowStatus.PAUSED,
        WorkflowStatus.SUSPENDED,
    )
)

# Allowed state transitions, used to guarantee correctness.
# Key: current state. Value: every state it may legally move to.
VALID_TRANSITIONS: dict[WorkflowStatus, frozenset[WorkflowStatus]] = {
    WorkflowStatus.NOT_STARTED: frozenset(
        (
            WorkflowStatus.RUNNING,
            WorkflowStatus.CANCELED,
            WorkflowStatus.SKIPPED,
            WorkflowStatus.BUFFERED,
            # Errors raised before the task starts (e.g. max retries exceeded).
            WorkflowStatus.TERMINAL,
        )
    ),
    WorkflowStatus.BUFFERED: frozenset(
        (
            WorkflowStatus.NOT_STARTED,
            WorkflowStatus.RUNNING,
            WorkflowStatus.CANCELED,
            WorkflowStatus.SKIPPED,
        )
    ),
    WorkflowStatus.RUNNING: frozenset(
        (
            WorkflowStatus.SUCCEEDED,
            WorkflowStatus.FAILED_CONTINUE,
            WorkflowStatus.TERMINAL,
            WorkflowStatus.CANCELED,
            WorkflowStatus.PAUSED,
            WorkflowStatus.STOPPED,
            WorkflowStatus.SUSPENDED,
            WorkflowStatus.REDIRECT,
            # A running task can be skipped manually.
            WorkflowStatus.SKIPPED,
        )
    ),
    WorkflowStatus.PAUSED: frozenset(
        (
            WorkflowStatus.RUNNING,
            WorkflowStatus.CANCELED,
            WorkflowStatus.STOPPED,
        )
    ),
    WorkflowStatus.SUSPENDED: frozenset(
        (
            WorkflowStatus.RUNNING,
            WorkflowStatus.CANCELED,
            WorkflowStatus.STOPPED,
        )
    ),
    WorkflowStatus.REDIRECT: frozenset(
        (
            WorkflowStatus.RUNNING,
            WorkflowStatus.SUCCEEDED,
            WorkflowStatus.CANCELED,
        )
    ),
    # Terminal states: nothing may follow them. The tuple is spelled out
    # explicitly so the dict's key order stays deterministic.
    **{
        finished: frozenset()
        for finished in (
            WorkflowStatus.SUCCEEDED,
            WorkflowStatus.FAILED_CONTINUE,
            WorkflowStatus.TERMINAL,
            WorkflowStatus.CANCELED,
            WorkflowStatus.STOPPED,
            WorkflowStatus.SKIPPED,
        )
    },
}
class InvalidStateTransitionError(Exception):
    """Raised when an invalid state transition is attempted.

    Attributes:
        current: The status the entity was in.
        target: The status that was requested.
        entity_type: Kind of entity named in the message.
        entity_id: Identifier named in the message, or None.
    """

    def __init__(
        self,
        current: WorkflowStatus,
        target: WorkflowStatus,
        entity_type: str = "entity",
        entity_id: str | None = None,
    ) -> None:
        """Store the transition details and build the error message.

        Args:
            current: The current workflow status
            target: The rejected target status
            entity_type: Type of entity for the error message
            entity_id: Optional ID of the entity; omitted from the message
                when falsy
        """
        self.current = current
        self.target = target
        self.entity_type = entity_type
        self.entity_id = entity_id

        msg = f"Invalid state transition for {entity_type}"
        if entity_id:
            msg += f" {entity_id}"
        msg += f": {current} -> {target}"
        super().__init__(msg)

    def __reduce__(self) -> tuple[object, ...]:
        """Support pickling and copying.

        ``self.args`` only holds the formatted message, so the default
        exception reduction would rebuild the object as ``cls(msg)`` and fail
        because ``target`` is required. Reconstruct from the real constructor
        arguments instead; the instance ``__dict__`` is passed along as state
        so any extra attributes survive the round trip.
        """
        ctor_args = (self.current, self.target, self.entity_type, self.entity_id)
        return (type(self), ctor_args, self.__dict__)
def can_transition(current: WorkflowStatus, target: WorkflowStatus) -> bool:
    """Tell whether moving from ``current`` to ``target`` is permitted.

    Args:
        current: The status the entity is in now
        target: The status the caller wants to move to

    Returns:
        True when the move is allowed, False otherwise
    """
    # Staying in the same state is always fine, which keeps updates idempotent.
    if current == target:
        return True

    # A state missing from the table has no allowed targets.
    allowed_targets = VALID_TRANSITIONS.get(current, frozenset())
    return target in allowed_targets
def validate_transition(
    current: WorkflowStatus,
    target: WorkflowStatus,
    entity_type: str = "entity",
    entity_id: str | None = None,
) -> None:
    """Raise unless the ``current`` -> ``target`` transition is valid.

    Args:
        current: The status the entity is in now
        target: The status the caller wants to move to
        entity_type: Kind of entity (workflow, stage, task) named in the error
        entity_id: Optional entity ID named in the error

    Raises:
        InvalidStateTransitionError: When the transition is not permitted
    """
    if can_transition(current, target):
        return
    raise InvalidStateTransitionError(current, target, entity_type, entity_id)