Event Sourcing
Stabilize includes a built-in event sourcing system that records every state transition as an immutable event. This enables full audit trails, time-travel debugging, and analytics projections.
Overview
When enabled, all handlers automatically record events as they process messages. Events are appended to an event store and published to an in-process event bus for real-time subscriptions.
Key capabilities:
Audit trail: Every workflow, stage, and task transition is recorded.
Event replay: Reconstruct workflow state at any point in time.
Projections: Build metrics, timelines, and custom views from events.
Subscriptions: React to events in real-time (logging, webhooks, etc.).
Snapshots: Speed up replay for long-running workflows.
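To make the "immutable event appended to a store" idea concrete, here is a minimal in-memory sketch. It is not Stabilize's implementation: the `Event` fields, the `InMemoryEventLog` class, and the dotted event type strings are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class Event:
    """Immutable record of one state transition (hypothetical shape)."""

    sequence: int
    event_type: str
    entity_id: str
    workflow_id: str
    timestamp: datetime
    data: dict[str, Any] = field(default_factory=dict)


class InMemoryEventLog:
    """Append-only log; each event gets the next sequence number."""

    def __init__(self) -> None:
        self._events: list[Event] = []

    def append(self, event_type: str, entity_id: str, workflow_id: str, **data: Any) -> Event:
        event = Event(
            sequence=len(self._events) + 1,
            event_type=event_type,
            entity_id=entity_id,
            workflow_id=workflow_id,
            timestamp=datetime.now(timezone.utc),
            data=data,
        )
        self._events.append(event)
        return event

    def events_for_workflow(self, workflow_id: str) -> list[Event]:
        return [e for e in self._events if e.workflow_id == workflow_id]


log = InMemoryEventLog()
log.append("workflow.started", "wf-1", "wf-1")          # hypothetical type names
log.append("task.failed", "task-7", "wf-1", error="timeout")
```

Because the dataclass is frozen, assigning to a field of a stored event raises `dataclasses.FrozenInstanceError`, which is one simple way to keep recorded history from being edited in place.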
Quick Setup
Enable event sourcing by creating an event store and passing it to configure_event_sourcing:
from stabilize.events import configure_event_sourcing, SqliteEventStore
event_store = SqliteEventStore("sqlite:///events.db", create_tables=True)
configure_event_sourcing(event_store)
# That's it — all handlers now record events automatically.
The configure_event_sourcing function sets up a global event recorder and
event bus. Handlers detect the recorder via a global fallback, so no changes
to existing workflow code are needed.
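The "global fallback" pattern can be sketched as follows. This is only an illustration of the general technique, not Stabilize's code: `configure_recorder`, `handle_task_complete`, and the `"task.completed"` string are hypothetical names.

```python
from typing import Callable, Optional

Recorder = Callable[[str, str], None]

# Module-level recorder, installed once at startup (hypothetical name).
_global_recorder: Optional[Recorder] = None


def configure_recorder(recorder: Recorder) -> None:
    """Install the process-wide recorder."""
    global _global_recorder
    _global_recorder = recorder


def handle_task_complete(task_id: str, recorder: Optional[Recorder] = None) -> None:
    """A handler that prefers an explicit recorder and falls back to the global one."""
    active = recorder or _global_recorder
    if active is not None:
        active("task.completed", task_id)
    # ... the handler's normal work would continue here ...


recorded: list[tuple[str, str]] = []
configure_recorder(lambda event_type, entity_id: recorded.append((event_type, entity_id)))
handle_task_complete("task-1")  # no recorder passed, so the global one is used
```

The call site of the handler does not change, which is why existing workflow code keeps working once a recorder is configured.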
Event Types
Events are organized by entity lifecycle:
Workflow events:
| Event Type | Description |
|---|---|
| | Workflow was created |
| | Workflow execution started |
| | Workflow finished successfully |
| | Workflow failed |
| | Workflow was canceled |
| | Workflow was paused |
| | Workflow was resumed |
Stage events:
| Event Type | Description |
|---|---|
| | Stage execution started |
| | Stage finished successfully |
| | Stage failed |
| | Stage was skipped |
| | Stage was canceled |
Task events:
| Event Type | Description |
|---|---|
| | Task execution started |
| | Task finished successfully |
| | Task failed |
| | Task is being retried |
State change events:
| Event Type | Description |
|---|---|
| | Generic status change |
| | Stage context was updated |
| | Stage outputs were updated |
| | Dynamic jump was executed |
Subscribing to Events
Use the event bus to receive events in real-time:
from stabilize.events import get_event_bus, EventType
bus = get_event_bus()
# Subscribe to all events
bus.subscribe("logger", lambda e: print(f"{e.event_type.value}: {e.entity_id}"))
# Subscribe to specific event types
# (send_alert and track below stand for your own callables)
bus.subscribe(
    "failure-alert",
    lambda e: send_alert(e),
    event_types={EventType.WORKFLOW_FAILED, EventType.TASK_FAILED},
)
# Filter by workflow
bus.subscribe(
    "workflow-monitor",
    lambda e: track(e),
    workflow_filter="my-workflow-id",
)
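The filtering behaviour of such a bus can be sketched with a small stand-alone class. This is an assumption-laden illustration rather than the Stabilize bus: `SimpleEventBus`, the `Event` shape, and the string event types are hypothetical, and the sketch assumes that `event_types` and `workflow_filter` combine with AND when both are given.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Event:
    event_type: str
    entity_id: str
    workflow_id: str


@dataclass
class _Subscription:
    handler: Callable[[Event], None]
    event_types: Optional[frozenset[str]]
    workflow_filter: Optional[str]

    def matches(self, event: Event) -> bool:
        # A filter that is None matches everything.
        if self.event_types is not None and event.event_type not in self.event_types:
            return False
        if self.workflow_filter is not None and event.workflow_id != self.workflow_filter:
            return False
        return True


class SimpleEventBus:
    def __init__(self) -> None:
        self._subs: dict[str, _Subscription] = {}

    def subscribe(
        self,
        name: str,
        handler: Callable[[Event], None],
        event_types: Optional[Iterable[str]] = None,
        workflow_filter: Optional[str] = None,
    ) -> None:
        types = frozenset(event_types) if event_types is not None else None
        self._subs[name] = _Subscription(handler, types, workflow_filter)

    def publish(self, event: Event) -> None:
        # Deliver synchronously to every subscription whose filters match.
        for sub in list(self._subs.values()):
            if sub.matches(event):
                sub.handler(event)


bus = SimpleEventBus()
all_seen: list[Event] = []
failures: list[Event] = []
bus.subscribe("logger", all_seen.append)
bus.subscribe("failure-alert", failures.append, event_types={"workflow.failed", "task.failed"})
bus.publish(Event("task.started", "t1", "wf-1"))
bus.publish(Event("task.failed", "t1", "wf-1"))
```

Keying subscriptions by name means re-subscribing under the same name replaces the earlier handler in this sketch; whether the real bus behaves that way is not stated in this document.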
Projections
Projections build read-only views from events. Stabilize includes two built-in projections:
WorkflowTimelineProjection — builds a human-readable execution timeline:
from stabilize.events import WorkflowTimelineProjection
timeline_proj = WorkflowTimelineProjection(workflow.id)
# Apply events (from store or via bus subscription)
for event in event_store.get_events_for_workflow(workflow.id):
    timeline_proj.apply(event)
timeline = timeline_proj.get_state()
print(f"Duration: {timeline.total_duration_ms}ms")
print(f"Status: {timeline.status}")
for entry in timeline_proj.get_stages():
    print(f"  {entry.event_type}: {entry.entity_name} ({entry.duration_ms}ms)")
StageMetricsProjection — aggregates execution metrics:
from stabilize.events import StageMetricsProjection, get_event_bus
metrics = StageMetricsProjection()
# Subscribe to the bus for real-time metrics
bus = get_event_bus()
bus.subscribe("metrics", metrics.apply)
# After workflows run, query metrics
for stage_type, m in metrics.get_state().items():
    print(f"{stage_type}: {m.execution_count} runs, {m.success_rate:.0f}% success")
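A projection is essentially a fold over events into a read-only view. The sketch below shows that idea for per-stage-type success rates. It does not reproduce StageMetricsProjection: the `SuccessRateProjection` class, the `(event_type, stage_type)` input shape, and the event type strings are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class StageStats:
    execution_count: int = 0
    success_count: int = 0

    @property
    def success_rate(self) -> float:
        """Percentage of finished executions that succeeded."""
        if self.execution_count == 0:
            return 0.0
        return 100.0 * self.success_count / self.execution_count


class SuccessRateProjection:
    """Folds (event_type, stage_type) pairs into per-stage-type counters."""

    def __init__(self) -> None:
        self._stats: defaultdict[str, StageStats] = defaultdict(StageStats)

    def apply(self, event_type: str, stage_type: str) -> None:
        # Only terminal events count as an execution; others are ignored.
        if event_type in ("stage.completed", "stage.failed"):
            stats = self._stats[stage_type]
            stats.execution_count += 1
            if event_type == "stage.completed":
                stats.success_count += 1

    def get_state(self) -> dict[str, StageStats]:
        return dict(self._stats)


proj = SuccessRateProjection()
for event_type, stage_type in [
    ("stage.started", "deploy"),
    ("stage.completed", "deploy"),
    ("stage.failed", "deploy"),
    ("stage.completed", "build"),
]:
    proj.apply(event_type, stage_type)
```

Because `apply` takes one event at a time, the same projection can be fed from a stored event list or attached to a live subscription.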
Event Replay
The EventReplayer reconstructs workflow state from events:
from datetime import UTC, datetime
from stabilize.events import EventReplayer
replayer = EventReplayer(event_store)
# Rebuild current state
state = replayer.rebuild_workflow_state(workflow.id)
print(state["status"])
print(state["stages"])
# Time-travel: state at a specific sequence number
partial = replayer.rebuild_workflow_state(workflow.id, as_of_sequence=50)
# Time-travel: state at a specific point in time
some_datetime = datetime(2025, 1, 1, 12, 0, tzinfo=UTC)  # example timestamp; use your own
historical = replayer.time_travel_query(workflow.id, as_of_time=some_datetime)
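Replay with a sequence cutoff can be pictured as folding events in order and stopping early. The following is a simplified sketch of that idea, not the EventReplayer's actual logic: the tuple event shape, the `rebuild_state` function, and the event type strings are hypothetical.

```python
from typing import Optional

# (sequence, event_type, entity_id): a hypothetical, simplified event shape.
EVENTS = [
    (1, "workflow.started", "wf-1"),
    (2, "stage.started", "build"),
    (3, "stage.completed", "build"),
    (4, "stage.started", "deploy"),
    (5, "stage.failed", "deploy"),
    (6, "workflow.failed", "wf-1"),
]


def rebuild_state(events, as_of_sequence: Optional[int] = None) -> dict:
    """Fold events in sequence order into a state dict, optionally stopping early."""
    state = {"status": "NOT_STARTED", "stages": {}}
    for sequence, event_type, entity_id in sorted(events):
        if as_of_sequence is not None and sequence > as_of_sequence:
            break
        kind, _, transition = event_type.partition(".")
        if kind == "workflow":
            state["status"] = transition.upper()
        elif kind == "stage":
            state["stages"][entity_id] = transition.upper()
    return state


current = rebuild_state(EVENTS)
earlier = rebuild_state(EVENTS, as_of_sequence=3)
```

A time-based query works the same way in principle, with the cutoff compared against each event's timestamp instead of its sequence number.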
Event Stores
The following event store backends are available:
SQLite (development, testing, and single-node production):
from stabilize.events import SqliteEventStore
store = SqliteEventStore("sqlite:///events.db", create_tables=True)
PostgreSQL (production, requires stabilize[postgres]):
from stabilize.events import PostgresEventStore
store = PostgresEventStore("postgresql://user:pass@host/db")
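For intuition about what a SQL-backed event store does, here is a minimal append-only table built with Python's standard `sqlite3` module. The schema, column names, and helper functions are assumptions made for this sketch and are not taken from SqliteEventStore.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        sequence    INTEGER PRIMARY KEY AUTOINCREMENT,
        workflow_id TEXT NOT NULL,
        event_type  TEXT NOT NULL,
        payload     TEXT NOT NULL
    )
    """
)


def append_event(workflow_id: str, event_type: str, payload: dict) -> int:
    """Insert one event and return its sequence number."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT INTO events (workflow_id, event_type, payload) VALUES (?, ?, ?)",
            (workflow_id, event_type, json.dumps(payload)),
        )
    return cur.lastrowid


def events_for_workflow(workflow_id: str) -> list[tuple[int, str, dict]]:
    """Return one workflow's events in the order they were appended."""
    rows = conn.execute(
        "SELECT sequence, event_type, payload FROM events "
        "WHERE workflow_id = ? ORDER BY sequence",
        (workflow_id,),
    )
    return [(seq, etype, json.loads(payload)) for seq, etype, payload in rows]


append_event("wf-1", "workflow.started", {})
append_event("wf-2", "workflow.started", {})
append_event("wf-1", "task.failed", {"error": "timeout"})
```

The sketch only ever inserts and selects; never issuing UPDATE or DELETE against the table is what keeps the log append-only.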
Snapshots
For long-running workflows with many events, snapshots speed up replay by providing periodic checkpoints:
from stabilize.events import EventReplayer, SnapshotPolicy, SnapshotStore
# Snapshot every 100 events
policy = SnapshotPolicy(every_n_events=100)
# snapshot_store is assumed to be a SnapshotStore instance created elsewhere;
# its constructor arguments are not shown here.
# The replayer uses snapshots automatically when available
replayer = EventReplayer(event_store, snapshot_store=snapshot_store)
state = replayer.rebuild_workflow_state(workflow.id)  # Starts from latest snapshot
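Why a snapshot shortens replay can be shown with a toy state machine: start from the saved state and apply only the events recorded after it. This is a sketch under stated assumptions, not the library's snapshot mechanism; the event shape, `apply_event`, and `replay` are hypothetical.

```python
import copy

# Hypothetical event shape: (sequence, counter_increment).
EVENTS = [(seq, 1) for seq in range(1, 251)]


def apply_event(state: dict, event: tuple[int, int]) -> None:
    """Mutate state in place with one event."""
    sequence, increment = event
    state["total"] += increment
    state["last_sequence"] = sequence


def replay(events, snapshot=None):
    """Rebuild state, starting from a snapshot if given.

    Returns (state, number_of_events_applied).
    """
    state = copy.deepcopy(snapshot) if snapshot else {"total": 0, "last_sequence": 0}
    # Only events newer than the snapshot need to be applied.
    tail = [e for e in events if e[0] > state["last_sequence"]]
    for event in tail:
        apply_event(state, event)
    return state, len(tail)


# Full replay from the beginning.
full_state, full_count = replay(EVENTS)

# Pretend a snapshot was taken after event 200, then replay from it.
snapshot, _ = replay(EVENTS[:200])
snap_state, snap_count = replay(EVENTS, snapshot)
```

Both paths produce the same final state, but the snapshot path applies 50 events instead of 250. The snapshot is deep-copied so that replaying never mutates the stored checkpoint.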