Source code for stabilize.persistence.sqlite.store.store

"""
SQLite execution repository.

Lightweight persistence using SQLite for development and small deployments.
Uses singleton ConnectionManager for efficient connection sharing.

Enterprise Features:
- Atomic transactions for store + queue operations
- Dead letter queue support
- Thread-safe connection management via WAL mode
"""

from __future__ import annotations

import logging
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING

from stabilize.persistence.sqlite.operations import (
    cleanup_old_processed_messages as _cleanup_old_processed_messages,
)
from stabilize.persistence.sqlite.operations import is_message_processed as _is_message_processed
from stabilize.persistence.sqlite.operations import (
    mark_message_processed as _mark_message_processed,
)
from stabilize.persistence.sqlite.schema import create_tables
from stabilize.persistence.sqlite.store.queries import SqliteQueriesMixin
from stabilize.persistence.sqlite.store.stage_ops import SqliteStageOpsMixin
from stabilize.persistence.sqlite.store.workflow_crud import SqliteWorkflowCrudMixin
from stabilize.persistence.store import WorkflowStore

if TYPE_CHECKING:
    from stabilize.persistence.store import StoreTransaction
    from stabilize.queue import Queue

logger = logging.getLogger(__name__)


[docs] class SqliteWorkflowStore( SqliteWorkflowCrudMixin, SqliteStageOpsMixin, SqliteQueriesMixin, WorkflowStore, ): """ SQLite implementation of WorkflowStore. Uses native sqlite3 for file-based or in-memory storage. Suitable for development, testing, and single-node deployments. Features: - WAL mode for better concurrent read performance - Foreign key support enabled - JSON stored as TEXT strings - Arrays stored as JSON strings - Thread-local connections managed by singleton ConnectionManager """ def __init__( self, connection_string: str, create_tables: bool = False, ) -> None: """ Initialize the repository. Args: connection_string: SQLite connection string (e.g., sqlite:///./db.sqlite) create_tables: Whether to create tables if they don't exist """ from stabilize.persistence.connection import get_connection_manager self.connection_string = connection_string self._manager = get_connection_manager() # Verify connection works conn = self._get_connection() conn.execute("SELECT 1") if create_tables: self._create_tables() def _get_connection(self) -> sqlite3.Connection: """ Get thread-local connection from ConnectionManager. Returns a connection configured with: - Row factory for dict-like access - Foreign keys enabled - WAL journal mode for concurrency - 30 second busy timeout """ return self._manager.get_sqlite_connection(self.connection_string)
[docs] def close(self) -> None: """Close SQLite connection for current thread.""" self._manager.close_sqlite_connection(self.connection_string)
def _create_tables(self) -> None: """Create database tables if they don't exist.""" create_tables(self._get_connection())
[docs] def is_healthy(self) -> bool: """Check if the database connection is healthy.""" try: conn = self._get_connection() conn.execute("SELECT 1") return True except Exception: return False
[docs] @contextmanager def transaction(self, queue: Queue | None = None) -> Iterator[StoreTransaction]: """Create atomic transaction for store + queue operations. Use this when you need to atomically update both stage state AND queue a message. This prevents orphaned workflows from crashes between separate store and queue operations. SQLite implementation writes directly to the queue_messages table in the same transaction, so the queue parameter is ignored. Args: queue: Ignored (for API compatibility with base class) Usage: with store.transaction() as txn: txn.store_stage(stage) txn.push_message(message) # Auto-commits on success, rolls back on exception Yields: AtomicTransaction with store_stage() and push_message() methods """ from stabilize.persistence.sqlite.transaction import AtomicTransaction conn = self._get_connection() txn = AtomicTransaction(conn, self) try: yield txn conn.commit() except Exception: conn.rollback() # Restore in-memory versions to match rolled-back database state txn.rollback_versions() raise
# ========== Message Deduplication ==========
[docs] def is_message_processed(self, message_id: str) -> bool: """Check if a message has already been processed.""" return _is_message_processed(self._get_connection(), message_id)
[docs] def mark_message_processed( self, message_id: str, handler_type: str | None = None, execution_id: str | None = None, ) -> None: """Mark a message as successfully processed.""" _mark_message_processed(self._get_connection(), message_id, handler_type, execution_id)
[docs] def cleanup_old_processed_messages(self, max_age_hours: float = 24.0) -> int: """Clean up old processed message records.""" return _cleanup_old_processed_messages(self._get_connection(), max_age_hours)