Source code for stabilize.handlers.start_waiting_workflows

"""
StartWaitingWorkflowsHandler - starts buffered executions.

This handler is triggered when a pipeline execution completes.
It checks if there are any BUFFERED executions for the same pipeline config
and starts them if capacity allows.
"""

from __future__ import annotations

import logging
from datetime import timedelta
from typing import TYPE_CHECKING

from stabilize.handlers.base import StabilizeHandler
from stabilize.models.status import WorkflowStatus
from stabilize.persistence.store import WorkflowCriteria
from stabilize.queue.messages import (
    StartWaitingWorkflows,
    StartWorkflow,
)
from stabilize.resilience.config import HandlerConfig

if TYPE_CHECKING:
    from stabilize.events.recorder import EventRecorder
    from stabilize.persistence.store import WorkflowStore
    from stabilize.queue import Queue

logger = logging.getLogger(__name__)


class StartWaitingWorkflowsHandler(StabilizeHandler[StartWaitingWorkflows]):
    """
    Handler for StartWaitingWorkflows messages.

    Execution flow:
    1. Find buffered executions for the pipeline config
    2. Check concurrent limits
    3. Start executions up to the limit
    4. If purge_queue is True, cancel remaining buffered executions
    """

    def __init__(
        self,
        queue: Queue,
        repository: WorkflowStore,
        retry_delay: timedelta | None = None,
        handler_config: HandlerConfig | None = None,
        event_recorder: EventRecorder | None = None,
    ) -> None:
        """Forward all collaborators to the base handler unchanged."""
        super().__init__(queue, repository, retry_delay, handler_config, event_recorder=event_recorder)

    @property
    def message_type(self) -> type[StartWaitingWorkflows]:
        """Message class this handler is registered for."""
        return StartWaitingWorkflows

    def handle(self, message: StartWaitingWorkflows) -> None:
        """Handle the StartWaitingWorkflows message.

        Retries on ConcurrencyError (optimistic lock failure) using
        configurable retry settings.
        """
        self.retry_on_concurrency_error(
            lambda: self._handle_with_retry(message),
            f"starting waiting workflows for {message.pipeline_config_id}",
        )

    def _handle_with_retry(self, message: StartWaitingWorkflows) -> None:
        """Inner handle logic to be retried.

        Promotes the oldest buffered executions into the free concurrency
        slots, optionally cancels the rest (``message.purge_queue``), and
        finally marks the message as processed. Does nothing when the message
        has no pipeline config id or when no buffered executions are found.
        """
        config_id = message.pipeline_config_id
        if not config_id:
            return

        buffered = self._load_buffered(config_id)
        if not buffered:
            return

        running_count = self._count_running(config_id)

        # We assume all buffered executions share the same limit (it comes
        # from the pipeline config), so the first one is representative.
        limit = buffered[0].max_concurrent_executions
        available_slots = self._available_slots(limit, running_count, len(buffered))

        logger.info(
            "Processing waiting workflows for %s: %d buffered, %d running, %d limit, %d slots",
            config_id,
            len(buffered),
            running_count,
            limit,
            available_slots,
        )

        # available_slots is never negative, so these two slices partition
        # the list: the head is promoted, the tail is purged or left buffered.
        for execution in buffered[:available_slots]:
            self._promote(execution)

        if message.purge_queue:
            for execution in buffered[available_slots:]:
                self._purge(execution)

        self._mark_processed(message)

    def _load_buffered(self, pipeline_config_id: str) -> list:
        """Return one page of BUFFERED executions for the config, oldest first."""
        criteria = WorkflowCriteria(
            statuses={WorkflowStatus.BUFFERED},
            page_size=self.handler_config.default_page_size,
        )
        buffered = list(self.repository.retrieve_by_pipeline_config_id(pipeline_config_id, criteria))
        # The repository's ordering is not guaranteed to be FIFO, so sort in
        # memory. This assumes ids are ULID-like / monotonic so that id order
        # matches creation order -- TODO confirm against the id generator.
        buffered.sort(key=lambda workflow: workflow.id)
        return buffered

    def _count_running(self, pipeline_config_id: str) -> int:
        """Return how many RUNNING executions the repository reports for the config."""
        criteria = WorkflowCriteria(statuses={WorkflowStatus.RUNNING})
        return len(list(self.repository.retrieve_by_pipeline_config_id(pipeline_config_id, criteria)))

    @staticmethod
    def _available_slots(limit: int, running_count: int, buffered_count: int) -> int:
        """Return how many buffered executions may be promoted right now."""
        if limit <= 0:
            # A non-positive limit is treated as "no cap". The executions were
            # presumably buffered under an older config that had a limit, so
            # every buffered execution becomes eligible to start.
            return buffered_count
        return max(0, limit - running_count)

    def _promote(self, execution) -> None:
        """Move one buffered execution to NOT_STARTED and enqueue its StartWorkflow."""
        self.set_workflow_status(execution, WorkflowStatus.NOT_STARTED)
        logger.info("Promoting buffered execution %s to queued", execution.id)

        # Atomic: update execution status + push StartWorkflow
        with self.repository.transaction(self.queue) as txn:
            txn.update_workflow_status(execution)
            txn.push_message(
                StartWorkflow(
                    execution_type=execution.type.value,
                    execution_id=execution.id,
                )
            )

        # Recorded once the transaction block has exited.
        if self.event_recorder:
            self.set_event_context(execution.id)
            self.event_recorder.record_workflow_started(
                execution, source_handler="StartWaitingWorkflowsHandler"
            )

    def _purge(self, execution) -> None:
        """Cancel one buffered execution that did not get a slot."""
        logger.info("Purging waiting execution %s", execution.id)
        self.repository.cancel(
            execution.id,
            canceled_by="system",
            reason="Queue purged by keepWaitingPipelines=False policy",
        )

    def _mark_processed(self, message: StartWaitingWorkflows) -> None:
        """Record the message as processed, if it carries a message id.

        Called only after every buffered execution has been handled. On a
        re-delivery the same executions will not be found again, because they
        are now NOT_STARTED or CANCELED. ``execution_id`` is None because this
        is a system-level message, not a per-execution one.
        """
        if message.message_id:
            with self.repository.transaction(self.queue) as txn:
                txn.mark_message_processed(
                    message_id=message.message_id,
                    handler_type="StartWaitingWorkflows",
                    execution_id=None,
                )