Source code for stabilize.handlers.cancel_region

"""
CancelRegionHandler - cancels all stages in a named region.

Implements WCP-25: Cancel Region.
Finds all stages with matching cancel_region and pushes CancelStage for each active one.
"""

from __future__ import annotations

import logging
from datetime import timedelta
from typing import TYPE_CHECKING

from stabilize.handlers.base import StabilizeHandler
from stabilize.queue.messages import CancelRegion, CancelStage
from stabilize.resilience.config import HandlerConfig

if TYPE_CHECKING:
    from stabilize.events.recorder import EventRecorder
    from stabilize.persistence.store import WorkflowStore
    from stabilize.queue import Queue

logger = logging.getLogger(__name__)


[docs] class CancelRegionHandler(StabilizeHandler[CancelRegion]): """ Handler for CancelRegion messages. Execution flow: 1. Load the workflow execution 2. Find all stages with matching cancel_region 3. For each active stage, push CancelStage """ def __init__( self, queue: Queue, repository: WorkflowStore, retry_delay: timedelta | None = None, handler_config: HandlerConfig | None = None, event_recorder: EventRecorder | None = None, ) -> None: super().__init__(queue, repository, retry_delay, handler_config, event_recorder=event_recorder) @property def message_type(self) -> type[CancelRegion]: return CancelRegion
[docs] def handle(self, message: CancelRegion) -> None: """Handle the CancelRegion message.""" def on_execution(execution: object) -> None: from stabilize.models.workflow import Workflow if not isinstance(execution, Workflow): return region = message.region if not region: logger.warning("CancelRegion message with empty region, ignoring") return # Find all stages in the region that are still active stages_to_cancel = [s for s in execution.stages if s.cancel_region == region and not s.status.is_complete] if not stages_to_cancel: logger.debug( "No active stages found in cancel region '%s'", region, ) return logger.info( "Cancelling %d stages in region '%s'", len(stages_to_cancel), region, ) with self.repository.transaction(self.queue) as txn: if message.message_id: txn.mark_message_processed( message_id=message.message_id, handler_type="CancelRegion", execution_id=message.execution_id, ) for stage in stages_to_cancel: txn.push_message( CancelStage( execution_type=message.execution_type, execution_id=message.execution_id, stage_id=stage.id, ) ) self.with_execution(message, on_execution)