Events API

Configuration

stabilize.events.configure_event_sourcing(event_store, publish_to_bus=True, bus_max_workers=4)[source]

One-line setup for event sourcing.

This configures both the event bus and event recorder, setting up the global instances for use throughout the application.

Parameters:
  • event_store (EventStore) – The event store for persisting events.

  • publish_to_bus (bool) – Whether to publish events to the bus.

  • bus_max_workers (int) – Maximum workers for async bus handlers.

Returns:

The configured EventRecorder.

Return type:

EventRecorder

Example

from stabilize.events import configure_event_sourcing, SqliteEventStore

event_store = SqliteEventStore("sqlite:///app.db", create_tables=True)
recorder = configure_event_sourcing(event_store)

stabilize.events.configure_event_bus(max_workers=4, error_handler=None)[source]

Configure the global event bus.

Must be called before first use of get_event_bus().

Parameters:
  • max_workers (int) – Maximum worker threads for async handlers.

  • error_handler (Callable[[str, Event, Exception], None] | None) – Optional global error handler.

Returns:

The configured event bus.

Return type:

EventBus

stabilize.events.configure_event_recorder(event_store, publish_to_bus=True)[source]

Configure the global event recorder.

Parameters:
  • event_store (EventStore) – Store for persisting events.

  • publish_to_bus (bool) – Whether to publish events to the bus.

Returns:

The configured event recorder.

Return type:

EventRecorder

Base Types

class stabilize.events.Event(event_id=<factory>, event_type=EventType.STATUS_CHANGED, timestamp=<factory>, sequence=0, entity_type=EntityType.WORKFLOW, entity_id='', workflow_id='', version=0, data=<factory>, metadata=<factory>, schema_version=1)[source]

Immutable event record.

Events are the source of truth for all state changes in the system. They are append-only and can be replayed to reconstruct state.

Parameters:
  • event_id (str)

  • event_type (EventType)

  • timestamp (datetime)

  • sequence (int)

  • entity_type (EntityType)

  • entity_id (str)

  • workflow_id (str)

  • version (int)

  • data (dict[str, Any])

  • metadata (EventMetadata)

  • schema_version (int)

event_id

Unique identifier (ULID for time-ordering).

Type:

str

event_type

Type of event from EventType enum.

Type:

stabilize.events.base.EventType

timestamp

When the event occurred (UTC).

Type:

datetime.datetime

sequence

Global ordering number (assigned by EventStore on append).

Type:

int

entity_type

Type of entity this event is about.

Type:

stabilize.events.base.EntityType

entity_id

ID of the entity.

Type:

str

workflow_id

Workflow ID for correlation (always set).

Type:

str

version

Entity version after this event (for optimistic concurrency).

Type:

int

data

Event-specific payload.

Type:

dict[str, Any]

metadata

Tracing and debugging metadata.

Type:

stabilize.events.base.EventMetadata

with_sequence(sequence)[source]

Return a new event with the given sequence number.

Parameters:

sequence (int)

Return type:

Event

to_dict()[source]

Convert event to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create event from dictionary.

Parameters:

data (dict[str, Any])

Return type:

Event
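
The immutability and round-trip behavior described above can be sketched without the library. This is a minimal illustration, assuming a frozen dataclass and ISO 8601 timestamps; SketchEvent and its reduced field set are hypothetical stand-ins, not the stabilize Event class or its actual storage format.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field, replace
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an immutable event record."""

    event_id: str
    event_type: str = "status.changed"
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    sequence: int = 0
    data: dict[str, Any] = field(default_factory=dict)

    def with_sequence(self, sequence: int) -> SketchEvent:
        # A frozen instance cannot be mutated, so return a modified copy.
        return replace(self, sequence=sequence)

    def to_dict(self) -> dict[str, Any]:
        raw = asdict(self)
        # Assumption: timestamps are stored as ISO 8601 strings.
        raw["timestamp"] = self.timestamp.isoformat()
        return raw

    @classmethod
    def from_dict(cls, raw: dict[str, Any]) -> SketchEvent:
        fields = dict(raw)
        fields["timestamp"] = datetime.fromisoformat(fields["timestamp"])
        return cls(**fields)


pending = SketchEvent(event_id="evt-1", data={"status": "RUNNING"})
stored = pending.with_sequence(42)  # pending itself keeps sequence 0
restored = SketchEvent.from_dict(stored.to_dict())
```

Returning a copy from with_sequence means the unsequenced event held by the caller is never changed behind its back, which is what lets a store assign the sequence on append.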

class stabilize.events.EventType(value)[source]

All event types in the system.

Events are organized by entity lifecycle:
  • workflow.* – Workflow-level events

  • stage.* – Stage-level events

  • task.* – Task-level events

  • status.* – Generic status changes

  • context.* – Context/output updates

WORKFLOW_CREATED = 'workflow.created'
WORKFLOW_STARTED = 'workflow.started'
WORKFLOW_COMPLETED = 'workflow.completed'
WORKFLOW_FAILED = 'workflow.failed'
WORKFLOW_CANCELED = 'workflow.canceled'
WORKFLOW_PAUSED = 'workflow.paused'
WORKFLOW_RESUMED = 'workflow.resumed'
STAGE_STARTED = 'stage.started'
STAGE_COMPLETED = 'stage.completed'
STAGE_FAILED = 'stage.failed'
STAGE_SKIPPED = 'stage.skipped'
STAGE_CANCELED = 'stage.canceled'
TASK_STARTED = 'task.started'
TASK_COMPLETED = 'task.completed'
TASK_FAILED = 'task.failed'
TASK_RETRIED = 'task.retried'
STATUS_CHANGED = 'status.changed'
CONTEXT_UPDATED = 'context.updated'
OUTPUTS_UPDATED = 'outputs.updated'
JUMP_EXECUTED = 'jump.executed'

class stabilize.events.EntityType(value)[source]

Entity types that can emit events.

WORKFLOW = 'workflow'
STAGE = 'stage'
TASK = 'task'

class stabilize.events.EventMetadata(correlation_id, causation_id=None, actor='system', source_handler=None)[source]

Metadata attached to every event for tracing and debugging.

Parameters:
  • correlation_id (str)

  • causation_id (str | None)

  • actor (str)

  • source_handler (str | None)

correlation_id

Links related events across a workflow execution. Typically the workflow execution ID.

Type:

str

causation_id

ID of the event that caused this event (for event chains).

Type:

str | None

actor

Who or what triggered this event (user ID or “system”).

Type:

str

source_handler

The handler class that emitted this event.

Type:

str | None

to_dict()[source]

Convert metadata to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create metadata from dictionary.

Parameters:

data (dict[str, Any])

Return type:

EventMetadata

Event Bus

class stabilize.events.EventBus(max_workers=4, error_handler=None)[source]

Thread-safe in-process event bus.

Supports:
  • Sync and async subscription modes

  • Filtering by event type, entity type, and workflow

  • Error isolation (one handler failure doesn’t affect others)

  • Statistics tracking

Parameters:
  • max_workers (int)

  • error_handler (Callable[[str, Event, Exception], None] | None)

subscribe(subscription_id, handler, event_types=None, entity_types=None, workflow_filter=None, mode=SubscriptionMode.SYNC)[source]

Subscribe to events.

Parameters:
  • subscription_id (str) – Unique identifier for this subscription.

  • handler (Callable[[Event], Any]) – Callable that receives events.

  • event_types (set[EventType] | None) – Only receive these event types (None = all).

  • entity_types (set[EntityType] | None) – Only receive events for these entity types (None = all).

  • workflow_filter (str | None) – Only receive events for this workflow ID.

  • mode (SubscriptionMode) – SYNC (blocking) or ASYNC (fire-and-forget).

Return type:

None

unsubscribe(subscription_id)[source]

Unsubscribe from events.

Parameters:

subscription_id (str) – The subscription to remove.

Returns:

True if subscription was found and removed.

Return type:

bool

enable_subscription(subscription_id)[source]

Enable a subscription.

Parameters:

subscription_id (str)

Return type:

None

disable_subscription(subscription_id)[source]

Disable a subscription without removing it.

Parameters:

subscription_id (str)

Return type:

None

publish(event)[source]

Publish an event to all matching subscribers.

Sync handlers are called in the current thread. Async handlers are submitted to the thread pool.

Parameters:

event (Event) – The event to publish.

Return type:

None
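
The dispatch rule for publish() (sync handlers in the calling thread, async handlers on a thread pool, failures isolated per subscriber) can be sketched as follows. MiniBus and its run_async flag are hypothetical simplifications, not the stabilize EventBus; filtering and statistics are left out.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniBus:
    """Hypothetical stand-in for an in-process event bus."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._subs = {}   # subscription_id -> (handler, run_async)
        self.errors = []  # (subscription_id, exception) pairs

    def subscribe(self, subscription_id, handler, run_async=False):
        self._subs[subscription_id] = (handler, run_async)

    def _deliver(self, subscription_id, handler, event):
        try:
            handler(event)
        except Exception as exc:
            # Isolate the failure so later subscribers still get the event.
            self.errors.append((subscription_id, exc))

    def publish(self, event):
        for subscription_id, (handler, run_async) in list(self._subs.items()):
            if run_async:
                # Fire-and-forget: the publisher does not wait for the result.
                self._pool.submit(self._deliver, subscription_id, handler, event)
            else:
                # Blocking: runs in the publisher's thread.
                self._deliver(subscription_id, handler, event)

    def shutdown(self):
        self._pool.shutdown(wait=True)


bus = MiniBus()
seen = []
bus.subscribe("audit", seen.append)
bus.subscribe("broken", lambda event: 1 / 0)
bus.subscribe("slow", seen.append, run_async=True)
bus.publish({"event_type": "task.started"})
bus.shutdown()  # waits for the async delivery to finish
```

Here the "broken" subscriber raises, yet both other subscribers still receive the event and the failure is recorded rather than propagated to the publisher.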

publish_batch(events)[source]

Publish multiple events.

Parameters:

events (list[Event])

Return type:

None

on_error(handler)[source]

Set or replace the global error handler.

Parameters:

handler (Callable[[str, Event, Exception], None]) – Callable(subscription_id, event, error)

Returns:

The handler (for use as decorator).

Return type:

Callable[[str, Event, Exception], None]
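
Because on_error() returns the handler it was given, it can be applied as a decorator. The sketch below shows only that pattern; ErrorHookSketch and its report() method are hypothetical and do not reflect how the stabilize EventBus invokes the handler internally.

```python
from typing import Callable, Optional

ErrorHandler = Callable[[str, dict, Exception], None]


class ErrorHookSketch:
    """Hypothetical stand-in showing only the error-handler hook."""

    def __init__(self) -> None:
        self._error_handler: Optional[ErrorHandler] = None

    def on_error(self, handler: ErrorHandler) -> ErrorHandler:
        self._error_handler = handler  # replaces any earlier handler
        return handler  # returning it keeps the decorated name bound

    def report(self, subscription_id: str, event: dict, error: Exception) -> None:
        if self._error_handler is not None:
            self._error_handler(subscription_id, event, error)


bus = ErrorHookSketch()
failures = []


@bus.on_error
def log_failure(subscription_id, event, error):
    failures.append((subscription_id, type(error).__name__))


bus.report("audit", {"event_type": "task.failed"}, ValueError("boom"))
```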

property stats: EventBusStats

Get current statistics.

get_subscriptions()[source]

Get list of subscription IDs.

Return type:

list[str]

shutdown(wait=True)[source]

Shutdown the event bus.

Parameters:

wait (bool) – If True, wait for pending async handlers to complete.

Return type:

None

reset()[source]

Reset statistics and clear subscriptions (for testing).

Return type:

None

class stabilize.events.Subscription(id, handler, event_types=None, entity_types=None, workflow_filter=None, mode=SubscriptionMode.SYNC, enabled=True)[source]

A subscription to events.

Subscribers can filter by:
  • event_types: Only specific event types

  • entity_types: Only specific entity types

  • workflow_filter: Only events for a specific workflow ID

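Parameters:
  • id (str)

  • handler (Callable[[Event], Any])

  • event_types (set[EventType] | None)

  • entity_types (set[EntityType] | None)

  • workflow_filter (str | None)

  • mode (SubscriptionMode)

  • enabled (bool)

matches(event)[source]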

Check if this subscription should receive the event.

Parameters:

event (Event)

Return type:

bool

class stabilize.events.SubscriptionMode(value)[source]

How a subscription processes events.

SYNC = 'sync'
ASYNC = 'async'

class stabilize.events.EventBusStats(events_published=0, events_delivered=0, errors=0, subscriptions_active=0, async_fallbacks=0)[source]

Statistics for the event bus.

Parameters:
  • events_published (int)

  • events_delivered (int)

  • errors (int)

  • subscriptions_active (int)

  • async_fallbacks (int)

stabilize.events.get_event_bus()[source]

Get the global event bus instance.

Return type:

EventBus

stabilize.events.reset_event_bus()[source]

Reset the global event bus (for testing).

Return type:

None

Event Recorder

class stabilize.events.EventRecorder(event_store, publish_to_bus=True)[source]

Records events for all state transitions.

Integrates with EventStore for persistence and EventBus for pub/sub notifications. All record methods return the created event with its assigned sequence number.

Parameters:
  • event_store (EventStore)

  • publish_to_bus (bool)

stabilize.events.get_event_recorder()[source]

Get the global event recorder instance, if configured.

Return type:

EventRecorder | None

stabilize.events.set_event_context(correlation_id, causation_id=None, actor='system')[source]

Set the current event context for correlation tracking.

Parameters:
  • correlation_id (str) – Links related events (typically workflow ID).

  • causation_id (str | None) – ID of the event that caused this action.

  • actor (str) – Who triggered this action.

Return type:

None

stabilize.events.get_event_metadata(source_handler=None)[source]

Get metadata for a new event from current context.

Parameters:

source_handler (str | None) – The handler creating this event.

Returns:

EventMetadata with current context values.

Return type:

EventMetadata
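
One way to picture the set/get pairing above is a context variable that holds the current correlation data. This is a sketch under assumptions: the use of contextvars, the names set_context and current_metadata, and the error raised when no context is set are all hypothetical and are not taken from the stabilize implementation.

```python
from __future__ import annotations

from contextvars import ContextVar
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchMetadata:
    """Hypothetical stand-in mirroring the EventMetadata fields."""

    correlation_id: str
    causation_id: Optional[str] = None
    actor: str = "system"
    source_handler: Optional[str] = None


# Assumption: the context is tracked per thread or task via a ContextVar.
_context: ContextVar[Optional[SketchMetadata]] = ContextVar(
    "event_context", default=None
)


def set_context(correlation_id, causation_id=None, actor="system"):
    _context.set(SketchMetadata(correlation_id, causation_id, actor))


def current_metadata(source_handler=None):
    base = _context.get()
    if base is None:
        # Assumption: the sketch refuses to build metadata without context.
        raise RuntimeError("no event context has been set")
    return replace(base, source_handler=source_handler)


set_context("wf-123", actor="user-7")
meta = current_metadata(source_handler="StartStageHandler")
```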

Event Stores

class stabilize.events.EventStore[source]

Abstract event store interface.

Event stores are append-only - events cannot be modified or deleted. Each event is assigned a monotonically increasing sequence number that provides global ordering.

Implementations must ensure:
  • Atomic appends (single event or batch)

  • Unique sequence numbers

  • Durable storage

  • Efficient querying by entity, workflow, and time

abstractmethod append(event, connection=None)[source]

Append a single event to the store.

Parameters:
  • event (Event) – The event to append. Sequence will be assigned.

  • connection (Any | None) – Optional database connection for transaction.

Returns:

The event with sequence number assigned.

Return type:

Event

abstractmethod append_batch(events, connection=None)[source]

Append multiple events atomically.

All events are appended in a single transaction. Sequence numbers are assigned in order.

Parameters:
  • events (list[Event]) – Events to append.

  • connection (Any | None) – Optional database connection for transaction.

Returns:

Events with sequence numbers assigned.

Return type:

list[Event]
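
The append contract (atomic batches, contiguous sequence numbers assigned in order) can be illustrated with an in-memory stand-in. InMemoryStore and Ev are hypothetical; the sketch does not subclass the stabilize EventStore, is not durable, and ignores the connection parameter.

```python
import threading
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Ev:
    """Hypothetical minimal event: an ID plus a store-assigned sequence."""

    event_id: str
    sequence: int = 0


class InMemoryStore:
    """Hypothetical append-only store kept in a list."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = []

    def append(self, event):
        return self.append_batch([event])[0]

    def append_batch(self, events):
        # Holding the lock for the whole batch keeps it atomic: no other
        # writer can interleave, so the sequences come out contiguous.
        with self._lock:
            start = len(self._events) + 1
            stamped = [
                replace(event, sequence=start + offset)
                for offset, event in enumerate(events)
            ]
            self._events.extend(stamped)
            return stamped

    def get_current_sequence(self):
        with self._lock:
            return self._events[-1].sequence if self._events else 0


store = InMemoryStore()
first = store.append(Ev("a"))
batch = store.append_batch([Ev("b"), Ev("c")])
```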

abstractmethod get_events(query)[source]

Query events matching criteria.

Parameters:

query (EventQuery) – Query parameters.

Returns:

Iterator of matching events ordered by sequence.

Return type:

Iterator[Event]

abstractmethod get_events_for_entity(entity_type, entity_id, from_version=0)[source]

Get all events for a specific entity.

Parameters:
  • entity_type (EntityType) – Type of entity.

  • entity_id (str) – Entity identifier.

  • from_version (int) – Only get events after this version.

Returns:

List of events for the entity.

Return type:

list[Event]

abstractmethod get_events_for_workflow(workflow_id, from_sequence=0)[source]

Get all events for a workflow execution.

Parameters:
  • workflow_id (str) – Workflow identifier.

  • from_sequence (int) – Only get events after this sequence.

Returns:

List of events for the workflow.

Return type:

list[Event]

abstractmethod get_current_sequence()[source]

Get the current (latest) global sequence number.

Returns:

The highest sequence number in the store, or 0 if empty.

Return type:

int

abstractmethod get_events_since(sequence, limit=1000)[source]

Get events since a sequence number.

Used for catch-up subscriptions.

Parameters:
  • sequence (int) – Get events after this sequence.

  • limit (int) – Maximum number of events to return.

Returns:

List of events after the sequence.

Return type:

list[Event]
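
A catch-up consumer typically calls get_events_since() in a loop, advancing its position to the last sequence it handled until an empty page comes back. The sketch below uses a hypothetical module-level list in place of a real store and simplified (sequence, event_type) tuples in place of Event objects.

```python
from typing import Callable, List, Tuple

SketchEvent = Tuple[int, str]  # (sequence, event_type), a simplified stand-in

# Hypothetical log with sequences 1 through 7.
LOG: List[SketchEvent] = [(seq, "task.completed") for seq in range(1, 8)]


def get_events_since(sequence: int, limit: int = 1000) -> List[SketchEvent]:
    """Return up to `limit` events whose sequence is greater than `sequence`."""
    return [event for event in LOG if event[0] > sequence][:limit]


def catch_up(
    handler: Callable[[SketchEvent], None], position: int, batch_size: int
) -> int:
    """Page through the log until it is drained; return the new position."""
    while True:
        batch = get_events_since(position, limit=batch_size)
        if not batch:
            return position
        for event in batch:
            handler(event)
        position = batch[-1][0]  # resume after the last event handled


received = []
new_position = catch_up(received.append, position=2, batch_size=3)
```

With a batch size of 3 the loop fetches sequences 3 to 5, then 6 and 7, then an empty page, and stops at position 7.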

get_event_by_id(event_id)[source]

Get a single event by its ID.

Parameters:

event_id (str) – The event identifier.

Returns:

The event if found, None otherwise.

Return type:

Event | None

count_events(query=None)[source]

Count events matching query.

Parameters:

query (EventQuery | None) – Optional query parameters.

Returns:

Number of matching events.

Return type:

int

class stabilize.events.EventQuery(entity_type=None, entity_id=None, workflow_id=None, event_types=None, from_sequence=None, to_sequence=None, from_timestamp=None, to_timestamp=None, limit=1000, offset=0, order_by='sequence', ascending=True)[source]

Query parameters for retrieving events.

All fields are optional; unset filter fields are not applied. By default, results are ordered by sequence number, ascending (see order_by and ascending).

Parameters:
  • entity_type (EntityType | None)

  • entity_id (str | None)

  • workflow_id (str | None)

  • event_types (list[EventType] | None)

  • from_sequence (int | None)

  • to_sequence (int | None)

  • from_timestamp (datetime | None)

  • to_timestamp (datetime | None)

  • limit (int)

  • offset (int)

  • order_by (str)

  • ascending (bool)

class stabilize.events.AppendResult(events=<factory>, sequences=<factory>)[source]

Result of appending event(s) to the store.

Parameters:
  • events (list[Event])

  • sequences (list[int])

property first_sequence: int

Get the first sequence number assigned.

property last_sequence: int

Get the last sequence number assigned.

class stabilize.events.SqliteEventStore(connection_string, create_tables=True)[source]

SQLite implementation of event store.

Uses the shared ConnectionManager for thread-local connections. Events are stored in an append-only table with auto-incrementing sequence numbers.

Thread-safety:
  • Uses thread-local connections from ConnectionManager

  • Sequence assignment is atomic via SQLite’s AUTOINCREMENT

Parameters:
  • connection_string (str)

  • create_tables (bool)

Projections

class stabilize.events.Projection[source]

Abstract base class for projections.

Projections apply events to build a specialized view of the data. They are typically used to:
  • Build human-readable timelines

  • Aggregate metrics

  • Create search indexes

  • Derive state for specific queries

Projections are stateful and should be reset before replaying events from the beginning.

abstract property name: str

Unique name for this projection.

abstractmethod apply(event)[source]

Apply an event to update the projection state.

This method should be idempotent - applying the same event multiple times should have the same effect as applying it once.

Parameters:

event (Event) – The event to apply.

Return type:

None

abstractmethod get_state()[source]

Get the current projection state.

Returns:

The current state, type depends on the projection.

Return type:

Any

reset()[source]

Reset the projection to its initial state.

Override this method in subclasses if the projection needs cleanup beyond just reinitializing.

Return type:

None

handles_event_type(event)[source]

Check if this projection handles the given event type.

By default, all events are handled. Override to filter.

Parameters:

event (Event) – The event to check.

Returns:

True if this projection should process the event.

Return type:

bool
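
The idempotency requirement on apply() is easiest to meet by remembering which event IDs have already been processed. The sketch below is one hypothetical way to do that; StatusCountProjection is a plain class using dict-shaped events, not a subclass of the stabilize Projection.

```python
class StatusCountProjection:
    """Hypothetical projection counting stage events by type."""

    name = "stage-status-counts"

    def __init__(self):
        self.reset()

    def reset(self):
        # Call before replaying from the beginning.
        self._seen_ids = set()
        self._counts = {}

    def handles_event_type(self, event):
        return event["event_type"].startswith("stage.")

    def apply(self, event):
        if not self.handles_event_type(event):
            return
        if event["event_id"] in self._seen_ids:
            return  # already applied: a repeat delivery changes nothing
        self._seen_ids.add(event["event_id"])
        kind = event["event_type"]
        self._counts[kind] = self._counts.get(kind, 0) + 1

    def get_state(self):
        return dict(self._counts)


events = [
    {"event_id": "e1", "event_type": "stage.started"},
    {"event_id": "e2", "event_type": "stage.completed"},
    {"event_id": "e2", "event_type": "stage.completed"},  # duplicate delivery
    {"event_id": "e3", "event_type": "task.started"},     # filtered out
]

projection = StatusCountProjection()
for event in events:
    projection.apply(event)
state = projection.get_state()
```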

class stabilize.events.StageMetricsProjection[source]

Aggregates stage execution metrics for analytics.

Tracks:
  • Execution counts per stage type

  • Success/failure rates

  • Duration statistics (avg, median, p95)

property name: str

Unique name for this projection.

apply(event)[source]

Apply an event to update the projection state.

This method should be idempotent - applying the same event multiple times should have the same effect as applying it once.

Parameters:

event (Event) – The event to apply.

Return type:

None

get_state()[source]

Get all stage metrics.

Return type:

dict[str, StageMetrics]

get_task_state()[source]

Get all task metrics.

Return type:

dict[str, TaskMetrics]

get_metrics_for_type(stage_type)[source]

Get metrics for a specific stage type.

Parameters:

stage_type (str)

Return type:

StageMetrics | None

get_task_metrics_for_name(task_name)[source]

Get metrics for a specific task name.

Parameters:

task_name (str)

Return type:

TaskMetrics | None

reset()[source]

Reset all metrics.

Return type:

None

get_top_slowest_stages(n=10)[source]

Get the n slowest stage types by average duration.

Parameters:

n (int)

Return type:

list[tuple[str, float]]

get_top_failing_stages(n=10)[source]

Get the n stage types with highest failure rates.

Parameters:

n (int)

Return type:

list[tuple[str, float]]

to_dict()[source]

Convert all metrics to dictionary.

Return type:

dict[str, Any]

class stabilize.events.StageMetrics(stage_type, execution_count=0, success_count=0, failure_count=0, skip_count=0, durations=<factory>)[source]

Aggregated metrics for a stage type.

Parameters:
  • stage_type (str)

  • execution_count (int)

  • success_count (int)

  • failure_count (int)

  • skip_count (int)

  • durations (list[int])

property success_rate: float

Calculate success rate as a percentage.

property failure_rate: float

Calculate failure rate as a percentage.

property avg_duration_ms: float

Calculate average duration in milliseconds.

property median_duration_ms: float

Calculate median duration in milliseconds.

property p95_duration_ms: float

Calculate 95th percentile duration in milliseconds.

property min_duration_ms: int

Get minimum duration in milliseconds.

property max_duration_ms: int

Get maximum duration in milliseconds.

to_dict()[source]

Convert to dictionary for serialization.

Return type:

dict[str, Any]
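
The derived properties above reduce to a few lines of arithmetic over the counters and the durations list. The sketch below works through one made-up sample; the nearest-rank percentile and the choice to divide by successes plus failures are assumptions, since the reference does not say which definitions StageMetrics uses.

```python
import math
import statistics

# Made-up sample: durations in milliseconds for ten executions.
durations = [120, 80, 200, 95, 150, 3000, 110, 130, 90, 100]
success_count, failure_count = 8, 2

# Assumption: rates are taken over successes plus failures.
finished = success_count + failure_count
success_rate = 100.0 * success_count / finished
failure_rate = 100.0 * failure_count / finished

avg_duration_ms = statistics.fmean(durations)
median_duration_ms = statistics.median(durations)


def percentile_nearest_rank(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% at or below."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


p95_duration_ms = percentile_nearest_rank(durations, 95)
```

The single 3000 ms outlier pulls the average to 407.5 ms while the median stays at 115 ms, which is why the median and p95 are worth tracking alongside the mean.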

class stabilize.events.WorkflowTimelineProjection(workflow_id)[source]

Builds a human-readable timeline from workflow events.

The timeline shows:
  • Workflow start/end with status

  • Stage start/end with duration

  • Task start/end with duration

  • Any failures or skips

Parameters:

workflow_id (str)

property name: str

Unique name for this projection.

apply(event)[source]

Apply an event to the timeline.

Parameters:

event (Event)

Return type:

None

get_state()[source]

Get the current timeline.

Return type:

WorkflowTimeline

reset()[source]

Reset the projection.

Return type:

None

get_stages()[source]

Get only stage-related entries.

Return type:

list[TimelineEntry]

get_tasks()[source]

Get only task-related entries.

Return type:

list[TimelineEntry]

get_failures()[source]

Get only failure events.

Return type:

list[TimelineEntry]

class stabilize.events.WorkflowTimeline(workflow_id, entries=<factory>, total_duration_ms=None, status=None, start_time=None, end_time=None)[source]

Complete timeline for a workflow execution.

Parameters:
  • workflow_id (str)

  • entries (list[TimelineEntry])

  • total_duration_ms (int | None)

  • status (str | None)

  • start_time (datetime | None)

  • end_time (datetime | None)

to_dict()[source]

Convert to dictionary for serialization.

Return type:

dict[str, Any]

class stabilize.events.TimelineEntry(timestamp, event_type, entity_type, entity_id, entity_name=None, status=None, duration_ms=None, details=<factory>)[source]

A single entry in the workflow timeline.

Parameters:
  • timestamp (datetime)

  • event_type (str)

  • entity_type (str)

  • entity_id (str)

  • entity_name (str | None)

  • status (str | None)

  • duration_ms (int | None)

  • details (dict[str, Any])

to_dict()[source]

Convert to dictionary for serialization.

Return type:

dict[str, Any]

Replay

class stabilize.events.EventReplayer(event_store, snapshot_store=None)[source]

Replays events to reconstruct state.

Supports:
  • Full state reconstruction from events

  • Time-travel queries (state at a specific point)

  • Incremental replay from snapshots

  • Custom event handlers for testing/debugging

Parameters:
  • event_store (EventStore)

  • snapshot_store (SnapshotStore | None)

rebuild_workflow_state(workflow_id, as_of_sequence=None)[source]

Rebuild complete workflow state from events.

Parameters:
  • workflow_id (str) – The workflow to rebuild.

  • as_of_sequence (int | None) – Rebuild state as of this sequence (for time travel).

Returns:

Dictionary containing the complete workflow state.

Return type:

dict[str, Any]

time_travel_query(workflow_id, as_of_time)[source]

Get workflow state at a specific point in time.

Parameters:
  • workflow_id (str) – The workflow to query.

  • as_of_time (datetime) – Get state as of this timestamp.

Returns:

Dictionary containing the workflow state at that time.

Return type:

dict[str, Any]

replay_workflow_from_checkpoint(workflow_id, checkpoint_sequence, event_handler)[source]

Replay events through a custom handler.

Useful for:
  • Testing event handlers

  • Debugging workflow execution

  • Building custom projections

Parameters:
  • workflow_id (str) – The workflow to replay.

  • checkpoint_sequence (int) – Start replaying from this sequence.

  • event_handler (Callable[[Event], None]) – Handler to call for each event.

Return type:

None

get_entity_history(entity_type, entity_id)[source]

Get the complete event history for an entity.

Parameters:
  • entity_type (EntityType) – Type of entity.

  • entity_id (str) – Entity identifier.

Returns:

List of events for the entity.

Return type:

list[dict[str, Any]]

compare_states(workflow_id, sequence1, sequence2)[source]

Compare workflow state at two different points.

Useful for debugging what changed between two points in time.

Parameters:
  • workflow_id (str) – The workflow to compare.

  • sequence1 (int) – First sequence number.

  • sequence2 (int) – Second sequence number.

Returns:

Dictionary showing differences between the two states.

Return type:

dict[str, Any]
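
Rebuilding state as of a sequence, and comparing two such states, both come down to folding events in order and stopping at a cutoff. The sketch below is a simplified illustration: the state shape, the inclusive cutoff, and the diff format are assumptions, and rebuild and diff are hypothetical functions rather than the EventReplayer methods.

```python
def rebuild(events, as_of_sequence=None):
    """Fold events in sequence order into a small state dictionary."""
    state = {"status": None, "stages": {}}
    for event in sorted(events, key=lambda e: e["sequence"]):
        # Assumption: the cutoff is inclusive.
        if as_of_sequence is not None and event["sequence"] > as_of_sequence:
            break
        prefix, _, suffix = event["event_type"].partition(".")
        if prefix == "workflow":
            state["status"] = suffix
        elif prefix == "stage":
            state["stages"][event["entity_id"]] = suffix
    return state


def diff(before, after):
    """Report (old, new) pairs for everything that changed."""
    changes = {}
    if before["status"] != after["status"]:
        changes["status"] = (before["status"], after["status"])
    for stage in sorted(set(before["stages"]) | set(after["stages"])):
        old = before["stages"].get(stage)
        new = after["stages"].get(stage)
        if old != new:
            changes["stage:" + stage] = (old, new)
    return changes


events = [
    {"sequence": 1, "event_type": "workflow.started", "entity_id": "wf-1"},
    {"sequence": 2, "event_type": "stage.started", "entity_id": "build"},
    {"sequence": 3, "event_type": "stage.completed", "entity_id": "build"},
    {"sequence": 4, "event_type": "stage.started", "entity_id": "deploy"},
    {"sequence": 5, "event_type": "stage.failed", "entity_id": "deploy"},
    {"sequence": 6, "event_type": "workflow.failed", "entity_id": "wf-1"},
]

at_three = rebuild(events, as_of_sequence=3)
at_six = rebuild(events, as_of_sequence=6)
changes = diff(at_three, at_six)
```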

class stabilize.events.ReplayResult(entity_type, entity_id, final_state=<factory>, events_replayed=0, from_snapshot=False, snapshot_sequence=0)[source]

Result of replaying events for an entity.

Parameters:
  • entity_type (EntityType)

  • entity_id (str)

  • final_state (dict[str, Any])

  • events_replayed (int)

  • from_snapshot (bool)

  • snapshot_sequence (int)

class stabilize.events.WorkflowState(workflow_id, status=None, application=None, name=None, start_time=None, end_time=None, context=<factory>, stages=<factory>, tasks=<factory>)[source]

Reconstructed workflow state from events.

Parameters:
  • workflow_id (str)

  • status (str | None)

  • application (str | None)

  • name (str | None)

  • start_time (datetime | None)

  • end_time (datetime | None)

  • context (dict[str, Any])

  • stages (dict[str, dict[str, Any]])

  • tasks (dict[str, dict[str, Any]])

to_dict()[source]

Convert to dictionary.

Return type:

dict[str, Any]

Snapshots

class stabilize.events.Snapshot(entity_type, entity_id, workflow_id, version, sequence, state=<factory>, created_at=<factory>)[source]

A snapshot of entity state at a point in time.

Snapshots capture:
  • Entity identity (type, id, workflow)

  • Version at snapshot time

  • Last event sequence included

  • Serialized state

Parameters:
  • entity_type (EntityType)

  • entity_id (str)

  • workflow_id (str)

  • version (int)

  • sequence (int)

  • state (dict[str, Any])

  • created_at (datetime)

to_dict()[source]

Convert snapshot to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create snapshot from dictionary.

Parameters:

data (dict[str, Any])

Return type:

Snapshot

class stabilize.events.SnapshotStore(event_store, policy=None)[source]

Store for managing entity snapshots.

Uses the event store’s snapshot tables for persistence.

Parameters:
  • event_store (EventStore)

  • policy (SnapshotPolicy | None)

save_snapshot(snapshot)[source]

Save a snapshot.

Parameters:

snapshot (Snapshot) – The snapshot to save.

Return type:

None

get_latest_snapshot(entity_type, entity_id)[source]

Get the latest snapshot for an entity.

Parameters:
  • entity_type (EntityType) – Type of entity.

  • entity_id (str) – Entity identifier.

Returns:

The latest snapshot or None if not found.

Return type:

Snapshot | None

create_workflow_snapshot(workflow_state, workflow_id, version, sequence)[source]

Create a snapshot for a workflow.

Parameters:
  • workflow_state (dict[str, Any]) – The workflow state to snapshot.

  • workflow_id (str) – Workflow identifier.

  • version (int) – Current version.

  • sequence (int) – Last event sequence.

Returns:

The created snapshot.

Return type:

Snapshot

should_snapshot(entity_id, events_since_snapshot)[source]

Check if an entity should be snapshotted.

Parameters:
  • entity_id (str)

  • events_since_snapshot (int)

Return type:

bool

record_event(entity_id)[source]

Record that an event was added.

Parameters:

entity_id (str)

Return type:

None

class stabilize.events.SnapshotPolicy(event_interval=100, time_interval_seconds=3600)[source]

Determines when to create snapshots.

Snapshots are created based on:
  • Number of events since last snapshot

  • Time since last snapshot

  • Entity importance (workflows more than tasks)

Parameters:
  • event_interval (int)

  • time_interval_seconds (int)

should_snapshot(entity_id, events_since)[source]

Check if an entity should be snapshotted.

Parameters:
  • entity_id (str) – The entity identifier.

  • events_since (int) – Number of events since last snapshot.

Returns:

True if a snapshot should be created.

Return type:

bool

record_snapshot(entity_id)[source]

Record that a snapshot was created.

Parameters:

entity_id (str)

Return type:

None

record_event(entity_id)[source]

Record that an event was added for an entity.

Parameters:

entity_id (str)

Return type:

None
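
A policy built on the two documented thresholds might combine them as "enough events, or enough time with at least one new event". The sketch below makes that assumption explicit; SketchPolicy, its injectable clock, and the exact combination rule are hypothetical, and entity importance is not modeled.

```python
import time


class SketchPolicy:
    """Hypothetical snapshot policy using an event count and an age limit."""

    def __init__(self, event_interval=100, time_interval_seconds=3600,
                 clock=time.monotonic):
        self.event_interval = event_interval
        self.time_interval_seconds = time_interval_seconds
        self._clock = clock
        self._last_snapshot_at = {}  # entity_id -> clock reading

    def record_snapshot(self, entity_id):
        self._last_snapshot_at[entity_id] = self._clock()

    def should_snapshot(self, entity_id, events_since):
        if events_since >= self.event_interval:
            return True
        last = self._last_snapshot_at.get(entity_id)
        if last is None or events_since == 0:
            # Assumption: without new events there is nothing to snapshot.
            return False
        return self._clock() - last >= self.time_interval_seconds


now = [0.0]  # fake clock so the example is deterministic
policy = SketchPolicy(event_interval=3, time_interval_seconds=60,
                      clock=lambda: now[0])
policy.record_snapshot("wf-1")

too_few = policy.should_snapshot("wf-1", 1)      # below both thresholds
enough_events = policy.should_snapshot("wf-1", 3)
now[0] = 61.0                                    # a minute passes
old_enough = policy.should_snapshot("wf-1", 1)
nothing_new = policy.should_snapshot("wf-1", 0)
```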

Subscriptions

class stabilize.events.DurableSubscription(id, handler, event_types=None, entity_filter=None, last_sequence=0, webhook_url=None, enabled=True, error_count=0, max_errors=10)[source]

A durable subscription that survives restarts.

Position is persisted to the database, allowing the subscription to resume from where it left off.

Parameters:
  • id (str)

  • handler (Callable[[Event], None])

  • event_types (list[EventType] | None)

  • entity_filter (dict[str, Any] | None)

  • last_sequence (int)

  • webhook_url (str | None)

  • enabled (bool)

  • error_count (int)

  • max_errors (int)

matches(event)[source]

Check if this subscription should receive the event.

Parameters:

event (Event)

Return type:

bool

class stabilize.events.SubscriptionManager(event_store, poll_interval=1.0, batch_size=100)[source]

Manages durable subscriptions that survive restarts.

Features:
  • Persisted position (survives restarts)

  • Catch-up from last position

  • At-least-once delivery

  • Automatic error handling with backoff

  • Optional webhook integration

Parameters:
  • event_store (EventStore)

  • poll_interval (float)

  • batch_size (int)

create_subscription(subscription_id, handler, event_types=None, entity_filter=None, webhook_url=None, start_from='latest')[source]

Create a durable subscription.

Parameters:
  • subscription_id (str) – Unique identifier for this subscription.

  • handler (Callable[[Event], None]) – Function to call with each event.

  • event_types (list[EventType] | None) – Only receive these event types (None = all).

  • entity_filter (dict[str, Any] | None) – Filter by entity properties.

  • webhook_url (str | None) – Optional webhook URL to call.

  • start_from (str) – “latest” (current position), “beginning” (sequence 0), or a sequence number string.

Return type:

None

delete_subscription(subscription_id)[source]

Delete a subscription.

Parameters:

subscription_id (str) – The subscription to delete.

Returns:

True if subscription was found and deleted.

Return type:

bool

load_subscription(subscription_id, handler)[source]

Load a persisted subscription.

Parameters:
  • subscription_id (str) – The subscription to load.

  • handler (Callable[[Event], None]) – Handler function for events.

Returns:

True if subscription was found and loaded.

Return type:

bool

start()[source]

Start polling for events.

Return type:

None

stop()[source]

Stop polling for events.

Return type:

None

process_once()[source]

Process pending events once (for testing).

Returns:

Number of events processed.

Return type:

int

get_subscription_status(subscription_id)[source]

Get status of a subscription.

Parameters:

subscription_id (str)

Return type:

dict[str, Any] | None

get_all_subscription_status()[source]

Get status of all subscriptions.

Return type:

list[dict[str, Any]]
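
At-least-once delivery follows from one ordering rule: advance the stored position only after the handler returns. A handler that fails partway through will therefore see the same event again, so handlers should tolerate duplicates. The sketch below illustrates the rule with hypothetical names (SketchSubscription, deliver_pending); persistence, polling, backoff, and webhooks are omitted, and the disable-after-max_errors behavior is an assumption.

```python
class SketchSubscription:
    """Hypothetical subscription holding a position and an error budget."""

    def __init__(self, handler, last_sequence=0, max_errors=10):
        self.handler = handler
        self.last_sequence = last_sequence
        self.max_errors = max_errors
        self.error_count = 0
        self.enabled = True


def deliver_pending(subscription, events):
    """Deliver events past the stored position; return how many succeeded."""
    delivered = 0
    for sequence, payload in events:
        if not subscription.enabled:
            break
        if sequence <= subscription.last_sequence:
            continue  # already acknowledged
        try:
            subscription.handler(payload)
        except Exception:
            subscription.error_count += 1
            if subscription.error_count >= subscription.max_errors:
                subscription.enabled = False  # assumption: stop after too many errors
            break  # position not advanced, so this event is retried later
        subscription.last_sequence = sequence  # acknowledge only on success
        delivered += 1
    return delivered


handled = []
remaining_failures = {"b": 1}  # fail once on "b", after the side effect


def flaky(payload):
    handled.append(payload)
    if remaining_failures.get(payload, 0) > 0:
        remaining_failures[payload] -= 1
        raise RuntimeError("transient failure")


events = [(1, "a"), (2, "b"), (3, "c")]
subscription = SketchSubscription(flaky)
first_pass = deliver_pending(subscription, events)
second_pass = deliver_pending(subscription, events)
```

The handler sees "b" twice: once in the failed attempt and once in the retry. That duplicate is the cost of never skipping an event.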