Events API
Configuration
- stabilize.events.configure_event_sourcing(event_store, publish_to_bus=True, bus_max_workers=4)[source]
One-line setup for event sourcing.
This configures both the event bus and event recorder, setting up the global instances for use throughout the application.
- Parameters:
event_store (EventStore) – The event store for persisting events.
publish_to_bus (bool) – Whether to publish events to the bus.
bus_max_workers (int) – Maximum workers for async bus handlers.
- Returns:
The configured EventRecorder.
- Return type:
EventRecorder
Example
from stabilize.events import configure_event_sourcing, SqliteEventStore
event_store = SqliteEventStore("sqlite:///app.db", create_tables=True)
recorder = configure_event_sourcing(event_store)
- stabilize.events.configure_event_bus(max_workers=4, error_handler=None)[source]
Configure the global event bus.
Must be called before first use of get_event_bus().
- stabilize.events.configure_event_recorder(event_store, publish_to_bus=True)[source]
Configure the global event recorder.
- Parameters:
event_store (EventStore) – Store for persisting events.
publish_to_bus (bool) – Whether to publish events to the bus.
- Returns:
The configured event recorder.
- Return type:
EventRecorder
Base Types
- class stabilize.events.Event(event_id=<factory>, event_type=EventType.STATUS_CHANGED, timestamp=<factory>, sequence=0, entity_type=EntityType.WORKFLOW, entity_id='', workflow_id='', version=0, data=<factory>, metadata=<factory>, schema_version=1)[source]
Immutable event record.
Events are the source of truth for all state changes in the system. They are append-only and can be replayed to reconstruct state.
- Parameters:
event_id (str)
event_type (EventType)
timestamp (datetime)
sequence (int)
entity_type (EntityType)
entity_id (str)
workflow_id (str)
version (int)
data (dict[str, Any])
metadata (EventMetadata)
schema_version (int)
- event_id
Unique identifier (ULID for time-ordering).
- Type:
str
- event_type
Type of event, taken from the EventType enum.
- Type:
EventType
- timestamp
When the event occurred (UTC).
- Type:
datetime.datetime
- sequence
Global ordering number (assigned by EventStore on append).
- Type:
int
- entity_type
Type of entity this event is about.
- Type:
EntityType
- entity_id
ID of the entity.
- Type:
str
- workflow_id
Workflow ID for correlation (always set).
- Type:
str
- version
Entity version after this event (for optimistic concurrency).
- Type:
int
- data
Event-specific payload.
- Type:
dict[str, Any]
- metadata
Tracing and debugging metadata.
- Type:
EventMetadata
- class stabilize.events.EventType(value)[source]
All event types in the system.
Events are organized by entity lifecycle:
- workflow.*: Workflow-level events
- stage.*: Stage-level events
- task.*: Task-level events
- status.*: Generic status changes
- context.*: Context/output updates
- WORKFLOW_CREATED = 'workflow.created'
- WORKFLOW_STARTED = 'workflow.started'
- WORKFLOW_COMPLETED = 'workflow.completed'
- WORKFLOW_FAILED = 'workflow.failed'
- WORKFLOW_CANCELED = 'workflow.canceled'
- WORKFLOW_PAUSED = 'workflow.paused'
- WORKFLOW_RESUMED = 'workflow.resumed'
- STAGE_STARTED = 'stage.started'
- STAGE_COMPLETED = 'stage.completed'
- STAGE_FAILED = 'stage.failed'
- STAGE_SKIPPED = 'stage.skipped'
- STAGE_CANCELED = 'stage.canceled'
- TASK_STARTED = 'task.started'
- TASK_COMPLETED = 'task.completed'
- TASK_FAILED = 'task.failed'
- TASK_RETRIED = 'task.retried'
- STATUS_CHANGED = 'status.changed'
- CONTEXT_UPDATED = 'context.updated'
- OUTPUTS_UPDATED = 'outputs.updated'
- JUMP_EXECUTED = 'jump.executed'
- class stabilize.events.EntityType(value)[source]
Entity types that can emit events.
- WORKFLOW = 'workflow'
- STAGE = 'stage'
- TASK = 'task'
- class stabilize.events.EventMetadata(correlation_id, causation_id=None, actor='system', source_handler=None)[source]
Metadata attached to every event for tracing and debugging.
- Parameters:
correlation_id (str)
causation_id (str | None)
actor (str)
source_handler (str | None)
- correlation_id
Links related events across a workflow execution. Typically the workflow execution ID.
- Type:
str
- causation_id
ID of the event that caused this event (for event chains).
- Type:
str | None
- actor
Who or what triggered this event (user ID or “system”).
- Type:
str
- source_handler
The handler class that emitted this event.
- Type:
str | None
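The causation_id field described above links each event to the event that caused it, so a chain of causes can be walked back to its root. The following is a minimal sketch of that idea, not the library's implementation: ToyMetadata, ToyEvent and causation_chain are hypothetical stand-ins that only mirror the documented field names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyMetadata:
    """Hypothetical stand-in mirroring two documented EventMetadata fields."""
    correlation_id: str
    causation_id: Optional[str] = None


@dataclass(frozen=True)
class ToyEvent:
    """Hypothetical stand-in carrying only an ID and its metadata."""
    event_id: str
    metadata: ToyMetadata


def causation_chain(events, event_id):
    """Return event IDs from the root cause down to event_id."""
    by_id = {event.event_id: event for event in events}
    chain = []
    seen = set()  # guards against accidental cycles in the data
    current = by_id.get(event_id)
    while current is not None and current.event_id not in seen:
        seen.add(current.event_id)
        chain.append(current.event_id)
        parent_id = current.metadata.causation_id
        current = by_id.get(parent_id) if parent_id is not None else None
    chain.reverse()
    return chain


events = [
    ToyEvent("e1", ToyMetadata("wf-1")),
    ToyEvent("e2", ToyMetadata("wf-1", causation_id="e1")),
    ToyEvent("e3", ToyMetadata("wf-1", causation_id="e2")),
    ToyEvent("e4", ToyMetadata("wf-1")),
]
chain = causation_chain(events, "e3")
```

All four events share one correlation_id, while only e2 and e3 carry a causation_id, so the chain for e3 contains e1, e2 and e3 and leaves e4 out.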
Event Bus
- class stabilize.events.EventBus(max_workers=4, error_handler=None)[source]
Thread-safe in-process event bus.
Supports:
- Sync and async subscription modes
- Filtering by event type, entity type, and workflow
- Error isolation (one handler failure doesn’t affect others)
- Statistics tracking
- Parameters:
max_workers (int)
error_handler (Callable[[str, Event, Exception], None] | None)
- subscribe(subscription_id, handler, event_types=None, entity_types=None, workflow_filter=None, mode=SubscriptionMode.SYNC)[source]
Subscribe to events.
- Parameters:
subscription_id (str) – Unique identifier for this subscription.
handler (Callable[[Event], Any]) – Callable that receives events.
event_types (set[EventType] | None) – Only receive these event types (None = all).
entity_types (set[EntityType] | None) – Only receive events for these entity types (None = all).
workflow_filter (str | None) – Only receive events for this workflow ID.
mode (SubscriptionMode) – SYNC (blocking) or ASYNC (fire-and-forget).
- Return type:
None
- unsubscribe(subscription_id)[source]
Unsubscribe from events.
- Parameters:
subscription_id (str) – The subscription to remove.
- Returns:
True if subscription was found and removed.
- Return type:
bool
- enable_subscription(subscription_id)[source]
Enable a subscription.
- Parameters:
subscription_id (str)
- Return type:
None
- disable_subscription(subscription_id)[source]
Disable a subscription without removing it.
- Parameters:
subscription_id (str)
- Return type:
None
- publish(event)[source]
Publish an event to all matching subscribers.
Sync handlers are called in the current thread. Async handlers are submitted to the thread pool.
- Parameters:
event (Event) – The event to publish.
- Return type:
None
- publish_batch(events)[source]
Publish multiple events.
- Parameters:
events (list[Event])
- Return type:
None
- property stats: EventBusStats
Get current statistics.
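The behavior described for EventBus (filtered delivery, sync handlers in the caller's thread, async handlers on a thread pool, and error isolation) can be illustrated with a small standard-library stand-in. This is a sketch under assumptions, not the EventBus source: ToyBus, ToyEvent, the asynchronous flag and shutdown() are hypothetical names chosen for the example.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyEvent:
    """Hypothetical event with just the fields the filters need."""
    event_type: str
    workflow_id: str


class ToyBus:
    """Illustrative in-process bus; not the stabilize EventBus."""

    def __init__(self, max_workers=4, error_handler=None):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._subs = {}
        self._error_handler = error_handler

    def subscribe(self, subscription_id, handler, event_types=None,
                  workflow_filter=None, asynchronous=False):
        with self._lock:
            self._subs[subscription_id] = (
                handler, event_types, workflow_filter, asynchronous)

    def _safe_call(self, subscription_id, handler, event):
        # Error isolation: a failing handler is reported, never re-raised.
        try:
            handler(event)
        except Exception as exc:
            if self._error_handler is not None:
                self._error_handler(subscription_id, event, exc)

    def publish(self, event):
        with self._lock:
            subs = list(self._subs.items())
        for sub_id, (handler, types, workflow, asynchronous) in subs:
            if types is not None and event.event_type not in types:
                continue
            if workflow is not None and event.workflow_id != workflow:
                continue
            if asynchronous:
                self._pool.submit(self._safe_call, sub_id, handler, event)
            else:
                self._safe_call(sub_id, handler, event)

    def shutdown(self):
        # Wait for queued async handlers to finish.
        self._pool.shutdown(wait=True)


def broken(event):
    raise RuntimeError("boom")


seen, async_seen, errors = [], [], []
bus = ToyBus(error_handler=lambda sid, ev, exc: errors.append((sid, str(exc))))
bus.subscribe("broken", broken)
bus.subscribe("stage-only", lambda e: seen.append(e.event_type),
              event_types={"stage.started"})
bus.subscribe("wf-2-async", lambda e: async_seen.append(e.workflow_id),
              workflow_filter="wf-2", asynchronous=True)
bus.publish(ToyEvent("stage.started", "wf-1"))
bus.publish(ToyEvent("task.started", "wf-2"))
bus.shutdown()
```

The failing subscriber is reported through the error handler for both events, while the other two subscribers still receive exactly the events their filters allow.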
- class stabilize.events.Subscription(id, handler, event_types=None, entity_types=None, workflow_filter=None, mode=SubscriptionMode.SYNC, enabled=True)[source]
A subscription to events.
Subscribers can filter by:
- event_types: Only specific event types
- entity_types: Only specific entity types
- workflow_filter: Only events for a specific workflow ID
- Parameters:
id (str)
handler (Callable[[Event], Any])
event_types (set[EventType] | None)
entity_types (set[EntityType] | None)
workflow_filter (str | None)
mode (SubscriptionMode)
enabled (bool)
- class stabilize.events.SubscriptionMode(value)[source]
How a subscription processes events.
- SYNC = 'sync'
- ASYNC = 'async'
Event Recorder
- class stabilize.events.EventRecorder(event_store, publish_to_bus=True)[source]
Records events for all state transitions.
Integrates with EventStore for persistence and EventBus for pub/sub notifications. All record methods return the created event with its assigned sequence number.
- Parameters:
event_store (EventStore)
publish_to_bus (bool)
- stabilize.events.get_event_recorder()[source]
Get the global event recorder instance, if configured.
- Return type:
EventRecorder | None
- stabilize.events.set_event_context(correlation_id, causation_id=None, actor='system')[source]
Set the current event context for correlation tracking.
- Parameters:
correlation_id (str) – Links related events (typically workflow ID).
causation_id (str | None) – ID of the event that caused this action.
actor (str) – Who triggered this action.
- Return type:
None
Event Stores
- class stabilize.events.EventStore[source]
Abstract event store interface.
Event stores are append-only - events cannot be modified or deleted. Each event is assigned a monotonically increasing sequence number that provides global ordering.
Implementations must ensure:
- Atomic appends (single event or batch)
- Unique sequence numbers
- Durable storage
- Efficient querying by entity, workflow, and time
- abstractmethod append_batch(events, connection=None)[source]
Append multiple events atomically.
All events are appended in a single transaction. Sequence numbers are assigned in order.
- abstractmethod get_events(query)[source]
Query events matching criteria.
- Parameters:
query (EventQuery) – Query parameters.
- Returns:
Iterator of matching events ordered by sequence.
- Return type:
Iterator[Event]
- abstractmethod get_events_for_entity(entity_type, entity_id, from_version=0)[source]
Get all events for a specific entity.
- Parameters:
entity_type (EntityType) – Type of entity.
entity_id (str) – Entity identifier.
from_version (int) – Only get events after this version.
- Returns:
List of events for the entity.
- Return type:
list[Event]
- abstractmethod get_events_for_workflow(workflow_id, from_sequence=0)[source]
Get all events for a workflow execution.
- Parameters:
workflow_id (str) – Workflow identifier.
from_sequence (int) – Only get events after this sequence.
- Returns:
List of events for the workflow.
- Return type:
list[Event]
- abstractmethod get_current_sequence()[source]
Get the current (latest) global sequence number.
- Returns:
The highest sequence number in the store, or 0 if empty.
- Return type:
int
- abstractmethod get_events_since(sequence, limit=1000)[source]
Get events since a sequence number.
Used for catch-up subscriptions.
- Parameters:
sequence (int) – Get events after this sequence.
limit (int) – Maximum number of events to return.
- Returns:
List of events after the sequence.
- Return type:
list[Event]
- get_event_by_id(event_id)[source]
Get a single event by its ID.
- Parameters:
event_id (str) – The event identifier.
- Returns:
The event if found, None otherwise.
- Return type:
Event | None
- count_events(query=None)[source]
Count events matching query.
- Parameters:
query (EventQuery | None) – Optional query parameters.
- Returns:
Number of matching events.
- Return type:
int
- class stabilize.events.EventQuery(entity_type=None, entity_id=None, workflow_id=None, event_types=None, from_sequence=None, to_sequence=None, from_timestamp=None, to_timestamp=None, limit=1000, offset=0, order_by='sequence', ascending=True)[source]
Query parameters for retrieving events.
All fields are optional; unset fields apply no filter. By default, results are ordered by sequence number, ascending.
- Parameters:
entity_type (EntityType | None)
entity_id (str | None)
workflow_id (str | None)
event_types (list[EventType] | None)
from_sequence (int | None)
to_sequence (int | None)
from_timestamp (datetime | None)
to_timestamp (datetime | None)
limit (int)
offset (int)
order_by (str)
ascending (bool)
- class stabilize.events.AppendResult(events=<factory>, sequences=<factory>)[source]
Result of appending event(s) to the store.
- Parameters:
events (list[Event])
sequences (list[int])
- property first_sequence: int
Get the first sequence number assigned.
- property last_sequence: int
Get the last sequence number assigned.
- class stabilize.events.SqliteEventStore(connection_string, create_tables=True)[source]
SQLite implementation of event store.
Uses the shared ConnectionManager for thread-local connections. Events are stored in an append-only table with auto-incrementing sequence numbers.
Thread-safety:
- Uses thread-local connections from ConnectionManager
- Sequence assignment is atomic via SQLite’s AUTOINCREMENT
- Parameters:
connection_string (str)
create_tables (bool)
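To make the append-only table and AUTOINCREMENT sequence assignment concrete, here is a minimal sketch using only Python's sqlite3 module. The table layout and the open_store, append_batch and events_since helpers are assumptions for illustration; the actual SqliteEventStore schema is not shown in this reference.

```python
import json
import sqlite3


def open_store(path=":memory:"):
    """Open a database with a hypothetical append-only events table."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS events (
               sequence INTEGER PRIMARY KEY AUTOINCREMENT,
               event_type TEXT NOT NULL,
               workflow_id TEXT NOT NULL,
               data TEXT NOT NULL
           )"""
    )
    return conn


def append_batch(conn, events):
    """Insert all events in one transaction and return their sequences."""
    sequences = []
    with conn:  # commits on success, rolls back if any insert fails
        for event_type, workflow_id, data in events:
            cursor = conn.execute(
                "INSERT INTO events (event_type, workflow_id, data) "
                "VALUES (?, ?, ?)",
                (event_type, workflow_id, json.dumps(data)),
            )
            sequences.append(cursor.lastrowid)
    return sequences


def events_since(conn, sequence, limit=1000):
    """Return events with a sequence greater than the given one."""
    rows = conn.execute(
        "SELECT sequence, event_type, workflow_id, data FROM events "
        "WHERE sequence > ? ORDER BY sequence LIMIT ?",
        (sequence, limit),
    ).fetchall()
    return [(s, t, w, json.loads(d)) for s, t, w, d in rows]


conn = open_store()
sequences = append_batch(conn, [
    ("workflow.started", "wf-1", {"name": "demo"}),
    ("stage.started", "wf-1", {"stage": "build"}),
])
```

Because the batch runs inside a single transaction, a failure on any row leaves the table unchanged, which is the atomic-append property the EventStore interface asks implementations to provide.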
Projections
- class stabilize.events.Projection[source]
Abstract base class for projections.
Projections apply events to build a specialized view of the data. They are typically used to:
- Build human-readable timelines
- Aggregate metrics
- Create search indexes
- Derive state for specific queries
Projections are stateful and should be reset before replaying events from the beginning.
- abstract property name: str
Unique name for this projection.
- abstractmethod apply(event)[source]
Apply an event to update the projection state.
This method should be idempotent - applying the same event multiple times should have the same effect as applying it once.
- Parameters:
event (Event) – The event to apply.
- Return type:
None
- abstractmethod get_state()[source]
Get the current projection state.
- Returns:
The current state, type depends on the projection.
- Return type:
Any
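One way to meet the idempotency requirement on apply() is to remember which event IDs have already been applied. The sketch below assumes that approach and uses plain dicts as events. CountByTypeProjection and its reset() method are hypothetical and do not subclass the real Projection base class.

```python
class CountByTypeProjection:
    """Illustrative projection that counts events per event type."""

    name = "count-by-type"

    def __init__(self):
        self._counts = {}
        self._applied_ids = set()

    def apply(self, event):
        # Idempotency: skip events that were already applied once.
        if event["event_id"] in self._applied_ids:
            return
        self._applied_ids.add(event["event_id"])
        event_type = event["event_type"]
        self._counts[event_type] = self._counts.get(event_type, 0) + 1

    def get_state(self):
        # Return a copy so callers cannot mutate internal state.
        return dict(self._counts)

    def reset(self):
        # Hypothetical helper: clear state before a replay from the start.
        self._counts.clear()
        self._applied_ids.clear()


projection = CountByTypeProjection()
started = {"event_id": "e1", "event_type": "stage.started"}
projection.apply(started)
projection.apply(started)  # duplicate delivery has no further effect
projection.apply({"event_id": "e2", "event_type": "stage.completed"})
state = projection.get_state()
```

Tracking applied IDs costs memory proportional to the number of events. A projection that only ever sees events in sequence order could instead store the last applied sequence number.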
- class stabilize.events.StageMetricsProjection[source]
Aggregates stage execution metrics for analytics.
Tracks:
- Execution counts per stage type
- Success/failure rates
- Duration statistics (avg, median, p95)
- property name: str
Unique name for this projection.
- apply(event)[source]
Apply an event to update the projection state.
This method should be idempotent - applying the same event multiple times should have the same effect as applying it once.
- Parameters:
event (Event) – The event to apply.
- Return type:
None
- get_state()[source]
Get all stage metrics.
- Return type:
dict[str, StageMetrics]
- get_metrics_for_type(stage_type)[source]
Get metrics for a specific stage type.
- Parameters:
stage_type (str)
- Return type:
StageMetrics | None
- get_task_metrics_for_name(task_name)[source]
Get metrics for a specific task name.
- Parameters:
task_name (str)
- Return type:
TaskMetrics | None
- get_top_slowest_stages(n=10)[source]
Get the n slowest stage types by average duration.
- Parameters:
n (int)
- Return type:
list[tuple[str, float]]
- class stabilize.events.StageMetrics(stage_type, execution_count=0, success_count=0, failure_count=0, skip_count=0, durations=<factory>)[source]
Aggregated metrics for a stage type.
- Parameters:
stage_type (str)
execution_count (int)
success_count (int)
failure_count (int)
skip_count (int)
durations (list[int])
- property success_rate: float
Calculate success rate as a percentage.
- property failure_rate: float
Calculate failure rate as a percentage.
- property avg_duration_ms: float
Calculate average duration in milliseconds.
- property median_duration_ms: float
Calculate median duration in milliseconds.
- property p95_duration_ms: float
Calculate 95th percentile duration in milliseconds.
- property min_duration_ms: int
Get minimum duration in milliseconds.
- property max_duration_ms: int
Get maximum duration in milliseconds.
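The derived properties above can be computed from the raw counters and the durations list. The sketch below is one plausible reading, not the library's code: ToyStageMetrics is a hypothetical stand-in, empty inputs are assumed to yield 0.0, and the 95th percentile uses the nearest-rank method, which may differ from what StageMetrics actually does.

```python
import math
import statistics
from dataclasses import dataclass, field


@dataclass
class ToyStageMetrics:
    """Illustrative stand-in with a subset of the documented fields."""
    stage_type: str
    execution_count: int = 0
    success_count: int = 0
    durations: list = field(default_factory=list)  # milliseconds

    @property
    def success_rate(self) -> float:
        # Percentage in the range 0..100; 0.0 when nothing has run.
        if self.execution_count == 0:
            return 0.0
        return 100.0 * self.success_count / self.execution_count

    @property
    def avg_duration_ms(self) -> float:
        return statistics.fmean(self.durations) if self.durations else 0.0

    @property
    def median_duration_ms(self) -> float:
        return float(statistics.median(self.durations)) if self.durations else 0.0

    @property
    def p95_duration_ms(self) -> float:
        # Nearest-rank percentile: the value at rank ceil(0.95 * n).
        if not self.durations:
            return 0.0
        ordered = sorted(self.durations)
        rank = max(1, math.ceil(0.95 * len(ordered)))
        return float(ordered[rank - 1])


metrics = ToyStageMetrics(
    "build", execution_count=4, success_count=3,
    durations=[400, 100, 300, 200],
)
```

With four samples the nearest-rank p95 is simply the largest duration. Interpolating percentile methods would give a slightly lower value.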
- class stabilize.events.WorkflowTimelineProjection(workflow_id)[source]
Builds a human-readable timeline from workflow events.
The timeline shows:
- Workflow start/end with status
- Stage start/end with duration
- Task start/end with duration
- Any failures or skips
- Parameters:
workflow_id (str)
- property name: str
Unique name for this projection.
- get_stages()[source]
Get only stage-related entries.
- Return type:
list[TimelineEntry]
- get_tasks()[source]
Get only task-related entries.
- Return type:
list[TimelineEntry]
- get_failures()[source]
Get only failure events.
- Return type:
list[TimelineEntry]
- class stabilize.events.WorkflowTimeline(workflow_id, entries=<factory>, total_duration_ms=None, status=None, start_time=None, end_time=None)[source]
Complete timeline for a workflow execution.
- Parameters:
workflow_id (str)
entries (list[TimelineEntry])
total_duration_ms (int | None)
status (str | None)
start_time (datetime | None)
end_time (datetime | None)
- class stabilize.events.TimelineEntry(timestamp, event_type, entity_type, entity_id, entity_name=None, status=None, duration_ms=None, details=<factory>)[source]
A single entry in the workflow timeline.
- Parameters:
timestamp (datetime)
event_type (str)
entity_type (str)
entity_id (str)
entity_name (str | None)
status (str | None)
duration_ms (int | None)
details (dict[str, Any])
Replay
- class stabilize.events.EventReplayer(event_store, snapshot_store=None)[source]
Replays events to reconstruct state.
Supports:
- Full state reconstruction from events
- Time-travel queries (state at a specific point)
- Incremental replay from snapshots
- Custom event handlers for testing/debugging
- Parameters:
event_store (EventStore)
snapshot_store (SnapshotStore | None)
- rebuild_workflow_state(workflow_id, as_of_sequence=None)[source]
Rebuild complete workflow state from events.
- Parameters:
workflow_id (str) – The workflow to rebuild.
as_of_sequence (int | None) – Rebuild state as of this sequence (for time travel).
- Returns:
Dictionary containing the complete workflow state.
- Return type:
dict[str, Any]
- time_travel_query(workflow_id, as_of_time)[source]
Get workflow state at a specific point in time.
- Parameters:
workflow_id (str) – The workflow to query.
as_of_time (datetime) – Get state as of this timestamp.
- Returns:
Dictionary containing the workflow state at that time.
- Return type:
dict[str, Any]
- replay_workflow_from_checkpoint(workflow_id, checkpoint_sequence, event_handler)[source]
Replay events through a custom handler.
Useful for:
- Testing event handlers
- Debugging workflow execution
- Building custom projections
- Parameters:
workflow_id (str) – The workflow to replay.
checkpoint_sequence (int) – Start replaying from this sequence.
event_handler (Callable[[Event], None]) – Handler to call for each event.
- Return type:
None
- get_entity_history(entity_type, entity_id)[source]
Get the complete event history for an entity.
- Parameters:
entity_type (EntityType) – Type of entity.
entity_id (str) – Entity identifier.
- Returns:
List of events for the entity.
- Return type:
list[dict[str, Any]]
- compare_states(workflow_id, sequence1, sequence2)[source]
Compare workflow state at two different points.
Useful for debugging what changed between two points in time.
- Parameters:
workflow_id (str) – The workflow to compare.
sequence1 (int) – First sequence number.
sequence2 (int) – Second sequence number.
- Returns:
Dictionary showing differences between the two states.
- Return type:
dict[str, Any]
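Rebuilding state, time travel and state comparison all reduce to folding events in sequence order and stopping at a chosen point. The following sketch shows that fold on plain dicts. The status strings, the state layout and the two helper functions are assumptions for illustration and do not reflect the output format of EventReplayer.

```python
def rebuild_state(events, as_of_sequence=None):
    """Fold events in sequence order, optionally stopping early."""
    state = {"status": None, "stages": {}}
    for event in sorted(events, key=lambda e: e["sequence"]):
        if as_of_sequence is not None and event["sequence"] > as_of_sequence:
            break
        kind = event["event_type"]
        if kind == "workflow.started":
            state["status"] = "RUNNING"
        elif kind == "workflow.completed":
            state["status"] = "SUCCEEDED"
        elif kind == "stage.started":
            state["stages"][event["entity_id"]] = "RUNNING"
        elif kind == "stage.completed":
            state["stages"][event["entity_id"]] = "SUCCEEDED"
    return state


def diff_states(events, sequence1, sequence2):
    """Report the top-level keys whose values differ between two points."""
    before = rebuild_state(events, sequence1)
    after = rebuild_state(events, sequence2)
    return {
        key: {"before": before[key], "after": after[key]}
        for key in before
        if before[key] != after[key]
    }


events = [
    {"sequence": 1, "event_type": "workflow.started", "entity_id": "wf-1"},
    {"sequence": 2, "event_type": "stage.started", "entity_id": "s1"},
    {"sequence": 3, "event_type": "stage.completed", "entity_id": "s1"},
    {"sequence": 4, "event_type": "workflow.completed", "entity_id": "wf-1"},
]
midway = rebuild_state(events, as_of_sequence=2)
changes = diff_states(events, 2, 4)
```

A snapshot-based replay would start the fold from a saved state and its sequence number instead of from an empty state, which is the role the optional snapshot_store plays.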
- class stabilize.events.ReplayResult(entity_type, entity_id, final_state=<factory>, events_replayed=0, from_snapshot=False, snapshot_sequence=0)[source]
Result of replaying events for an entity.
- Parameters:
entity_type (EntityType)
entity_id (str)
final_state (dict[str, Any])
events_replayed (int)
from_snapshot (bool)
snapshot_sequence (int)
- class stabilize.events.WorkflowState(workflow_id, status=None, application=None, name=None, start_time=None, end_time=None, context=<factory>, stages=<factory>, tasks=<factory>)[source]
Reconstructed workflow state from events.
- Parameters:
workflow_id (str)
status (str | None)
application (str | None)
name (str | None)
start_time (datetime | None)
end_time (datetime | None)
context (dict[str, Any])
stages (dict[str, dict[str, Any]])
tasks (dict[str, dict[str, Any]])
Snapshots
- class stabilize.events.Snapshot(entity_type, entity_id, workflow_id, version, sequence, state=<factory>, created_at=<factory>)[source]
A snapshot of entity state at a point in time.
Snapshots capture:
- Entity identity (type, id, workflow)
- Version at snapshot time
- Last event sequence included
- Serialized state
- Parameters:
entity_type (EntityType)
entity_id (str)
workflow_id (str)
version (int)
sequence (int)
state (dict[str, Any])
created_at (datetime)
- class stabilize.events.SnapshotStore(event_store, policy=None)[source]
Store for managing entity snapshots.
Uses the event store’s snapshot tables for persistence.
- Parameters:
event_store (EventStore)
policy (SnapshotPolicy | None)
- save_snapshot(snapshot)[source]
Save a snapshot.
- Parameters:
snapshot (Snapshot) – The snapshot to save.
- Return type:
None
- get_latest_snapshot(entity_type, entity_id)[source]
Get the latest snapshot for an entity.
- Parameters:
entity_type (EntityType) – Type of entity.
entity_id (str) – Entity identifier.
- Returns:
The latest snapshot or None if not found.
- Return type:
Snapshot | None
- create_workflow_snapshot(workflow_state, workflow_id, version, sequence)[source]
Create a snapshot for a workflow.
- Parameters:
workflow_state (dict[str, Any]) – The workflow state to snapshot.
workflow_id (str) – Workflow identifier.
version (int) – Current version.
sequence (int) – Last event sequence.
- Returns:
The created snapshot.
- Return type:
Snapshot
- class stabilize.events.SnapshotPolicy(event_interval=100, time_interval_seconds=3600)[source]
Determines when to create snapshots.
Snapshots are created based on:
- Number of events since last snapshot
- Time since last snapshot
- Entity importance (workflows more than tasks)
- Parameters:
event_interval (int)
time_interval_seconds (int)
- should_snapshot(entity_id, events_since)[source]
Check if an entity should be snapshotted.
- Parameters:
entity_id (str) – The entity identifier.
events_since (int) – Number of events since last snapshot.
- Returns:
True if a snapshot should be created.
- Return type:
bool
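A policy combining an event-count threshold with a time threshold might look like the sketch below. It is an assumption-laden illustration rather than the SnapshotPolicy implementation: ToySnapshotPolicy, its injectable clock and the mark_snapshotted() helper are hypothetical, and entity importance is not modeled.

```python
import time


class ToySnapshotPolicy:
    """Illustrative policy: snapshot on event count or elapsed time."""

    def __init__(self, event_interval=100, time_interval_seconds=3600,
                 clock=time.monotonic):
        self.event_interval = event_interval
        self.time_interval_seconds = time_interval_seconds
        self._clock = clock
        self._last_snapshot_at = {}

    def should_snapshot(self, entity_id, events_since):
        if events_since <= 0:
            return False  # nothing new to capture
        if events_since >= self.event_interval:
            return True
        last = self._last_snapshot_at.get(entity_id)
        if last is None:
            return False  # no earlier snapshot to measure elapsed time from
        return self._clock() - last >= self.time_interval_seconds

    def mark_snapshotted(self, entity_id):
        # Hypothetical helper: record when the entity was last snapshotted.
        self._last_snapshot_at[entity_id] = self._clock()


fake_now = [0.0]  # mutable cell so the fake clock can be advanced
policy = ToySnapshotPolicy(event_interval=3, time_interval_seconds=60,
                           clock=lambda: fake_now[0])
by_count = policy.should_snapshot("wf-1", 3)
policy.mark_snapshotted("wf-1")
fake_now[0] = 30.0
too_soon = policy.should_snapshot("wf-1", 1)
fake_now[0] = 61.0
by_time = policy.should_snapshot("wf-1", 1)
```

Injecting the clock keeps the time-based branch testable without sleeping.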
Subscriptions
- class stabilize.events.DurableSubscription(id, handler, event_types=None, entity_filter=None, last_sequence=0, webhook_url=None, enabled=True, error_count=0, max_errors=10)[source]
A durable subscription that survives restarts.
Position is persisted to the database, allowing the subscription to resume from where it left off.
- Parameters:
- class stabilize.events.SubscriptionManager(event_store, poll_interval=1.0, batch_size=100)[source]
Manages durable subscriptions that survive restarts.
Features:
- Persisted position (survives restarts)
- Catch-up from last position
- At-least-once delivery
- Automatic error handling with backoff
- Optional webhook integration
- Parameters:
event_store (EventStore)
poll_interval (float)
batch_size (int)
- create_subscription(subscription_id, handler, event_types=None, entity_filter=None, webhook_url=None, start_from='latest')[source]
Create a durable subscription.
- Parameters:
subscription_id (str) – Unique identifier for this subscription.
handler (Callable[[Event], None]) – Function to call with each event.
event_types (list[EventType] | None) – Only receive these event types (None = all).
entity_filter (dict[str, Any] | None) – Filter by entity properties.
webhook_url (str | None) – Optional webhook URL to call.
start_from (str) – 'latest' (current position), 'beginning' (sequence 0), or a sequence number string.
- Return type:
None
- delete_subscription(subscription_id)[source]
Delete a subscription.
- Parameters:
subscription_id (str) – The subscription to delete.
- Returns:
True if subscription was found and deleted.
- Return type:
bool
- load_subscription(subscription_id, handler)[source]
Load a persisted subscription.
- Parameters:
subscription_id (str) – The subscription to load.
handler (Callable[[Event], None]) – Handler function for events.
- Returns:
True if subscription was found and loaded.
- Return type:
bool
- process_once()[source]
Process pending events once (for testing).
- Returns:
Number of events processed.
- Return type:
int
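Catch-up with at-least-once delivery comes down to advancing the stored position only after a handler succeeds. The sketch below shows that rule with an in-memory log. ToySubscription and this process_once function are hypothetical stand-ins, and backoff, persistence and webhooks are left out.

```python
class ToySubscription:
    """Illustrative subscription state; not the DurableSubscription class."""

    def __init__(self, handler, last_sequence=0, max_errors=10):
        self.handler = handler
        self.last_sequence = last_sequence
        self.error_count = 0
        self.max_errors = max_errors
        self.enabled = True


def process_once(log, subscription, batch_size=100):
    """Deliver pending (sequence, payload) pairs; return how many succeeded."""
    if not subscription.enabled:
        return 0
    pending = [entry for entry in log if entry[0] > subscription.last_sequence]
    processed = 0
    for sequence, payload in pending[:batch_size]:
        try:
            subscription.handler(payload)
        except Exception:
            subscription.error_count += 1
            if subscription.error_count >= subscription.max_errors:
                subscription.enabled = False
            break  # stop here so the failed event is retried next time
        # Advance the position only after the handler succeeded.
        subscription.last_sequence = sequence
        processed += 1
    return processed


delivered = []
failures_left = [1]  # the handler fails once, on payload "b"


def flaky_handler(payload):
    delivered.append(payload)
    if payload == "b" and failures_left[0] > 0:
        failures_left[0] -= 1
        raise RuntimeError("transient failure")


log = [(1, "a"), (2, "b"), (3, "c")]
subscription = ToySubscription(flaky_handler)
first_pass = process_once(log, subscription)
second_pass = process_once(log, subscription)
```

Payload "b" reaches the handler twice because the first attempt failed after the handler had already seen it, so handlers under at-least-once delivery need to tolerate duplicates.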