Persistence

WorkflowStore interface package.

class stabilize.persistence.store.WorkflowStore[source]

Bases: ABC

Abstract interface for execution persistence.

abstractmethod add_stage(stage)[source]

Add a new stage to an execution.

Parameters:

stage (StageExecution) – The stage to add

Return type:

None

abstractmethod cancel(execution_id, canceled_by, reason)[source]

Cancel an execution.

Parameters:
  • execution_id (str) – The execution ID

  • canceled_by (str) – Who canceled it

  • reason (str) – Cancellation reason

Return type:

None

cleanup_old_processed_messages(max_age_hours=24.0)[source]

Clean up old processed message records.

Parameters:

max_age_hours (float) – Delete records older than this many hours

Returns:

Number of records deleted

Return type:

int

count_by_application(application)[source]

Count executions for an application.

Parameters:

application (str) – The application name

Returns:

Number of executions

Return type:

int

abstractmethod delete(execution_id)[source]

Delete an execution and all its stages.

Parameters:

execution_id (str) – The execution ID

Return type:

None

exists(execution_id)[source]

Check if a workflow exists.

Parameters:

execution_id (str) – The workflow ID to check

Returns:

True if the workflow exists, False otherwise

Return type:

bool

abstractmethod get_downstream_stages(execution_id, stage_ref_id)[source]

Get downstream stages for a given stage.

Parameters:
  • execution_id (str) – The execution ID

  • stage_ref_id (str) – The reference ID of the stage

Returns:

List of downstream stages

Return type:

list[StageExecution]

abstractmethod get_merged_ancestor_outputs(execution_id, stage_ref_id)[source]

Get merged outputs from all ancestor stages.

Traverses the DAG upstream, collects outputs, and merges them according to topological order (latest wins).

Parameters:
  • execution_id (str) – The execution ID

  • stage_ref_id (str) – The reference ID of the stage

Returns:

Merged dictionary of outputs

Return type:

dict[str, Any]

abstractmethod get_synthetic_stages(execution_id, parent_stage_id)[source]

Get synthetic stages for a given parent stage.

Parameters:
  • execution_id (str) – The execution ID

  • parent_stage_id (str) – The parent stage ID

Returns:

List of synthetic stages

Return type:

list[StageExecution]

abstractmethod get_upstream_stages(execution_id, stage_ref_id)[source]

Get upstream stages for a given stage.

Parameters:
  • execution_id (str) – The execution ID

  • stage_ref_id (str) – The reference ID of the stage

Returns:

List of upstream stages

Return type:

list[StageExecution]

is_healthy()[source]

Check if the store is healthy.

Returns:

True if healthy

Return type:

bool

is_message_processed(message_id)[source]

Check if a message has already been processed.

Used for idempotency - prevents duplicate message processing.

Parameters:

message_id (str) – The unique message ID

Returns:

True if the message has been processed before

Return type:

bool

mark_message_processed(message_id, handler_type=None, execution_id=None)[source]

Mark a message as successfully processed.

Parameters:
  • message_id (str) – The unique message ID

  • handler_type (str | None) – Optional handler type for debugging

  • execution_id (str | None) – Optional execution ID for debugging

Return type:

None

abstractmethod pause(execution_id, paused_by)[source]

Pause an execution.

Parameters:
  • execution_id (str) – The execution ID

  • paused_by (str) – Who paused it

Return type:

None

abstractmethod remove_stage(execution, stage_id)[source]

Remove a stage from an execution.

Parameters:
  • execution (Workflow) – The execution

  • stage_id (str) – The stage ID to remove

Return type:

None

abstractmethod resume(execution_id)[source]

Resume a paused execution.

Parameters:

execution_id (str) – The execution ID

Return type:

None

abstractmethod retrieve(execution_id)[source]

Retrieve an execution by ID.

Parameters:

execution_id (str) – The execution ID

Returns:

The execution

Raises:

WorkflowNotFoundError – If not found

Return type:

Workflow

abstractmethod retrieve_by_application(application, criteria=None)[source]

Retrieve executions by application.

Parameters:
  • application (str) – The application name

  • criteria (WorkflowCriteria | None) – Optional query criteria

Returns:

Iterator of matching executions

Return type:

Iterator[Workflow]

abstractmethod retrieve_by_pipeline_config_id(pipeline_config_id, criteria=None)[source]

Retrieve executions by pipeline config ID.

Parameters:
  • pipeline_config_id (str) – The pipeline config ID

  • criteria (WorkflowCriteria | None) – Optional query criteria

Returns:

Iterator of matching executions

Return type:

Iterator[Workflow]

abstractmethod retrieve_execution_summary(execution_id)[source]

Retrieve execution metadata without stages.

Parameters:

execution_id (str) – The execution ID

Returns:

The execution with empty stages list

Raises:

WorkflowNotFoundError – If not found

Return type:

Workflow

abstractmethod retrieve_stage(stage_id)[source]

Retrieve a single stage by ID.

The returned stage will have a partial parent execution attached (containing metadata but no other stages).

Parameters:

stage_id (str) – The stage ID

Returns:

The stage execution

Raises:

ValueError – If stage not found

Return type:

StageExecution

abstractmethod store(execution)[source]

Store a complete execution.

Creates the execution and all its stages.

Parameters:

execution (Workflow) – The execution to store

Return type:

None

abstractmethod store_stage(stage, expected_phase=None)[source]

Store or update a stage.

Parameters:
  • stage (StageExecution) – The stage to store

  • expected_phase (str | None) – If provided, the WHERE clause includes status = expected_phase. This enables phase-aware optimistic locking where updates only succeed if the stage is in the expected status.

Return type:

None

transaction(queue=None)[source]

Create an atomic transaction for store + queue operations.

Use this when you need to atomically update both stage state AND queue a message. This prevents orphaned workflows from crashes between separate store and queue operations.

Default implementation provides a no-op transaction that just delegates to the normal store methods. SQLite and PostgreSQL implementations provide true atomic transactions.

Parameters:

queue (Queue | None) – Optional queue for pushing messages. Required for backends that don’t have integrated queue support.

Return type:

Iterator[StoreTransaction]

Usage:

    with store.transaction(queue) as txn:
        txn.store_stage(stage)
        txn.push_message(message)
    # Commits on success, rolls back on exception

Yields:

StoreTransaction with store_stage() and push_message() methods

Parameters:

queue (Queue | None)

Return type:

Iterator[StoreTransaction]

abstractmethod update_status(execution)[source]

Update the status of an execution.

Parameters:

execution (Workflow) – The execution with updated status

Return type:

None

class stabilize.persistence.store.WorkflowCriteria(page_size=20, statuses=None, start_time_before=None, start_time_after=None)[source]

Bases: object

Criteria for querying executions.

Parameters:
  • page_size (int)

  • statuses (set[WorkflowStatus] | None)

  • start_time_before (int | None)

  • start_time_after (int | None)

page_size: int = 20
start_time_after: int | None = None
start_time_before: int | None = None
statuses: set[WorkflowStatus] | None = None
exception stabilize.persistence.store.WorkflowNotFoundError(execution_id)[source]

Bases: Exception

Raised when an execution cannot be found.

Parameters:

execution_id (str)

class stabilize.persistence.store.StoreTransaction[source]

Bases: ABC

Abstract interface for atomic store + queue transactions.

Implementations must ensure that store_stage() and push_message() are committed atomically - either both succeed or both are rolled back.

property is_atomic: bool

Whether this transaction provides true database-level atomicity.

Returns True if all operations within this transaction are guaranteed to either all succeed or all fail atomically (no partial state).

Override this in subclasses. Default is False for safety.

abstractmethod mark_message_processed(message_id, handler_type=None, execution_id=None)[source]

Mark a message as successfully processed within the transaction.

This ensures that message processing is only marked complete if the transaction commits successfully.

Parameters:
  • message_id (str)

  • handler_type (str | None)

  • execution_id (str | None)

Return type:

None

abstractmethod push_message(message, delay=0)[source]

Push a message to the queue within the transaction.

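Parameters:
  • message – The message to push to the queue

  • delay – Optional delay applied to the message (defaults to 0)

Return type: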

None

abstractmethod store_stage(stage, expected_phase=None)[source]

Store or update a stage within the transaction.

Parameters:
  • stage (StageExecution) – The stage to store

  • expected_phase (str | None) – If provided, adds status check to WHERE clause for phase-aware optimistic locking (CAS pattern).

Return type:

None

abstractmethod update_workflow_status(workflow)[source]

Update workflow status within the transaction.

Parameters:

workflow (Workflow)

Return type:

None

class stabilize.persistence.store.NoOpTransaction(store, queue=None)[source]

Bases: StoreTransaction

Default transaction that buffers operations and flushes on commit.

WARNING: This implementation is NOT truly atomic. A crash during flush can leave partial state in the database. This is only suitable for:
  • Testing environments

  • Non-critical workloads where eventual consistency is acceptable

For production-critical systems requiring 100% atomicity, use:
  • SqliteWorkflowStore (provides true atomic transactions)

  • PostgresWorkflowStore (provides true atomic transactions)

Operations are buffered and flushed in careful order to minimize inconsistency windows, but true atomicity is impossible without database-level transaction support.

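Parameters:
  • store – The underlying store that buffered operations are flushed to

  • queue – Optional queue for pushing buffered messages (defaults to None)

mark_message_processed(message_id, handler_type=None, execution_id=None)[source]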

Buffer processed message mark to be stored when transaction completes.

Parameters:
  • message_id (str)

  • handler_type (str | None)

  • execution_id (str | None)

Return type:

None

push_message(message, delay=0)[source]

Buffer message to be pushed when transaction completes.

Messages are buffered and flushed when the context manager exits successfully. If an exception occurs, messages are not pushed.

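Parameters:
  • message – The message to buffer for pushing

  • delay – Optional delay applied to the message (defaults to 0)

Return type: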

None

store_stage(stage, expected_phase=None)[source]

Buffer stage to be stored when transaction completes.

Stages are buffered and flushed when the context manager exits successfully. If an exception occurs, stages are not stored.

Parameters:
  • stage (StageExecution) – Stage to store

  • expected_phase (str | None) – If provided, passed to store.store_stage() for CAS.

Return type:

None

update_workflow_status(workflow)[source]

Buffer a workflow status update to be applied when the transaction completes.

Workflows are buffered and flushed when the context manager exits successfully. If an exception occurs, workflows are not updated.

Parameters:

workflow (Workflow)

Return type:

None

SQLite persistence implementation.

class stabilize.persistence.sqlite.SqliteWorkflowStore(connection_string, create_tables=False)[source]

Bases: SqliteWorkflowCrudMixin, SqliteStageOpsMixin, SqliteQueriesMixin, WorkflowStore

SQLite implementation of WorkflowStore.

Uses native sqlite3 for file-based or in-memory storage. Suitable for development, testing, and single-node deployments.

Features:
  • WAL mode for better concurrent read performance

  • Foreign key support enabled

  • JSON stored as TEXT strings

  • Arrays stored as JSON strings

  • Thread-local connections managed by singleton ConnectionManager

Parameters:
  • connection_string (str)

  • create_tables (bool)

cleanup_old_processed_messages(max_age_hours=24.0)[source]

Clean up old processed message records.

Parameters:

max_age_hours (float)

Return type:

int

close()[source]

Close SQLite connection for current thread.

Return type:

None

is_healthy()[source]

Check if the database connection is healthy.

Return type:

bool

is_message_processed(message_id)[source]

Check if a message has already been processed.

Parameters:

message_id (str)

Return type:

bool

mark_message_processed(message_id, handler_type=None, execution_id=None)[source]

Mark a message as successfully processed.

Parameters:
  • message_id (str)

  • handler_type (str | None)

  • execution_id (str | None)

Return type:

None

transaction(queue=None)[source]

Create atomic transaction for store + queue operations.

Use this when you need to atomically update both stage state AND queue a message. This prevents orphaned workflows from crashes between separate store and queue operations.

SQLite implementation writes directly to the queue_messages table in the same transaction, so the queue parameter is ignored.

Parameters:

queue (Queue | None) – Ignored (for API compatibility with base class)

Return type:

Iterator[StoreTransaction]

Usage:

    with store.transaction() as txn:
        txn.store_stage(stage)
        txn.push_message(message)
    # Auto-commits on success, rolls back on exception

Yields:

AtomicTransaction with store_stage() and push_message() methods

Parameters:

queue (Queue | None)

Return type:

Iterator[StoreTransaction]

PostgreSQL persistence implementation.

class stabilize.persistence.postgres.PostgresWorkflowStore(connection_string)[source]

Bases: WorkflowStore

PostgreSQL implementation of WorkflowStore.

Uses native psycopg3 with connection pooling for database operations. Supports concurrent access and provides efficient queries for pipeline execution tracking.

Parameters:

connection_string (str)

add_stage(stage)[source]

Add a new stage.

Parameters:

stage (Any)

Return type:

None

cancel(execution_id, canceled_by, reason)[source]

Cancel an execution.

Parameters:
  • execution_id (str)

  • canceled_by (str)

  • reason (str)

Return type:

None

cleanup_old_processed_messages(max_age_hours=24.0)[source]

Clean up old processed message records.

Parameters:

max_age_hours (float)

Return type:

int

close()[source]

Close the connection pool via connection manager.

Return type:

None

delete(execution_id)[source]

Delete an execution.

Parameters:

execution_id (str)

Return type:

None

get_downstream_stages(execution_id, stage_ref_id)[source]

Get downstream stages with tasks loaded.

Parameters:
  • execution_id (str)

  • stage_ref_id (str)

Return type:

list[Any]

get_merged_ancestor_outputs(execution_id, stage_ref_id)[source]

Get merged outputs from all ancestor stages.

Parameters:
  • execution_id (str)

  • stage_ref_id (str)

Return type:

dict[str, Any]

get_synthetic_stages(execution_id, parent_stage_id)[source]

Get synthetic stages with tasks loaded.

Parameters:
  • execution_id (str)

  • parent_stage_id (str)

Return type:

list[Any]

get_upstream_stages(execution_id, stage_ref_id)[source]

Get upstream stages with tasks loaded.

Parameters:
  • execution_id (str)

  • stage_ref_id (str)

Return type:

list[Any]

is_healthy()[source]

Check if the database connection is healthy.

Return type:

bool

is_message_processed(message_id)[source]

Check if a message has already been processed.

Parameters:

message_id (str)

Return type:

bool

mark_message_processed(message_id, handler_type=None, execution_id=None)[source]

Mark a message as successfully processed.

Parameters:
  • message_id (str)

  • handler_type (str | None)

  • execution_id (str | None)

Return type:

None

pause(execution_id, paused_by)[source]

Pause an execution.

Parameters:
  • execution_id (str)

  • paused_by (str)

Return type:

None

remove_stage(execution, stage_id)[source]

Remove a stage.

Parameters:
  • execution – The execution

  • stage_id – The stage ID to remove

Return type:

None

resume(execution_id)[source]

Resume a paused execution.

Parameters:

execution_id (str)

Return type:

None

retrieve(execution_id)[source]

Retrieve an execution by ID.

Parameters:

execution_id (str)

Return type:

Workflow

retrieve_by_application(application, criteria=None)[source]

Retrieve executions by application.

Parameters:
  • application – The application name

  • criteria – Optional query criteria (defaults to None)

Return type:

Iterator[Workflow]

retrieve_by_pipeline_config_id(pipeline_config_id, criteria=None)[source]

Retrieve executions by pipeline config ID.

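Parameters:
  • pipeline_config_id – The pipeline config ID

  • criteria – Optional query criteria (defaults to None)

Return type: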

Iterator[Workflow]

retrieve_execution_summary(execution_id)[source]

Retrieve execution metadata without stages.

Parameters:

execution_id (str)

Return type:

Workflow

retrieve_stage(stage_id)[source]

Retrieve a single stage by ID.

Parameters:

stage_id (str)

Return type:

Any

store(execution)[source]

Store a complete execution.

Parameters:

execution (Workflow)

Return type:

None

store_stage(stage, expected_phase=None, connection=None)[source]

Store or update a stage.

Parameters:
  • stage (Any) – The stage to store

  • expected_phase (str | None) – If provided, adds status check to WHERE clause for phase-aware optimistic locking.

  • connection (Any | None) – Optional existing connection to use

Return type:

None

transaction(queue=None)[source]

Create an atomic transaction for store + queue operations.

Parameters:

queue (Any | None)

Return type:

Iterator[StoreTransaction]

update_status(execution)[source]

Update execution status.

Parameters:

execution (Workflow)

Return type:

None