Source code for stabilize.events.store.sqlite.store

"""
SQLite event store implementation.

Provides durable, append-only event storage using SQLite.
Events are stored with auto-incrementing sequence numbers
for global ordering.
"""

from __future__ import annotations

import sqlite3
import threading

from stabilize.events.store.interface import EventStore
from stabilize.events.store.sqlite.events import SqliteEventStoreMixin
from stabilize.events.store.sqlite.schema import (
    EVENTS_SCHEMA,
    SNAPSHOTS_SCHEMA,
    SUBSCRIPTIONS_SCHEMA,
)
from stabilize.events.store.sqlite.snapshots import SqliteSnapshotsMixin
from stabilize.events.store.sqlite.subscriptions import SqliteSubscriptionsMixin
from stabilize.persistence.connection import get_connection_manager


[docs] class SqliteEventStore( SqliteEventStoreMixin, SqliteSnapshotsMixin, SqliteSubscriptionsMixin, EventStore, ): """ SQLite implementation of event store. Uses the shared ConnectionManager for thread-local connections. Events are stored in an append-only table with auto-incrementing sequence numbers. Thread-safety: - Uses thread-local connections from ConnectionManager - Sequence assignment is atomic via SQLite's AUTOINCREMENT """ def __init__( self, connection_string: str, create_tables: bool = True, ) -> None: """ Initialize SQLite event store. Args: connection_string: SQLite connection string (e.g., "sqlite:///./app.db") create_tables: Whether to create tables if they don't exist. """ self._connection_string = connection_string self._lock = threading.Lock() if create_tables: self._create_tables() def _get_connection(self) -> sqlite3.Connection: """Get thread-local connection.""" return get_connection_manager().get_sqlite_connection(self._connection_string) def _create_tables(self) -> None: """Create event tables if they don't exist.""" conn = self._get_connection() conn.executescript(EVENTS_SCHEMA) conn.executescript(SNAPSHOTS_SCHEMA) conn.executescript(SUBSCRIPTIONS_SCHEMA) conn.commit()