Source code for stabilize.events.store.interface

"""
Event store interface.

Defines the abstract interface for event storage backends.
All implementations must support atomic append operations
and efficient querying by various criteria.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from stabilize.events.base import EntityType, Event, EventType


[docs] @dataclass class EventQuery: """ Query parameters for retrieving events. All fields are optional - unset fields are not filtered. Results are ordered by sequence number ascending. """ entity_type: EntityType | None = None entity_id: str | None = None workflow_id: str | None = None event_types: list[EventType] | None = None from_sequence: int | None = None to_sequence: int | None = None from_timestamp: datetime | None = None to_timestamp: datetime | None = None limit: int = 1000 offset: int = 0 # Ordering order_by: str = "sequence" # sequence or timestamp ascending: bool = True
[docs] @dataclass class AppendResult: """Result of appending event(s) to the store.""" events: list[Event] = field(default_factory=list) sequences: list[int] = field(default_factory=list) @property def first_sequence(self) -> int: """Get the first sequence number assigned.""" return self.sequences[0] if self.sequences else 0 @property def last_sequence(self) -> int: """Get the last sequence number assigned.""" return self.sequences[-1] if self.sequences else 0
[docs] class EventStore(ABC): """ Abstract event store interface. Event stores are append-only - events cannot be modified or deleted. Each event is assigned a monotonically increasing sequence number that provides global ordering. Implementations must ensure: - Atomic appends (single event or batch) - Unique sequence numbers - Durable storage - Efficient querying by entity, workflow, and time """
[docs] @abstractmethod def append(self, event: Event, connection: Any | None = None) -> Event: """ Append a single event to the store. Args: event: The event to append. Sequence will be assigned. connection: Optional database connection for transaction. Returns: The event with sequence number assigned. """ pass
[docs] @abstractmethod def append_batch(self, events: list[Event], connection: Any | None = None) -> list[Event]: """ Append multiple events atomically. All events are appended in a single transaction. Sequence numbers are assigned in order. Args: events: Events to append. connection: Optional database connection for transaction. Returns: Events with sequence numbers assigned. """ pass
[docs] @abstractmethod def get_events(self, query: EventQuery) -> Iterator[Event]: """ Query events matching criteria. Args: query: Query parameters. Returns: Iterator of matching events ordered by sequence. """ pass
[docs] @abstractmethod def get_events_for_entity( self, entity_type: EntityType, entity_id: str, from_version: int = 0, ) -> list[Event]: """ Get all events for a specific entity. Args: entity_type: Type of entity. entity_id: Entity identifier. from_version: Only get events after this version. Returns: List of events for the entity. """ pass
[docs] @abstractmethod def get_events_for_workflow( self, workflow_id: str, from_sequence: int = 0, ) -> list[Event]: """ Get all events for a workflow execution. Args: workflow_id: Workflow identifier. from_sequence: Only get events after this sequence. Returns: List of events for the workflow. """ pass
[docs] @abstractmethod def get_current_sequence(self) -> int: """ Get the current (latest) global sequence number. Returns: The highest sequence number in the store, or 0 if empty. """ pass
[docs] @abstractmethod def get_events_since( self, sequence: int, limit: int = 1000, ) -> list[Event]: """ Get events since a sequence number. Used for catch-up subscriptions. Args: sequence: Get events after this sequence. limit: Maximum number of events to return. Returns: List of events after the sequence. """ pass
[docs] def get_event_by_id(self, event_id: str) -> Event | None: """ Get a single event by its ID. Args: event_id: The event identifier. Returns: The event if found, None otherwise. """ # Default implementation using query for event in self.get_events(EventQuery(limit=1)): # This is inefficient - implementations should override pass return None
[docs] def count_events(self, query: EventQuery | None = None) -> int: """ Count events matching query. Args: query: Optional query parameters. Returns: Number of matching events. """ # Default implementation - implementations should override count = 0 for _ in self.get_events(query or EventQuery(limit=100000)): count += 1 return count