"""
SkipStageHandler - handles stage skipping.
This handler is invoked when a stage should be skipped due to conditional
execution or start time expiry.
"""
from __future__ import annotations
import logging
from datetime import timedelta
from typing import TYPE_CHECKING
from stabilize.handlers.base import StabilizeHandler
from stabilize.models.status import WorkflowStatus
from stabilize.queue.messages import (
CompleteWorkflow,
ContinueParentStage,
SkipStage,
StartStage,
)
from stabilize.resilience.config import HandlerConfig
if TYPE_CHECKING:
from stabilize.events.recorder import EventRecorder
from stabilize.models.stage import StageExecution
from stabilize.persistence.store import WorkflowStore
from stabilize.queue import Queue
logger = logging.getLogger(__name__)
class SkipStageHandler(StabilizeHandler[SkipStage]):
    """
    Consumes SkipStage messages and marks the targeted stage as SKIPPED.

    Processing steps:
        1. Ignore the message unless the stage is still NOT_STARTED
        2. Flip the stage status to SKIPPED and stamp its end time
        3. Record a stage-skipped event when an event recorder is configured
        4. Inside one repository transaction, persist the stage, mark the
           message as processed (when it carries a message id) and enqueue
           the follow-up message:
             - StartStage for every downstream stage, or
             - ContinueParentStage for a synthetic stage that has a parent, or
             - CompleteWorkflow when there is neither downstream nor owner phase
    """

    def __init__(
        self,
        queue: Queue,
        repository: WorkflowStore,
        retry_delay: timedelta | None = None,
        handler_config: HandlerConfig | None = None,
        event_recorder: EventRecorder | None = None,
    ) -> None:
        """Forward all collaborators unchanged to the StabilizeHandler base class."""
        super().__init__(
            queue,
            repository,
            retry_delay,
            handler_config,
            event_recorder=event_recorder,
        )

    @property
    def message_type(self) -> type[SkipStage]:
        """The message class this handler is responsible for."""
        return SkipStage

    def handle(self, message: SkipStage) -> None:
        """Process a SkipStage message.

        The work itself lives in ``_handle_with_retry``; it is wrapped in
        ``retry_on_concurrency_error`` so that a ConcurrencyError (optimistic
        lock failure) leads to another attempt per the configured retry settings.
        """

        def attempt() -> None:
            self._handle_with_retry(message)

        self.retry_on_concurrency_error(attempt, f"skipping stage {message.stage_id}")

    def _handle_with_retry(self, message: SkipStage) -> None:
        """Run a single skip attempt; this is the unit that gets retried."""
        self.with_stage(message, lambda stage: self._skip_and_propagate(message, stage))

    def _skip_and_propagate(self, message: SkipStage, stage: StageExecution) -> None:
        """Mark ``stage`` as skipped and enqueue whatever has to run next."""
        # Guard: only NOT_STARTED stages may be skipped. RUNNING stages should be
        # CANCELED instead (VALID_TRANSITIONS doesn't allow RUNNING -> SKIPPED).
        if stage.status != WorkflowStatus.NOT_STARTED:
            logger.debug(
                "Ignoring SkipStage for %s (%s) - already %s (only NOT_STARTED can be skipped)",
                stage.name,
                stage.id,
                stage.status,
            )
            return

        # In-memory state change; nothing is persisted until the transaction below.
        self.set_stage_status(stage, WorkflowStatus.SKIPPED)
        stage.end_time = self.current_time_millis()
        logger.info("Skipped stage %s (%s)", stage.name, stage.id)

        # NOTE(review): the event is recorded before the transaction commits, so a
        # retried attempt may record it again - confirm the recorder tolerates that.
        if self.event_recorder:
            if stage.execution:
                event_context = stage.execution.id
            else:
                event_context = ""
            self.set_event_context(event_context)
            self.event_recorder.record_stage_skipped(
                stage,
                reason="Stage skipped",
                source_handler="SkipStageHandler",
            )

        # Everything needed to decide the follow-up is read BEFORE the transaction.
        # NOTE(review): stage.execution is None-guarded above but dereferenced
        # unconditionally here - confirm it can never be None at this point.
        workflow = stage.execution
        successors = self.repository.get_downstream_stages(workflow.id, stage.ref_id)
        owner_phase = stage.synthetic_stage_owner

        # Atomic unit: stage write, dedup marker and outgoing messages go together.
        with self.repository.transaction(self.queue) as txn:
            txn.store_stage(stage)

            # Dedup bookkeeping only applies to messages that carry an id.
            if message.message_id:
                txn.mark_message_processed(
                    message_id=message.message_id,
                    handler_type="SkipStage",
                    execution_id=message.execution_id,
                )

            if successors:
                # Fan out: kick off each stage that was waiting on this one.
                for successor in successors:
                    txn.push_message(
                        StartStage(
                            execution_type=workflow.type.value,
                            execution_id=workflow.id,
                            stage_id=successor.id,
                        )
                    )
            elif owner_phase is None:
                # Terminal, non-synthetic stage: ask for the workflow to be completed.
                txn.push_message(
                    CompleteWorkflow(
                        execution_type=workflow.type.value,
                        execution_id=workflow.id,
                    )
                )
            else:
                # Synthetic stage: hand control back to the parent, if one is set.
                # Without a parent id no follow-up message is queued.
                parent_id = stage.parent_stage_id
                if parent_id:
                    txn.push_message(
                        ContinueParentStage(
                            execution_type=workflow.type.value,
                            execution_id=workflow.id,
                            stage_id=parent_id,
                            phase=owner_phase,
                        )
                    )