Source code for stabilize.handlers.start_workflow

"""
StartWorkflowHandler - handles pipeline execution startup.

This handler is triggered when a new pipeline execution is started.
It finds initial stages (those with no dependencies) and queues them
for execution.
"""

from __future__ import annotations

import logging
from datetime import timedelta
from typing import TYPE_CHECKING

from stabilize.audit import audit
from stabilize.handlers.base import StabilizeHandler
from stabilize.models.status import WorkflowStatus
from stabilize.persistence.store import WorkflowCriteria
from stabilize.queue.messages import (
    CancelWorkflow,
    StartStage,
    StartWorkflow,
)
from stabilize.resilience.config import HandlerConfig

if TYPE_CHECKING:
    from stabilize.events.recorder import EventRecorder
    from stabilize.models.workflow import Workflow
    from stabilize.persistence.store import WorkflowStore
    from stabilize.queue import Queue

logger = logging.getLogger(__name__)


[docs] class StartWorkflowHandler(StabilizeHandler[StartWorkflow]): """ Handler for StartWorkflow messages. When a pipeline execution starts: 1. Check if execution should be queued (concurrent limits) 2. Find initial stages (no dependencies) 3. Push StartStage for each initial stage 4. Mark execution as RUNNING """ def __init__( self, queue: Queue, repository: WorkflowStore, retry_delay: timedelta | None = None, handler_config: HandlerConfig | None = None, event_recorder: EventRecorder | None = None, ) -> None: super().__init__(queue, repository, retry_delay, handler_config, event_recorder) @property def message_type(self) -> type[StartWorkflow]: return StartWorkflow
[docs] def handle(self, message: StartWorkflow) -> None: """Handle the StartWorkflow message. Retries on ConcurrencyError (optimistic lock failure) using configurable retry settings. """ self.retry_on_concurrency_error( lambda: self._handle_with_retry(message), f"starting workflow {message.execution_id}", )
def _handle_with_retry(self, message: StartWorkflow) -> None: """Inner handle logic to be retried.""" def on_execution(execution: Workflow) -> None: # Check if already started or canceled if execution.status != WorkflowStatus.NOT_STARTED: logger.warning( "Execution %s already has status %s, ignoring StartWorkflow", execution.id, execution.status, ) return if execution.is_canceled: logger.info("Execution %s was canceled before start", execution.id) self._terminate(execution) return # Check if start time has expired if self._is_after_start_time_expiry(execution): logger.warning("Execution %s start time expired, canceling", execution.id) self.queue.push( CancelWorkflow( execution_type=message.execution_type, execution_id=message.execution_id, user="system", reason="Could not begin execution before start time expiry", ) ) return # Check if should queue (concurrent execution limits) if self._should_queue(execution): logger.info( "Execution %s queued due to concurrent execution limit (limit=%d)", execution.id, execution.max_concurrent_executions, ) self.set_workflow_status(execution, WorkflowStatus.BUFFERED) # Atomic: update execution status + message deduplication with self.repository.transaction(self.queue) as txn: txn.update_workflow_status(execution) if message.message_id: txn.mark_message_processed( message_id=message.message_id, handler_type="StartWorkflow", execution_id=message.execution_id, ) return self._start(execution, message) self.with_execution(message, on_execution) def _should_queue(self, execution: Workflow) -> bool: """Check if execution should be queued due to concurrency limits.""" if not execution.is_limit_concurrent or not execution.pipeline_config_id: return False if execution.max_concurrent_executions <= 0: return False # Count currently running executions for this pipeline config criteria = WorkflowCriteria( statuses={WorkflowStatus.RUNNING}, page_size=execution.max_concurrent_executions + 1, # Fetch just enough to check limit ) running_count = 0 for _ in self.repository.retrieve_by_pipeline_config_id(execution.pipeline_config_id, criteria): running_count += 1 if running_count >= execution.max_concurrent_executions: return True return False def _start( self, execution: Workflow, message: StartWorkflow, ) -> None: """Start the execution.""" initial_stages = execution.initial_stages() if not initial_stages: logger.warning("No initial stages found for execution %s", execution.id) self.set_workflow_status(execution, WorkflowStatus.TERMINAL) # Atomic: update execution status + message deduplication with self.repository.transaction(self.queue) as txn: txn.update_workflow_status(execution) if message.message_id: txn.mark_message_processed( message_id=message.message_id, handler_type="StartWorkflow", execution_id=message.execution_id, ) # Publish ExecutionComplete event return # Mark as running self.set_workflow_status(execution, WorkflowStatus.RUNNING) execution.start_time = self.current_time_millis() # Atomic: update execution status + queue all initial stages + message deduplication with self.repository.transaction(self.queue) as txn: txn.update_workflow_status(execution) if message.message_id: txn.mark_message_processed( message_id=message.message_id, handler_type="StartWorkflow", execution_id=message.execution_id, ) for stage in initial_stages: logger.debug( "Queuing initial stage %s (%s) for execution %s", stage.name, stage.id, execution.id, ) txn.push_message( StartStage( execution_type=message.execution_type, execution_id=message.execution_id, stage_id=stage.id, ) ) # Record events if event recorder is configured if self.event_recorder: self.set_event_context(execution.id) self.event_recorder.record_workflow_created( execution, source_handler="StartWorkflowHandler", ) self.event_recorder.record_workflow_started( execution, initial_stage_ids=[s.id for s in initial_stages], source_handler="StartWorkflowHandler", ) # Audit log audit( event_type="WORKFLOW_LIFECYCLE", action="START", user=execution.trigger.user or "system", resource_type="workflow", resource_id=execution.id, details={ "application": execution.application, "name": execution.name, "initial_stages": len(initial_stages), }, ) logger.info( "Started execution %s with %d initial stage(s)", execution.id, len(initial_stages), ) def _terminate(self, execution: Workflow) -> None: """Terminate a canceled execution.""" # Publish ExecutionComplete event if execution.pipeline_config_id: # Queue start waiting executions pass def _is_after_start_time_expiry(self, execution: Workflow) -> bool: """Check if current time is past start time expiry.""" if execution.start_time_expiry is None: return False return self.current_time_millis() > execution.start_time_expiry