Persistence
WorkflowStore interface package.
- class stabilize.persistence.store.WorkflowStore[source]
Bases:
ABCAbstract interface for execution persistence.
- abstractmethod add_stage(stage)[source]
Add a new stage to an execution.
- Parameters:
stage (StageExecution) – The stage to add
- Return type:
None
- abstractmethod cancel(execution_id, canceled_by, reason)[source]
Cancel an execution.
- Parameters:
execution_id (str) – The execution ID
canceled_by (str) – Who canceled it
reason (str) – Cancellation reason
- Return type:
None
- cleanup_old_processed_messages(max_age_hours=24.0)[source]
Clean up old processed message records.
- Parameters:
max_age_hours (float) – Delete records older than this many hours
- Returns:
Number of records deleted
- Return type:
int
- count_by_application(application)[source]
Count executions for an application.
- Parameters:
application (str) – The application name
- Returns:
Number of executions
- Return type:
int
- abstractmethod delete(execution_id)[source]
Delete an execution and all its stages.
- Parameters:
execution_id (str) – The execution ID
- Return type:
None
- exists(execution_id)[source]
Check if a workflow exists.
- Parameters:
execution_id (str) – The workflow ID to check
- Returns:
True if the workflow exists, False otherwise
- Return type:
bool
- abstractmethod get_downstream_stages(execution_id, stage_ref_id)[source]
Get downstream stages for a given stage.
- Parameters:
execution_id (str) – The execution ID
stage_ref_id (str) – The reference ID of the stage
- Returns:
List of downstream stages
- Return type:
list[StageExecution]
- abstractmethod get_merged_ancestor_outputs(execution_id, stage_ref_id)[source]
Get merged outputs from all ancestor stages.
Traverses the DAG upstream, collects outputs, and merges them according to topological order (latest wins).
- Parameters:
execution_id (str) – The execution ID
stage_ref_id (str) – The reference ID of the stage
- Returns:
Merged dictionary of outputs
- Return type:
dict[str, Any]
- abstractmethod get_synthetic_stages(execution_id, parent_stage_id)[source]
Get synthetic stages for a given parent stage.
- Parameters:
execution_id (str) – The execution ID
parent_stage_id (str) – The parent stage ID
- Returns:
List of synthetic stages
- Return type:
list[StageExecution]
- abstractmethod get_upstream_stages(execution_id, stage_ref_id)[source]
Get upstream stages for a given stage.
- Parameters:
execution_id (str) – The execution ID
stage_ref_id (str) – The reference ID of the stage
- Returns:
List of upstream stages
- Return type:
list[StageExecution]
- is_healthy()[source]
Check if the repository is healthy.
- Returns:
True if healthy
- Return type:
bool
- is_message_processed(message_id)[source]
Check if a message has already been processed.
Used for idempotency - prevents duplicate message processing.
- Parameters:
message_id (str) – The unique message ID
- Returns:
True if the message has been processed before
- Return type:
bool
- mark_message_processed(message_id, handler_type=None, execution_id=None)[source]
Mark a message as successfully processed.
- Parameters:
message_id (str) – The unique message ID
handler_type (str | None) – Optional handler type for debugging
execution_id (str | None) – Optional execution ID for debugging
- Return type:
None
- abstractmethod pause(execution_id, paused_by)[source]
Pause an execution.
- Parameters:
execution_id (str) – The execution ID
paused_by (str) – Who paused it
- Return type:
None
- abstractmethod remove_stage(execution, stage_id)[source]
Remove a stage from an execution.
- Parameters:
execution (Workflow) – The execution
stage_id (str) – The stage ID to remove
- Return type:
None
- abstractmethod resume(execution_id)[source]
Resume a paused execution.
- Parameters:
execution_id (str) – The execution ID
- Return type:
None
- abstractmethod retrieve(execution_id)[source]
Retrieve an execution by ID.
- Parameters:
execution_id (str) – The execution ID
- Returns:
The execution
- Raises:
WorkflowNotFoundError – If not found
- Return type:
- abstractmethod retrieve_by_application(application, criteria=None)[source]
Retrieve executions by application.
- Parameters:
application (str) – The application name
criteria (WorkflowCriteria | None) – Optional query criteria
- Returns:
Iterator of matching executions
- Return type:
Iterator[Workflow]
- abstractmethod retrieve_by_pipeline_config_id(pipeline_config_id, criteria=None)[source]
Retrieve executions by pipeline config ID.
- Parameters:
pipeline_config_id (str) – The pipeline config ID
criteria (WorkflowCriteria | None) – Optional query criteria
- Returns:
Iterator of matching executions
- Return type:
Iterator[Workflow]
- abstractmethod retrieve_execution_summary(execution_id)[source]
Retrieve execution metadata without stages.
- Parameters:
execution_id (str) – The execution ID
- Returns:
The execution with empty stages list
- Raises:
WorkflowNotFoundError – If not found
- Return type:
- abstractmethod retrieve_stage(stage_id)[source]
Retrieve a single stage by ID.
The returned stage will have a partial parent execution attached (containing metadata but no other stages).
- Parameters:
stage_id (str) – The stage ID
- Returns:
The stage execution
- Raises:
ValueError – If stage not found
- Return type:
- abstractmethod store(execution)[source]
Store a complete execution.
Creates the execution and all its stages.
- Parameters:
execution (Workflow) – The execution to store
- Return type:
None
- abstractmethod store_stage(stage, expected_phase=None)[source]
Store or update a stage.
- Parameters:
stage (StageExecution) – The stage to store
expected_phase (str | None) – If provided, the WHERE clause includes status = expected_phase. This enables phase-aware optimistic locking where updates only succeed if the stage is in the expected status.
- Return type:
None
- transaction(queue=None)[source]
Create an atomic transaction for store + queue operations.
Use this when you need to atomically update both stage state AND queue a message. This prevents orphaned workflows from crashes between separate store and queue operations.
Default implementation provides a no-op transaction that just delegates to the normal store methods. SQLite and PostgreSQL implementations provide true atomic transactions.
- Parameters:
queue (Queue | None) – Optional queue for pushing messages. Required for backends that don’t have integrated queue support.
- Return type:
Iterator[StoreTransaction]
- Usage:
- with store.transaction(queue) as txn:
txn.store_stage(stage) txn.push_message(message)
# Commits on success, rolls back on exception
- Yields:
StoreTransaction with store_stage() and push_message() methods
- Parameters:
queue (Queue | None)
- Return type:
Iterator[StoreTransaction]
- class stabilize.persistence.store.WorkflowCriteria(page_size=20, statuses=None, start_time_before=None, start_time_after=None)[source]
Bases:
objectCriteria for querying executions.
- Parameters:
page_size (int)
statuses (set[WorkflowStatus] | None)
start_time_before (int | None)
start_time_after (int | None)
- page_size: int = 20
- start_time_after: int | None = None
- start_time_before: int | None = None
- statuses: set[WorkflowStatus] | None = None
- exception stabilize.persistence.store.WorkflowNotFoundError(execution_id)[source]
Bases:
ExceptionRaised when an execution cannot be found.
- Parameters:
execution_id (str)
- class stabilize.persistence.store.StoreTransaction[source]
Bases:
ABCAbstract interface for atomic store + queue transactions.
Implementations must ensure that store_stage() and push_message() are committed atomically - either both succeed or both are rolled back.
- property is_atomic: bool
Whether this transaction provides true database-level atomicity.
Returns True if all operations within this transaction are guaranteed to either all succeed or all fail atomically (no partial state).
Override this in subclasses. Default is False for safety.
- abstractmethod mark_message_processed(message_id, handler_type=None, execution_id=None)[source]
Mark a message as successfully processed within the transaction.
This ensures that message processing is only marked complete if the transaction commits successfully.
- Parameters:
message_id (str)
handler_type (str | None)
execution_id (str | None)
- Return type:
None
- abstractmethod push_message(message, delay=0)[source]
Push a message to the queue within the transaction.
- Parameters:
message (Message)
delay (float)
- Return type:
None
- abstractmethod store_stage(stage, expected_phase=None)[source]
Store or update a stage within the transaction.
- Parameters:
stage (StageExecution) – The stage to store
expected_phase (str | None) – If provided, adds status check to WHERE clause for phase-aware optimistic locking (CAS pattern).
- Return type:
None
- class stabilize.persistence.store.NoOpTransaction(store, queue=None)[source]
Bases:
StoreTransactionDefault transaction that buffers operations and flushes on commit.
WARNING: This implementation is NOT truly atomic. A crash during flush can leave partial state in the database. This is only suitable for: - Testing environments - Non-critical workloads where eventual consistency is acceptable
For production critical systems requiring 100% atomicity, use: - SqliteWorkflowStore (provides true atomic transactions) - PostgresWorkflowStore (provides true atomic transactions)
Operations are buffered and flushed in careful order to minimize inconsistency windows, but true atomicity is impossible without database-level transaction support.
- Parameters:
store (WorkflowStore)
queue (Queue | None)
- mark_message_processed(message_id, handler_type=None, execution_id=None)[source]
Buffer processed message mark to be stored when transaction completes.
- Parameters:
message_id (str)
handler_type (str | None)
execution_id (str | None)
- Return type:
None
- push_message(message, delay=0)[source]
Buffer message to be pushed when transaction completes.
Messages are buffered and flushed when the context manager exits successfully. If an exception occurs, messages are not pushed.
- Parameters:
message (Message)
delay (float)
- Return type:
None
- store_stage(stage, expected_phase=None)[source]
Buffer stage to be stored when transaction completes.
Stages are buffered and flushed when the context manager exits successfully. If an exception occurs, stages are not stored.
- Parameters:
stage (StageExecution) – Stage to store
expected_phase (str | None) – If provided, passed to store.store_stage() for CAS.
- Return type:
None
SQLite persistence implementation.
- class stabilize.persistence.sqlite.SqliteWorkflowStore(connection_string, create_tables=False)[source]
Bases:
SqliteWorkflowCrudMixin,SqliteStageOpsMixin,SqliteQueriesMixin,WorkflowStoreSQLite implementation of WorkflowStore.
Uses native sqlite3 for file-based or in-memory storage. Suitable for development, testing, and single-node deployments.
Features: - WAL mode for better concurrent read performance - Foreign key support enabled - JSON stored as TEXT strings - Arrays stored as JSON strings - Thread-local connections managed by singleton ConnectionManager
- Parameters:
connection_string (str)
create_tables (bool)
- cleanup_old_processed_messages(max_age_hours=24.0)[source]
Clean up old processed message records.
- Parameters:
max_age_hours (float)
- Return type:
int
- is_message_processed(message_id)[source]
Check if a message has already been processed.
- Parameters:
message_id (str)
- Return type:
bool
- mark_message_processed(message_id, handler_type=None, execution_id=None)[source]
Mark a message as successfully processed.
- Parameters:
message_id (str)
handler_type (str | None)
execution_id (str | None)
- Return type:
None
- transaction(queue=None)[source]
Create atomic transaction for store + queue operations.
Use this when you need to atomically update both stage state AND queue a message. This prevents orphaned workflows from crashes between separate store and queue operations.
SQLite implementation writes directly to the queue_messages table in the same transaction, so the queue parameter is ignored.
- Parameters:
queue (Queue | None) – Ignored (for API compatibility with base class)
- Return type:
Iterator[StoreTransaction]
- Usage:
- with store.transaction() as txn:
txn.store_stage(stage) txn.push_message(message)
# Auto-commits on success, rolls back on exception
- Yields:
AtomicTransaction with store_stage() and push_message() methods
- Parameters:
queue (Queue | None)
- Return type:
Iterator[StoreTransaction]
PostgreSQL persistence implementation.
- class stabilize.persistence.postgres.PostgresWorkflowStore(connection_string)[source]
Bases:
WorkflowStorePostgreSQL implementation of WorkflowStore.
Uses native psycopg3 with connection pooling for database operations. Supports concurrent access and provides efficient queries for pipeline execution tracking.
- Parameters:
connection_string (str)
- cancel(execution_id, canceled_by, reason)[source]
Cancel an execution.
- Parameters:
execution_id (str)
canceled_by (str)
reason (str)
- Return type:
None
- cleanup_old_processed_messages(max_age_hours=24.0)[source]
Clean up old processed message records.
- Parameters:
max_age_hours (float)
- Return type:
int
- get_downstream_stages(execution_id, stage_ref_id)[source]
Get downstream stages with tasks loaded.
- Parameters:
execution_id (str)
stage_ref_id (str)
- Return type:
list[Any]
- get_merged_ancestor_outputs(execution_id, stage_ref_id)[source]
Get merged outputs from all ancestor stages.
- Parameters:
execution_id (str)
stage_ref_id (str)
- Return type:
dict[str, Any]
- get_synthetic_stages(execution_id, parent_stage_id)[source]
Get synthetic stages with tasks loaded.
- Parameters:
execution_id (str)
parent_stage_id (str)
- Return type:
list[Any]
- get_upstream_stages(execution_id, stage_ref_id)[source]
Get upstream stages with tasks loaded.
- Parameters:
execution_id (str)
stage_ref_id (str)
- Return type:
list[Any]
- is_message_processed(message_id)[source]
Check if a message has already been processed.
- Parameters:
message_id (str)
- Return type:
bool
- mark_message_processed(message_id, handler_type=None, execution_id=None)[source]
Mark a message as successfully processed.
- Parameters:
message_id (str)
handler_type (str | None)
execution_id (str | None)
- Return type:
None
- pause(execution_id, paused_by)[source]
Pause an execution.
- Parameters:
execution_id (str)
paused_by (str)
- Return type:
None
- remove_stage(execution, stage_id)[source]
Remove a stage.
- Parameters:
execution (Workflow)
stage_id (str)
- Return type:
None
- resume(execution_id)[source]
Resume a paused execution.
- Parameters:
execution_id (str)
- Return type:
None
- retrieve(execution_id)[source]
Retrieve an execution by ID.
- Parameters:
execution_id (str)
- Return type:
- retrieve_by_application(application, criteria=None)[source]
Retrieve executions by application.
- Parameters:
application (str)
criteria (WorkflowCriteria | None)
- Return type:
Iterator[Workflow]
- retrieve_by_pipeline_config_id(pipeline_config_id, criteria=None)[source]
Retrieve executions by pipeline config ID.
- Parameters:
pipeline_config_id (str)
criteria (WorkflowCriteria | None)
- Return type:
Iterator[Workflow]
- retrieve_execution_summary(execution_id)[source]
Retrieve execution metadata without stages.
- Parameters:
execution_id (str)
- Return type:
- retrieve_stage(stage_id)[source]
Retrieve a single stage by ID.
- Parameters:
stage_id (str)
- Return type:
Any
- store(execution)[source]
Store a complete execution.
- Parameters:
execution (Workflow)
- Return type:
None
- store_stage(stage, expected_phase=None, connection=None)[source]
Store or update a stage.
- Parameters:
stage (Any) – The stage to store
expected_phase (str | None) – If provided, adds status check to WHERE clause for phase-aware optimistic locking.
connection (Any | None) – Optional existing connection to use
- Return type:
None
- transaction(queue=None)[source]
Create an atomic transaction for store + queue operations.
- Parameters:
queue (Any | None)
- Return type:
Iterator[StoreTransaction]