# Source code for stabilize.persistence.store.interface

"""
WorkflowStore interface.

This module defines the abstract interface for execution persistence.
All storage backends must implement this interface.

Enterprise Features:
- Atomic transactions for store + queue operations (optional)
- Dead letter queue integration
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from stabilize.models.stage import StageExecution
    from stabilize.models.workflow import Workflow
    from stabilize.queue import Queue

    from .criteria import WorkflowCriteria
    from .transaction import StoreTransaction


class WorkflowStore(ABC):
    """Abstract interface for execution persistence.

    Concrete backends must implement every ``@abstractmethod`` below. The
    remaining methods (message deduplication, health check, counting,
    existence check and ``transaction``) ship with working defaults that a
    backend may override with cheaper or stronger implementations.
    """

    # ========== Execution Operations ==========

    @abstractmethod
    def store(self, execution: Workflow) -> None:
        """Store a complete execution.

        Creates the execution and all its stages.

        Args:
            execution: The execution to store
        """

    @abstractmethod
    def retrieve(self, execution_id: str) -> Workflow:
        """Retrieve an execution by ID.

        Args:
            execution_id: The execution ID

        Returns:
            The execution

        Raises:
            WorkflowNotFoundError: If not found
        """

    @abstractmethod
    def retrieve_execution_summary(self, execution_id: str) -> Workflow:
        """Retrieve execution metadata without stages.

        Args:
            execution_id: The execution ID

        Returns:
            The execution with an empty stages list

        Raises:
            WorkflowNotFoundError: If not found
        """

    @abstractmethod
    def update_status(self, execution: Workflow) -> None:
        """Update the status of an execution.

        Args:
            execution: The execution with updated status
        """

    @abstractmethod
    def delete(self, execution_id: str) -> None:
        """Delete an execution and all its stages.

        Args:
            execution_id: The execution ID
        """

    # ========== Stage Operations ==========

    @abstractmethod
    def store_stage(
        self,
        stage: StageExecution,
        expected_phase: str | None = None,
    ) -> None:
        """Store or update a stage.

        Args:
            stage: The stage to store
            expected_phase: If provided, the write is conditional on the
                stage's current status equalling ``expected_phase`` (in SQL
                backends, a ``status = expected_phase`` term in the WHERE
                clause). This enables phase-aware optimistic locking: the
                update only succeeds if the stage is in the expected status.
        """

    @abstractmethod
    def add_stage(self, stage: StageExecution) -> None:
        """Add a new stage to an execution.

        Args:
            stage: The stage to add
        """

    @abstractmethod
    def remove_stage(
        self,
        execution: Workflow,
        stage_id: str,
    ) -> None:
        """Remove a stage from an execution.

        Args:
            execution: The execution
            stage_id: The stage ID to remove
        """

    # NOTE(review): the documented not-found error here is ValueError, while
    # retrieve()/retrieve_execution_summary() document WorkflowNotFoundError.
    # Confirm that all backends agree before callers rely on either.
    @abstractmethod
    def retrieve_stage(self, stage_id: str) -> StageExecution:
        """Retrieve a single stage by ID.

        The returned stage has a partial parent execution attached
        (containing metadata but no other stages).

        Args:
            stage_id: The stage ID

        Returns:
            The stage execution

        Raises:
            ValueError: If stage not found
        """

    @abstractmethod
    def get_upstream_stages(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> list[StageExecution]:
        """Get upstream stages for a given stage.

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            List of upstream stages
        """

    @abstractmethod
    def get_downstream_stages(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> list[StageExecution]:
        """Get downstream stages for a given stage.

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            List of downstream stages
        """

    @abstractmethod
    def get_synthetic_stages(
        self,
        execution_id: str,
        parent_stage_id: str,
    ) -> list[StageExecution]:
        """Get synthetic stages for a given parent stage.

        Args:
            execution_id: The execution ID
            parent_stage_id: The parent stage ID

        Returns:
            List of synthetic stages
        """

    @abstractmethod
    def get_merged_ancestor_outputs(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> dict[str, Any]:
        """Get merged outputs from all ancestor stages.

        Traverses the DAG upstream, collects outputs, and merges them
        according to topological order (latest wins).

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            Merged dictionary of outputs
        """

    # ========== Query Operations ==========

    @abstractmethod
    def retrieve_by_pipeline_config_id(
        self,
        pipeline_config_id: str,
        criteria: WorkflowCriteria | None = None,
    ) -> Iterator[Workflow]:
        """Retrieve executions by pipeline config ID.

        Args:
            pipeline_config_id: The pipeline config ID
            criteria: Optional query criteria

        Returns:
            Iterator of matching executions
        """

    @abstractmethod
    def retrieve_by_application(
        self,
        application: str,
        criteria: WorkflowCriteria | None = None,
    ) -> Iterator[Workflow]:
        """Retrieve executions by application.

        Args:
            application: The application name
            criteria: Optional query criteria

        Returns:
            Iterator of matching executions
        """

    # ========== Pause/Resume Operations ==========

    @abstractmethod
    def pause(
        self,
        execution_id: str,
        paused_by: str,
    ) -> None:
        """Pause an execution.

        Args:
            execution_id: The execution ID
            paused_by: Who paused it
        """

    @abstractmethod
    def resume(self, execution_id: str) -> None:
        """Resume a paused execution.

        Args:
            execution_id: The execution ID
        """

    # ========== Cancel Operations ==========

    @abstractmethod
    def cancel(
        self,
        execution_id: str,
        canceled_by: str,
        reason: str,
    ) -> None:
        """Cancel an execution.

        Args:
            execution_id: The execution ID
            canceled_by: Who canceled it
            reason: Cancellation reason
        """

    # ========== Message Deduplication ==========

    def is_message_processed(self, message_id: str) -> bool:
        """Check whether a message has already been processed.

        Used for idempotency: it prevents duplicate message processing.

        Args:
            message_id: The unique message ID

        Returns:
            True if the message has been processed before. The default
            implementation performs no deduplication and always returns
            False.
        """
        return False

    def mark_message_processed(
        self,
        message_id: str,
        handler_type: str | None = None,
        execution_id: str | None = None,
    ) -> None:
        """Mark a message as successfully processed.

        The default implementation is a no-op; it records nothing.

        Args:
            message_id: The unique message ID
            handler_type: Optional handler type for debugging
            execution_id: Optional execution ID for debugging
        """

    def cleanup_old_processed_messages(self, max_age_hours: float = 24.0) -> int:
        """Clean up old processed-message records.

        Args:
            max_age_hours: Delete records older than this many hours

        Returns:
            Number of records deleted. The default implementation stores no
            records, so it always returns 0.
        """
        return 0

    # ========== Optional Methods ==========

    def is_healthy(self) -> bool:
        """Check whether the repository is healthy.

        The default implementation always reports healthy. Backends with a
        connection to probe are expected to override it.

        Returns:
            True if healthy
        """
        return True

    def count_by_application(self, application: str) -> int:
        """Count executions for an application.

        The default implementation iterates every execution returned by
        ``retrieve_by_application``, so its cost grows with the number of
        executions. Backends that can count natively should override it.

        Args:
            application: The application name

        Returns:
            Number of executions
        """
        return sum(1 for _ in self.retrieve_by_application(application))

    def exists(self, execution_id: str) -> bool:
        """Check whether a workflow exists.

        Args:
            execution_id: The workflow ID to check

        Returns:
            True if the workflow exists, False otherwise
        """
        try:
            self.retrieve_execution_summary(execution_id)
        # NOTE(review): this deliberately catches every Exception, so a
        # backend failure (e.g. a lost connection) is also reported as
        # False. Consider narrowing it to WorkflowNotFoundError, the
        # documented not-found error of retrieve_execution_summary.
        except Exception:
            return False
        else:
            return True

    @contextmanager
    def transaction(self, queue: Queue | None = None) -> Iterator[StoreTransaction]:
        """Create an atomic transaction for store + queue operations.

        Use this when you need to atomically update both stage state AND
        queue a message. This prevents orphaned workflows when a crash
        happens between separate store and queue operations.

        The default implementation provides a no-op transaction that just
        delegates to the normal store methods. Its pending messages are
        flushed only if the ``with`` block exits without raising.
        SQLite and PostgreSQL implementations provide true atomic
        transactions.

        Args:
            queue: Optional queue for pushing messages. Required for
                backends that don't have integrated queue support.

        Usage::

            with store.transaction(queue) as txn:
                txn.store_stage(stage)
                txn.push_message(message)
            # Atomic backends commit on success and roll back on exception.

        Yields:
            StoreTransaction with store_stage() and push_message() methods
        """
        # Imported at call time rather than module level, presumably to
        # avoid an import cycle with the transaction implementation.
        from .noop_transaction import NoOpTransaction

        txn = NoOpTransaction(self, queue)
        yield txn
        # Reached only on a clean exit from the ``with`` body. An exception
        # raised inside the block is re-thrown at the ``yield`` above and
        # propagates from there, so pending messages are never flushed.
        txn._flush_messages()