Source code for stabilize.events.replay

"""
Event replay for state reconstruction.

The EventReplayer allows rebuilding workflow state at any point
in time by replaying events from the event store.
"""

from __future__ import annotations

import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any

from stabilize.events.base import EntityType, Event, EventType

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from stabilize.events.snapshots import SnapshotStore
    from stabilize.events.store.interface import EventStore


[docs] @dataclass class ReplayResult: """Result of replaying events for an entity.""" entity_type: EntityType entity_id: str final_state: dict[str, Any] = field(default_factory=dict) events_replayed: int = 0 from_snapshot: bool = False snapshot_sequence: int = 0
[docs] @dataclass class WorkflowState: """Reconstructed workflow state from events.""" workflow_id: str status: str | None = None application: str | None = None name: str | None = None start_time: datetime | None = None end_time: datetime | None = None context: dict[str, Any] = field(default_factory=dict) stages: dict[str, dict[str, Any]] = field(default_factory=dict) # stage_id -> state tasks: dict[str, dict[str, Any]] = field(default_factory=dict) # task_id -> state
[docs] def to_dict(self) -> dict[str, Any]: """Convert to dictionary.""" return { "workflow_id": self.workflow_id, "status": self.status, "application": self.application, "name": self.name, "start_time": self.start_time.isoformat() if self.start_time else None, "end_time": self.end_time.isoformat() if self.end_time else None, "context": self.context, "stages": self.stages, "tasks": self.tasks, }
[docs] class EventReplayer: """ Replays events to reconstruct state. Supports: - Full state reconstruction from events - Time-travel queries (state at a specific point) - Incremental replay from snapshots - Custom event handlers for testing/debugging """ def __init__( self, event_store: EventStore, snapshot_store: SnapshotStore | None = None, ) -> None: """ Initialize the event replayer. Args: event_store: The event store to replay from. snapshot_store: Optional snapshot store for faster replay. """ self._event_store = event_store self._snapshot_store = snapshot_store
[docs] def rebuild_workflow_state( self, workflow_id: str, as_of_sequence: int | None = None, ) -> dict[str, Any]: """ Rebuild complete workflow state from events. Args: workflow_id: The workflow to rebuild. as_of_sequence: Rebuild state as of this sequence (for time travel). Returns: Dictionary containing the complete workflow state. """ state = WorkflowState(workflow_id=workflow_id) start_sequence = 0 # Try to start from snapshot if self._snapshot_store: snapshot = self._snapshot_store.get_latest_snapshot(EntityType.WORKFLOW, workflow_id) if snapshot and (as_of_sequence is None or snapshot.sequence <= as_of_sequence): # Load state from snapshot state = self._load_state_from_snapshot(snapshot) start_sequence = snapshot.sequence # Replay events since snapshot if as_of_sequence is not None: # Get events up to the specified sequence events = [ e for e in self._event_store.get_events_for_workflow(workflow_id, start_sequence) if e.sequence <= as_of_sequence ] else: events = self._event_store.get_events_for_workflow(workflow_id, start_sequence) # Check for sequence gaps last_seq = 0 for event in events: if last_seq > 0 and event.sequence > last_seq + 1: logger.warning( "Sequence gap detected in workflow %s: %d -> %d (missing %d events)", workflow_id, last_seq, event.sequence, event.sequence - last_seq - 1, ) self._apply_event(state, event) last_seq = event.sequence return state.to_dict()
[docs] def time_travel_query( self, workflow_id: str, as_of_time: datetime, ) -> dict[str, Any]: """ Get workflow state at a specific point in time. Args: workflow_id: The workflow to query. as_of_time: Get state as of this timestamp. Returns: Dictionary containing the workflow state at that time. """ from stabilize.events.store.interface import EventQuery state = WorkflowState(workflow_id=workflow_id) # Query events up to the specified time query = EventQuery( workflow_id=workflow_id, to_timestamp=as_of_time, limit=100000, # Get all events up to that time ) for event in self._event_store.get_events(query): self._apply_event(state, event) return state.to_dict()
[docs] def replay_workflow_from_checkpoint( self, workflow_id: str, checkpoint_sequence: int, event_handler: Callable[[Event], None], ) -> None: """ Replay events through a custom handler. Useful for: - Testing event handlers - Debugging workflow execution - Building custom projections Args: workflow_id: The workflow to replay. checkpoint_sequence: Start replaying from this sequence. event_handler: Handler to call for each event. """ events = self._event_store.get_events_for_workflow(workflow_id, checkpoint_sequence) for event in events: event_handler(event)
def _load_state_from_snapshot( self, snapshot: Any, ) -> WorkflowState: """Load WorkflowState from a snapshot.""" state_dict = snapshot.state return WorkflowState( workflow_id=snapshot.entity_id, status=state_dict.get("status"), application=state_dict.get("application"), name=state_dict.get("name"), context=state_dict.get("context", {}), stages=state_dict.get("stages", {}), tasks=state_dict.get("tasks", {}), ) def _apply_event(self, state: WorkflowState, event: Event) -> None: """Apply an event to update the state.""" if event.entity_type == EntityType.WORKFLOW: self._apply_workflow_event(state, event) elif event.entity_type == EntityType.STAGE: self._apply_stage_event(state, event) elif event.entity_type == EntityType.TASK: self._apply_task_event(state, event) def _apply_workflow_event(self, state: WorkflowState, event: Event) -> None: """Apply a workflow event.""" if event.event_type == EventType.WORKFLOW_CREATED: state.application = event.data.get("application") state.name = event.data.get("name") elif event.event_type == EventType.WORKFLOW_STARTED: state.start_time = event.timestamp state.status = "RUNNING" if "context" in event.data: state.context.update(event.data["context"]) elif event.event_type == EventType.WORKFLOW_COMPLETED: state.end_time = event.timestamp state.status = event.data.get("status", "SUCCEEDED") elif event.event_type == EventType.WORKFLOW_FAILED: state.end_time = event.timestamp state.status = event.data.get("status", "TERMINAL") elif event.event_type == EventType.WORKFLOW_CANCELED: state.end_time = event.timestamp state.status = "CANCELED" elif event.event_type == EventType.WORKFLOW_PAUSED: state.status = "PAUSED" elif event.event_type == EventType.WORKFLOW_RESUMED: state.status = "RUNNING" elif event.event_type == EventType.CONTEXT_UPDATED: if "context" in event.data: state.context.update(event.data["context"]) def _apply_stage_event(self, state: WorkflowState, event: Event) -> None: """Apply a stage event.""" stage_id = event.entity_id # Ensure stage entry exists if stage_id not in state.stages: state.stages[stage_id] = { "id": stage_id, "ref_id": event.data.get("ref_id"), "type": event.data.get("type"), "name": event.data.get("name"), } stage = state.stages[stage_id] if event.event_type == EventType.STAGE_STARTED: stage["status"] = "RUNNING" stage["start_time"] = event.timestamp.isoformat() elif event.event_type == EventType.STAGE_COMPLETED: stage["status"] = event.data.get("status", "SUCCEEDED") stage["end_time"] = event.timestamp.isoformat() if "outputs" in event.data: stage["outputs"] = event.data["outputs"] elif event.event_type == EventType.STAGE_FAILED: stage["status"] = event.data.get("status", "TERMINAL") stage["end_time"] = event.timestamp.isoformat() stage["error"] = event.data.get("error") elif event.event_type == EventType.STAGE_SKIPPED: stage["status"] = "SKIPPED" stage["skip_reason"] = event.data.get("reason") elif event.event_type == EventType.STAGE_CANCELED: stage["status"] = "CANCELED" def _apply_task_event(self, state: WorkflowState, event: Event) -> None: """Apply a task event.""" task_id = event.entity_id # Ensure task entry exists if task_id not in state.tasks: state.tasks[task_id] = { "id": task_id, "name": event.data.get("name"), "stage_id": event.data.get("stage_id"), } task = state.tasks[task_id] if event.event_type == EventType.TASK_STARTED: task["status"] = "RUNNING" task["start_time"] = event.timestamp.isoformat() elif event.event_type == EventType.TASK_COMPLETED: task["status"] = event.data.get("status", "SUCCEEDED") task["end_time"] = event.timestamp.isoformat() if "outputs" in event.data: task["outputs"] = event.data["outputs"] elif event.event_type == EventType.TASK_FAILED: task["status"] = event.data.get("status", "TERMINAL") task["end_time"] = event.timestamp.isoformat() task["error"] = event.data.get("error") elif event.event_type == EventType.TASK_RETRIED: task["retry_count"] = event.data.get("retry_count", task.get("retry_count", 0) + 1)
[docs] def get_entity_history( self, entity_type: EntityType, entity_id: str, ) -> list[dict[str, Any]]: """ Get the complete event history for an entity. Args: entity_type: Type of entity. entity_id: Entity identifier. Returns: List of events for the entity. """ events = self._event_store.get_events_for_entity(entity_type, entity_id) return [e.to_dict() for e in events]
[docs] def compare_states( self, workflow_id: str, sequence1: int, sequence2: int, ) -> dict[str, Any]: """ Compare workflow state at two different points. Useful for debugging what changed between two points in time. Args: workflow_id: The workflow to compare. sequence1: First sequence number. sequence2: Second sequence number. Returns: Dictionary showing differences between the two states. """ state1 = self.rebuild_workflow_state(workflow_id, as_of_sequence=sequence1) state2 = self.rebuild_workflow_state(workflow_id, as_of_sequence=sequence2) return { "before": state1, "after": state2, "sequence_range": {"from": sequence1, "to": sequence2}, }