Resilience
Stabilize is designed to survive failures through optimistic locking, atomic deduplication, crash recovery, circuit breakers, bulkheads, and configuration versioning.
Optimistic Locking
The database layer uses a version column to detect concurrent modifications. Every stage and task tracks its version, and updates include a version check:
UPDATE stage_executions SET
    status = :status,
    version = version + 1
WHERE id = :id AND version = :expected_version
If the WHERE clause matches no rows (because another process has already modified the row and bumped its version), a ConcurrencyError is raised:
from stabilize.errors import ConcurrencyError
try:
    repository.store_stage(stage)
except ConcurrencyError:
    # Another process modified this stage
    # Handle accordingly (retry, ignore, or fail)
    pass
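The version check above can be tried end to end with the standard library. The following is a minimal sketch, assuming an in-memory SQLite table with the same columns as the UPDATE statement; the `update_status` helper and the local `ConcurrencyError` class are illustrative stand-ins, not Stabilize's own code.

```python
import sqlite3


class ConcurrencyError(Exception):
    """Local stand-in for stabilize.errors.ConcurrencyError."""


def update_status(conn, stage_id, status, expected_version):
    """Hypothetical helper: apply the update only if the version still matches."""
    cur = conn.execute(
        "UPDATE stage_executions "
        "SET status = :status, version = version + 1 "
        "WHERE id = :id AND version = :expected_version",
        {"status": status, "id": stage_id, "expected_version": expected_version},
    )
    if cur.rowcount == 0:
        # No row matched: the version changed (or the row does not exist).
        conn.rollback()
        raise ConcurrencyError(f"stage {stage_id} changed since version {expected_version}")
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stage_executions (id TEXT PRIMARY KEY, status TEXT, version INTEGER)")
conn.execute("INSERT INTO stage_executions VALUES ('s1', 'NOT_STARTED', 0)")
conn.commit()

# First writer read version 0 and wins; the version becomes 1.
update_status(conn, "s1", "RUNNING", expected_version=0)

# Second writer also read version 0, so its update matches no rows.
try:
    update_status(conn, "s1", "RUNNING", expected_version=0)
    stale_update_rejected = False
except ConcurrencyError:
    stale_update_rejected = True
```

The key design point is that the check and the write happen in a single statement, so no separate lock is needed.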
Phase-Aware Optimistic Locking
For stronger guarantees, Stabilize supports phase-aware locking via expected_phase:
# Update only if stage is still RUNNING
repository.store_stage(
    stage,
    expected_phase="RUNNING",
)
This prevents a stale handler from applying a state transition after the stage has already moved to another phase. The phase_version property combines status and version:
stage = get_stage(stage_id)
phase, version = stage.phase_version # ("RUNNING", 3)
State Snapshots
For read-only access to state, use frozen snapshots:
from stabilize.models.snapshot import StageStateSnapshot
# Get immutable copy
snapshot = stage.state_snapshot()
# snapshot.status, snapshot.version, snapshot.context, snapshot.outputs
# All are read-only
StartStage Race Condition
A common race condition occurs when multiple upstream stages complete simultaneously and all try to start the same downstream stage:
A       B
 \     /
  \   /
   \ /
    C   <- Both A and B complete, both send StartStage for C
The StartStageHandler safely handles this by catching ConcurrencyError:
try:
    with self.repository.transaction(self.queue) as txn:
        txn.store_stage(stage)
        txn.push_message(StartTask(...))
except ConcurrencyError:
    # Another handler already started this stage - safe to ignore
    logger.debug("Stage already started by another handler")
    return
This is safe because the stage is already being processed by another handler, and message deduplication prevents duplicate task execution.
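To see why ignoring the error is safe, the race can be reproduced in miniature. This is a sketch under stated assumptions: `InMemoryStageStore` and `handle_start_stage` are hypothetical names, and a lock-guarded version check stands in for the database's optimistic locking.

```python
import threading


class ConcurrencyError(Exception):
    """Local stand-in for stabilize.errors.ConcurrencyError."""


class InMemoryStageStore:
    """Hypothetical store holding one stage guarded by a version check."""

    def __init__(self):
        self._lock = threading.Lock()
        self.status = "NOT_STARTED"
        self.version = 0

    def start(self, expected_version):
        with self._lock:
            if self.version != expected_version:
                raise ConcurrencyError("stage was modified by another handler")
            self.status = "RUNNING"
            self.version += 1


def handle_start_stage(store, seen_version, started_by, name):
    try:
        store.start(seen_version)
    except ConcurrencyError:
        return  # another handler won the race; nothing left to do
    started_by.append(name)  # only the winner schedules the task


store = InMemoryStageStore()
started_by = []

# Both handlers observed version 0 before either one wrote.
threads = [
    threading.Thread(target=handle_start_stage, args=(store, 0, started_by, name))
    for name in ("A", "B")
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Whichever thread runs first starts the stage; the other sees a changed version and returns without side effects.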
Atomic Deduplication
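Message processing is idempotent. The engine checks whether a message has already been processed in the same transaction that applies the state update, so the check and the update commit or roll back together. This gives state transitions “exactly-once” semantics even when a message is delivered more than once.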
Each message has a unique message_id, and processed messages are tracked in the processed_messages table:
with repository.transaction(queue) as txn:
    txn.store_stage(stage)
    txn.mark_message_processed(
        message_id=message.message_id,
        handler_type="StartStage",
        execution_id=message.execution_id,
    )
    txn.push_message(next_message)
If the transaction rolls back, the message is not marked as processed and will be retried.
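The rollback behavior can be illustrated with SQLite, where a primary key on the message ID rejects duplicates and the connection's context manager wraps both writes in one transaction. This is a minimal sketch; the table layout and the `handle` function are assumptions made for the example, not Stabilize's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE stages (id TEXT PRIMARY KEY, status TEXT);
    CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY);
    INSERT INTO stages VALUES ('s1', 'NOT_STARTED');
    """
)


def handle(conn, message_id, stage_id, new_status, fail=False):
    """Hypothetical handler: True if applied, False if the message was a duplicate."""
    try:
        with conn:  # commits on success, rolls back on any exception
            conn.execute("INSERT INTO processed_messages VALUES (?)", (message_id,))
            conn.execute("UPDATE stages SET status = ? WHERE id = ?", (new_status, stage_id))
            if fail:
                raise RuntimeError("simulated crash before commit")
        return True
    except sqlite3.IntegrityError:
        return False  # primary key violation: already processed


# A failed attempt rolls back both the marker and the state change.
try:
    handle(conn, "m1", "s1", "RUNNING", fail=True)
except RuntimeError:
    pass

first = handle(conn, "m1", "s1", "RUNNING")   # retry succeeds
second = handle(conn, "m1", "s1", "RUNNING")  # redelivery is skipped
```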
Bloom Filter Deduplication
For high-throughput scenarios, Stabilize uses a probabilistic Bloom filter as a first-pass deduplication check, so that most new messages skip the database lookup:
from stabilize.queue.dedup import BloomDeduplicator, get_deduplicator
# Get the global deduplicator (singleton)
dedup = get_deduplicator(
    expected_items=100_000,
    false_positive_rate=0.001,
)
# Check if message might have been seen
if dedup.maybe_seen(message_id):
    # Possible duplicate - fall back to DB check
    if db.is_message_processed(message_id):
        return  # Skip duplicate
# Process message...
# Mark as seen for future fast-path checks
dedup.mark_seen(message_id)
The Bloom filter guarantees:
No false negatives: If it says “not seen”, the message has not been marked since the filter was last reset
Low false positives: With false_positive_rate=0.001, about 0.1% of new messages trigger an unnecessary DB lookup
Thread-safe: Safe for concurrent access
Monitor filter health:
dedup.fill_ratio # 0.0 to 1.0 (reset when > 0.7)
dedup.items_added # Count of items marked
dedup.size_bytes # Memory usage
dedup.should_reset() # True if filter is getting full
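For readers unfamiliar with the data structure, the following is a minimal sketch of a Bloom filter built on the standard library. `TinyBloom` is an illustrative class, not Stabilize's `BloomDeduplicator`; it borrows the `maybe_seen`, `mark_seen`, and `fill_ratio` names from above only for readability, and uses the standard sizing formulas for the bit count and hash count.

```python
import hashlib
import math


class TinyBloom:
    """Illustrative Bloom filter; not Stabilize's BloomDeduplicator."""

    def __init__(self, expected_items, false_positive_rate):
        # Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        self.num_bits = max(
            8, math.ceil(-expected_items * math.log(false_positive_rate) / math.log(2) ** 2)
        )
        self.num_hashes = max(1, round(self.num_bits / expected_items * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, item):
        # Derive k bit positions from two halves of one SHA-256 digest.
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def mark_seen(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def maybe_seen(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

    @property
    def fill_ratio(self):
        set_bits = sum(bin(byte).count("1") for byte in self.bits)
        return set_bits / self.num_bits


bloom = TinyBloom(expected_items=1000, false_positive_rate=0.001)
ids = [f"msg-{i}" for i in range(1000)]
for message_id in ids:
    bloom.mark_seen(message_id)
```

Every marked ID is reported as possibly seen, while an unmarked ID is only occasionally reported as seen, which is why a positive answer still needs the database check.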
Circuit Breakers
Circuit breakers prevent cascading failures when external services are down. After a threshold of failures, the circuit opens and subsequent calls fail fast without attempting the operation.
Stabilize provides per-workflow, per-task-type circuit breakers:
from fractions import Fraction
from stabilize.resilience.circuits import WorkflowCircuitFactory
from stabilize.resilience.config import ResilienceConfig
config = ResilienceConfig(
    circuit_failure_threshold=Fraction(5, 10),  # 50% failure rate
    circuit_cooldown_seconds=30.0,  # Wait 30s before retry
)
factory = WorkflowCircuitFactory(config)
# Get circuit for a specific workflow and task type
circuit = factory.get_circuit("workflow_123", "http")
Circuit breaker states:
CLOSED: Normal operation, requests pass through
OPEN: Failures exceeded threshold, requests fail immediately
HALF-OPEN: After cooldown, one request allowed to test recovery
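The three states can be modeled as a small state machine. The sketch below is an assumption-laden illustration: `SimpleCircuit` is a hypothetical class that counts consecutive failures instead of the failure rate Stabilize uses, is not thread-safe, and does not limit the half-open state to a single concurrent probe.

```python
import time


class SimpleCircuit:
    """Illustrative breaker; counts consecutive failures, not a failure rate."""

    def __init__(self, failure_threshold=3, cooldown_seconds=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means CLOSED

    @property
    def state(self):
        if self.opened_at is None:
            return "CLOSED"
        if self.clock() - self.opened_at >= self.cooldown_seconds:
            return "HALF-OPEN"
        return "OPEN"

    def call(self, func):
        if self.state == "OPEN":
            raise RuntimeError("Circuit breaker open")  # fail fast
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "HALF-OPEN" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed probe
            raise
        self.failures = 0
        self.opened_at = None  # a success closes the circuit
        return result


def failing():
    raise ValueError("service down")


now = [0.0]  # fake clock so the example does not sleep
circuit = SimpleCircuit(failure_threshold=2, cooldown_seconds=30.0, clock=lambda: now[0])

states = []
for _ in range(2):
    try:
        circuit.call(failing)
    except ValueError:
        pass
states.append(circuit.state)  # threshold reached

now[0] = 31.0
states.append(circuit.state)  # cooldown elapsed

circuit.call(lambda: "ok")
states.append(circuit.state)  # probe succeeded
```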
When a circuit opens, a TransientError is raised:
from stabilize.errors import TransientError
try:
    result = execute_with_resilience(...)
except TransientError as e:
    if "Circuit breaker open" in str(e):
        # Back off and retry later
        pass
Bulkheads
Bulkheads isolate task types (e.g., HTTP vs Shell) to prevent resource exhaustion in one area from affecting others. Each task type has its own thread pool with independent limits.
from stabilize.resilience.bulkheads import TaskBulkheadManager
from stabilize.resilience.config import ResilienceConfig
config = ResilienceConfig()
manager = TaskBulkheadManager(config)
# Execute with bulkhead isolation
result = manager.execute_with_timeout(
    task_type="http",
    func=make_request,
    url=api_url,
    timeout=30.0,
)
if result.success:
    response = result.result
else:
    handle_error(result.error)
Default bulkhead limits per task type:
| Task Type | Max Concurrent | Max Queue Size |
|---|---|---|
| shell | 5 | 20 |
| python | 3 | 20 |
| http | 10 | 20 |
| docker | 3 | 20 |
| ssh | 5 | 20 |
Customize via environment variables:
export STABILIZE_BULKHEAD_HTTP_MAX_CONCURRENT=20
export STABILIZE_BULKHEAD_SHELL_MAX_CONCURRENT=10
Monitor bulkhead health:
stats = manager.get_all_stats()
# {'shell': {'active': 2, 'queued': 0, ...}, 'http': {...}}
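The isolation property is easy to demonstrate with one thread pool per task type. This is a minimal sketch, assuming a hypothetical `TinyBulkhead` class built on `concurrent.futures`; unlike Stabilize's manager it has no queue size limit, timeout, or statistics.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TinyBulkhead:
    """Illustrative bulkhead: one bounded thread pool per task type."""

    def __init__(self, limits):
        self._pools = {
            task_type: ThreadPoolExecutor(max_workers=max_concurrent)
            for task_type, max_concurrent in limits.items()
        }

    def submit(self, task_type, func, *args):
        return self._pools[task_type].submit(func, *args)

    def shutdown(self):
        for pool in self._pools.values():
            pool.shutdown(wait=True)


release = threading.Event()
bulkhead = TinyBulkhead({"shell": 1, "http": 2})

# Saturate the shell pool with a task that blocks until released.
blocked = bulkhead.submit("shell", release.wait)

# HTTP work still completes because it runs on its own workers.
http_result = bulkhead.submit("http", lambda: "200 OK").result(timeout=5)

release.set()
blocked.result(timeout=5)
bulkhead.shutdown()
```

Had both task types shared one single-worker pool, the HTTP call would have waited behind the blocked shell task.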
Finalizers and Cleanup
Tasks can register cleanup callbacks (finalizers) that execute when a stage enters a terminal state:
from stabilize.tasks.interface import Task
from stabilize.finalizers import get_finalizer_registry
class MyTask(Task):
    def execute(self, stage):
        # Register cleanup
        registry = get_finalizer_registry()
        registry.register(
            stage.id,
            "cleanup_temp_files",
            lambda: cleanup_temp_files(stage),
        )
        # Do work...
        return TaskResult.success()

    def on_cleanup(self, stage):
        """Called automatically on terminal state."""
        cleanup_resources(stage)
Finalizers are:
Guaranteed to run: Even on process crash (on restart)
Timeout-protected: Each finalizer has a 30s timeout
Logged: Results are tracked for debugging
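The registry pattern itself is small. The following sketch shows callbacks registered per stage, run once, and logged, with one failing finalizer not blocking the others. `TinyFinalizerRegistry` is a hypothetical in-memory class; it omits the persistence needed to survive a crash and the per-finalizer timeout described above.

```python
class TinyFinalizerRegistry:
    """Illustrative in-memory registry; not Stabilize's implementation."""

    def __init__(self):
        self._pending = {}  # stage_id -> list of (name, callback)
        self.results = []   # (stage_id, name, outcome), kept for debugging

    def register(self, stage_id, name, callback):
        self._pending.setdefault(stage_id, []).append((name, callback))

    def pending(self):
        return list(self._pending)

    def run(self, stage_id):
        """Run every finalizer for a stage; one failure does not stop the rest."""
        for name, callback in self._pending.pop(stage_id, []):
            try:
                callback()
                outcome = "ok"
            except Exception as exc:
                outcome = f"error: {exc}"
            self.results.append((stage_id, name, outcome))


registry = TinyFinalizerRegistry()
cleaned = []
registry.register("stage-1", "cleanup_temp_files", lambda: cleaned.append("tmp"))
registry.register("stage-1", "release_lock", lambda: 1 / 0)  # deliberately fails

pending_before = registry.pending()
registry.run("stage-1")  # would be triggered when the stage reaches a terminal state
```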
View pending finalizers:
registry = get_finalizer_registry()
pending = registry.pending() # List of stage IDs with pending cleanup
Configuration Versioning
Stabilize configurations are immutable and versioned. This ensures consistent behavior and enables config drift detection.
All config classes are frozen dataclasses:
from stabilize.resilience.config import (
    ResilienceConfig,
    HandlerConfig,
    BulkheadConfig,
)
# Configs are immutable
config = HandlerConfig()
config.max_workers = 20 # Raises FrozenInstanceError!
# Each config has a fingerprint
fingerprint = config.config_fingerprint() # e.g., "a1b2c3d4e5f67890"
A Workflow stores the fingerprint of the config it was started with:
workflow = Workflow(
    application="my_app",
    name="pipeline",
    config_version=config.config_fingerprint(),
)
This enables:
Drift detection: Compare current vs. stored config
Reproducibility: Know exact config used for any workflow
Audit trails: Track config changes over time
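One way to get both immutability and a stable fingerprint is a frozen dataclass hashed over its canonical JSON form. This is a sketch under assumptions: `ExampleConfig` and its fields are hypothetical, and the SHA-256 truncation to 16 hex characters is chosen only to resemble the fingerprint shown above, not taken from Stabilize's source.

```python
import hashlib
import json
from dataclasses import FrozenInstanceError, asdict, dataclass


@dataclass(frozen=True)
class ExampleConfig:
    """Hypothetical config; field names are illustrative only."""

    max_workers: int = 10
    cooldown_seconds: float = 30.0

    def config_fingerprint(self) -> str:
        # Hash a canonical JSON form so that equal configs hash equally.
        canonical = json.dumps(asdict(self), sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]


stored = ExampleConfig()                 # config in use when a workflow started
current = ExampleConfig(max_workers=20)  # config in use now

# Drift detection: compare the stored fingerprint with the current one.
drifted = stored.config_fingerprint() != current.config_fingerprint()

# Frozen dataclasses reject attribute assignment.
try:
    stored.max_workers = 20
    mutated = True
except FrozenInstanceError:
    mutated = False
```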
Load config from environment:
from fractions import Fraction
# Reads STABILIZE_* environment variables
config = ResilienceConfig.from_env()
# Or with custom values
config = ResilienceConfig(
    database_url="postgresql://user:pass@host/db",
    circuit_failure_threshold=Fraction(3, 10),
    circuit_cooldown_seconds=60.0,
)
Crash Recovery
On startup, WorkflowRecovery scans for “stuck” workflows (for example, workflows interrupted by a process crash during execution) and re-queues them safely:
from stabilize.recovery import WorkflowRecovery, recover_on_startup
# At application startup
recovery = WorkflowRecovery(store, queue)
results = recovery.recover_pending_workflows()
# Or use the convenience function
recover_on_startup(store, queue)
Recovery automatically:
Finds workflows in RUNNING or NOT_STARTED state
Re-queues their current stages for continuation
Uses idempotency to prevent duplicate work
Executes pending finalizers for crashed stages
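The scan-and-requeue step can be sketched in a few lines. Everything here is hypothetical: `WorkflowRecord`, `recover_pending`, and the tuple-shaped messages are simplified stand-ins, and a membership check on a list stands in for the engine's real idempotency mechanism.

```python
from dataclasses import dataclass


@dataclass
class WorkflowRecord:
    """Hypothetical, simplified view of a stored workflow."""

    id: str
    status: str
    current_stage: str


def recover_pending(workflows, queue):
    """Re-queue the current stage of every workflow left in a non-terminal state."""
    recovered = []
    for wf in workflows:
        if wf.status in ("RUNNING", "NOT_STARTED"):
            message = ("StartStage", wf.id, wf.current_stage)
            if message not in queue:  # stand-in for idempotent delivery
                queue.append(message)
            recovered.append(wf.id)
    return recovered


workflows = [
    WorkflowRecord("wf-1", "RUNNING", "deploy"),
    WorkflowRecord("wf-2", "SUCCEEDED", "done"),
    WorkflowRecord("wf-3", "NOT_STARTED", "build"),
]
queue = []
first = recover_pending(workflows, queue)
second = recover_pending(workflows, queue)  # a repeated recovery adds nothing new
```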
DAG Readiness Evaluation
Stabilize uses a pure function to evaluate stage readiness:
from stabilize.dag.readiness import (
    evaluate_readiness,
    PredicatePhase,
    ReadinessResult,
)
result = evaluate_readiness(
    stage=stage,
    upstream_stages=upstream_stages,
    jump_bypass=False,
)
if result.phase == PredicatePhase.READY:
    # All upstreams complete, proceed
    start_stage(stage)
elif result.phase == PredicatePhase.NOT_READY:
    # Upstreams still running, re-queue
    requeue_message()
elif result.phase == PredicatePhase.SKIP:
    # Conditions not met (upstream failed/canceled)
    skip_stage(stage, reason=result.reason)
The ReadinessResult includes:
phase: READY, NOT_READY, SKIP, or UNDEFINED
reason: Human-readable explanation
failed_upstream_ids: List of upstream stages that failed
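Because the evaluation is a pure function, it can be modeled and tested without a database or queue. The sketch below is an illustration only: `Phase`, `Result`, `evaluate`, and the status strings are assumptions that mirror the fields listed above, and the UNDEFINED phase and jump_bypass option are left out.

```python
from dataclasses import dataclass
from enum import Enum


class Phase(Enum):
    READY = "READY"
    NOT_READY = "NOT_READY"
    SKIP = "SKIP"


@dataclass(frozen=True)
class Result:
    phase: Phase
    reason: str
    failed_upstream_ids: tuple = ()


def evaluate(upstream_statuses):
    """Pure function: maps {upstream stage id: status} to a readiness result."""
    failed = tuple(
        stage_id
        for stage_id, status in upstream_statuses.items()
        if status in ("FAILED", "CANCELED")
    )
    if failed:
        return Result(Phase.SKIP, "upstream failed or was canceled", failed)
    if all(status == "SUCCEEDED" for status in upstream_statuses.values()):
        return Result(Phase.READY, "all upstream stages completed")
    return Result(Phase.NOT_READY, "waiting on upstream stages")


ready = evaluate({"A": "SUCCEEDED", "B": "SUCCEEDED"})
waiting = evaluate({"A": "SUCCEEDED", "B": "RUNNING"})
skipped = evaluate({"A": "FAILED", "B": "RUNNING"})
```

The same inputs always produce the same result, so a handler can safely re-evaluate readiness each time a message is redelivered.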
Key Files
src/stabilize/handlers/start_stage.py - ConcurrencyError handling
src/stabilize/persistence/store.py - Optimistic locking implementation
src/stabilize/recovery.py - Crash recovery logic
src/stabilize/errors.py - ConcurrencyError definition
src/stabilize/resilience/circuits.py - Circuit breaker factory
src/stabilize/resilience/bulkheads.py - Bulkhead manager
src/stabilize/resilience/config.py - Frozen configuration
src/stabilize/queue/dedup.py - Bloom filter deduplication
src/stabilize/dag/readiness.py - DAG readiness evaluation
src/stabilize/finalizers.py - Cleanup callback registry