"""
WorkflowStore interface.
This module defines the abstract interface for execution persistence.
All storage backends must implement this interface.
Enterprise Features:
- Atomic transactions for store + queue operations (optional)
- Dead letter queue integration
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from stabilize.models.stage import StageExecution
    from stabilize.models.workflow import Workflow
    from stabilize.queue import Queue

    from .criteria import WorkflowCriteria
    from .transaction import StoreTransaction
class WorkflowStore(ABC):
    """Abstract interface for execution persistence.

    Storage backends subclass this and implement every ``@abstractmethod``.
    The message-deduplication methods, the "optional" helpers and
    :meth:`transaction` ship with default implementations that a backend
    may override.
    """

    # ========== Execution Operations ==========

    @abstractmethod
    def store(self, execution: Workflow) -> None:
        """
        Store a complete execution.

        Creates the execution and all its stages.

        Args:
            execution: The execution to store
        """

    @abstractmethod
    def retrieve(self, execution_id: str) -> Workflow:
        """
        Retrieve an execution by ID.

        Args:
            execution_id: The execution ID

        Returns:
            The execution

        Raises:
            WorkflowNotFoundError: If not found
        """

    @abstractmethod
    def retrieve_execution_summary(self, execution_id: str) -> Workflow:
        """
        Retrieve execution metadata without stages.

        Args:
            execution_id: The execution ID

        Returns:
            The execution with empty stages list

        Raises:
            WorkflowNotFoundError: If not found
        """

    @abstractmethod
    def update_status(self, execution: Workflow) -> None:
        """
        Update the status of an execution.

        Args:
            execution: The execution with updated status
        """

    @abstractmethod
    def delete(self, execution_id: str) -> None:
        """
        Delete an execution and all its stages.

        Args:
            execution_id: The execution ID
        """

    # ========== Stage Operations ==========

    @abstractmethod
    def store_stage(
        self,
        stage: StageExecution,
        expected_phase: str | None = None,
    ) -> None:
        """
        Store or update a stage.

        Args:
            stage: The stage to store
            expected_phase: If provided, the WHERE clause includes
                status = expected_phase. This enables phase-aware
                optimistic locking where updates only succeed if the
                stage is in the expected status.
        """

    @abstractmethod
    def add_stage(self, stage: StageExecution) -> None:
        """
        Add a new stage to an execution.

        Args:
            stage: The stage to add
        """

    @abstractmethod
    def remove_stage(
        self,
        execution: Workflow,
        stage_id: str,
    ) -> None:
        """
        Remove a stage from an execution.

        Args:
            execution: The execution
            stage_id: The stage ID to remove
        """

    @abstractmethod
    def retrieve_stage(self, stage_id: str) -> StageExecution:
        """
        Retrieve a single stage by ID.

        The returned stage will have a partial parent execution attached
        (containing metadata but no other stages).

        Args:
            stage_id: The stage ID

        Returns:
            The stage execution

        Raises:
            ValueError: If stage not found
        """

    @abstractmethod
    def get_upstream_stages(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> list[StageExecution]:
        """
        Get upstream stages for a given stage.

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            List of upstream stages
        """

    @abstractmethod
    def get_downstream_stages(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> list[StageExecution]:
        """
        Get downstream stages for a given stage.

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            List of downstream stages
        """

    @abstractmethod
    def get_synthetic_stages(
        self,
        execution_id: str,
        parent_stage_id: str,
    ) -> list[StageExecution]:
        """
        Get synthetic stages for a given parent stage.

        Args:
            execution_id: The execution ID
            parent_stage_id: The parent stage ID

        Returns:
            List of synthetic stages
        """

    @abstractmethod
    def get_merged_ancestor_outputs(
        self,
        execution_id: str,
        stage_ref_id: str,
    ) -> dict[str, Any]:
        """
        Get merged outputs from all ancestor stages.

        Traverses the DAG upstream, collects outputs, and merges them
        according to topological order (latest wins).

        Args:
            execution_id: The execution ID
            stage_ref_id: The reference ID of the stage

        Returns:
            Merged dictionary of outputs
        """

    # ========== Query Operations ==========

    @abstractmethod
    def retrieve_by_pipeline_config_id(
        self,
        pipeline_config_id: str,
        criteria: WorkflowCriteria | None = None,
    ) -> Iterator[Workflow]:
        """
        Retrieve executions by pipeline config ID.

        Args:
            pipeline_config_id: The pipeline config ID
            criteria: Optional query criteria

        Returns:
            Iterator of matching executions
        """

    @abstractmethod
    def retrieve_by_application(
        self,
        application: str,
        criteria: WorkflowCriteria | None = None,
    ) -> Iterator[Workflow]:
        """
        Retrieve executions by application.

        Args:
            application: The application name
            criteria: Optional query criteria

        Returns:
            Iterator of matching executions
        """

    # ========== Pause/Resume Operations ==========

    @abstractmethod
    def pause(
        self,
        execution_id: str,
        paused_by: str,
    ) -> None:
        """
        Pause an execution.

        Args:
            execution_id: The execution ID
            paused_by: Who paused it
        """

    @abstractmethod
    def resume(self, execution_id: str) -> None:
        """
        Resume a paused execution.

        Args:
            execution_id: The execution ID
        """

    # ========== Cancel Operations ==========

    @abstractmethod
    def cancel(
        self,
        execution_id: str,
        canceled_by: str,
        reason: str,
    ) -> None:
        """
        Cancel an execution.

        Args:
            execution_id: The execution ID
            canceled_by: Who canceled it
            reason: Cancellation reason
        """

    # ========== Message Deduplication ==========

    def is_message_processed(self, message_id: str) -> bool:
        """
        Check if a message has already been processed.

        Used for idempotency - prevents duplicate message processing.

        Args:
            message_id: The unique message ID

        Returns:
            True if the message has been processed before
        """
        # Default implementation: no deduplication (always returns False)
        return False

    def mark_message_processed(
        self,
        message_id: str,
        handler_type: str | None = None,
        execution_id: str | None = None,
    ) -> None:
        """
        Mark a message as successfully processed.

        Args:
            message_id: The unique message ID
            handler_type: Optional handler type for debugging
            execution_id: Optional execution ID for debugging
        """
        # Default implementation: no-op

    def cleanup_old_processed_messages(self, max_age_hours: float = 24.0) -> int:
        """
        Clean up old processed message records.

        Args:
            max_age_hours: Delete records older than this many hours

        Returns:
            Number of records deleted
        """
        # Default implementation: no cleanup
        return 0

    # ========== Optional Methods ==========

    def is_healthy(self) -> bool:
        """
        Check if the repository is healthy.

        Returns:
            True if healthy (the default implementation always reports True)
        """
        return True

    def count_by_application(self, application: str) -> int:
        """
        Count executions for an application.

        The default implementation iterates everything yielded by
        ``retrieve_by_application`` and counts the items.

        Args:
            application: The application name

        Returns:
            Number of executions
        """
        return sum(1 for _ in self.retrieve_by_application(application))

    def exists(self, execution_id: str) -> bool:
        """
        Check if a workflow exists.

        The default implementation attempts ``retrieve_execution_summary``
        and reports False if that call raises.

        Args:
            execution_id: The workflow ID to check

        Returns:
            True if the workflow exists, False otherwise
        """
        try:
            self.retrieve_execution_summary(execution_id)
            return True
        except Exception:
            # NOTE(review): any exception (not only "not found") is reported
            # as "does not exist", so backend failures are indistinguishable
            # from a missing workflow here. Consider narrowing to the
            # not-found error once its import is available in this module.
            return False

    @contextmanager
    def transaction(self, queue: Queue | None = None) -> Iterator[StoreTransaction]:
        """
        Create an atomic transaction for store + queue operations.

        Use this when you need to atomically update both stage state AND
        queue a message. This prevents orphaned workflows from crashes
        between separate store and queue operations.

        Default implementation provides a no-op transaction that just
        delegates to the normal store methods. SQLite and PostgreSQL
        implementations provide true atomic transactions.

        Args:
            queue: Optional queue for pushing messages. Required for
                backends that don't have integrated queue support.

        Usage:
            with store.transaction(queue) as txn:
                txn.store_stage(stage)
                txn.push_message(message)
            # Commits on success, rolls back on exception

        Yields:
            StoreTransaction with store_stage() and push_message() methods
        """
        # Imported inside the method rather than at module level
        # (NOTE(review): presumably to avoid a circular import - confirm).
        from .noop_transaction import NoOpTransaction

        txn = NoOpTransaction(self, queue)
        # If the with-body raises, @contextmanager re-raises that exception
        # at this yield, so the flush below is skipped (don't flush on
        # exception) and the error propagates to the caller unchanged.
        yield txn
        # Flush pending messages on successful exit
        txn._flush_messages()