"""
CompleteTaskHandler - handles task completion.
This handler updates task status and triggers either the next task
or stage completion.
"""
from __future__ import annotations
import logging
from datetime import timedelta
from typing import TYPE_CHECKING
from stabilize.handlers.base import StabilizeHandler
from stabilize.models.status import WorkflowStatus
from stabilize.queue.messages import (
CompleteStage,
CompleteTask,
StartTask,
)
from stabilize.resilience.config import HandlerConfig
if TYPE_CHECKING:
from stabilize.events.recorder import EventRecorder
from stabilize.models.stage import StageExecution
from stabilize.models.task import TaskExecution
from stabilize.persistence.store import WorkflowStore
from stabilize.queue import Queue
logger = logging.getLogger(__name__)
[docs]
class CompleteTaskHandler(StabilizeHandler[CompleteTask]):
"""
Handler for CompleteTask messages.
Execution flow:
1. Update task status and end time
2. If there's a next task: Push StartTask
3. Otherwise: Push CompleteStage
"""
def __init__(
self,
queue: Queue,
repository: WorkflowStore,
retry_delay: timedelta | None = None,
handler_config: HandlerConfig | None = None,
event_recorder: EventRecorder | None = None,
) -> None:
super().__init__(queue, repository, retry_delay, handler_config, event_recorder)
@property
def message_type(self) -> type[CompleteTask]:
return CompleteTask
[docs]
def handle(self, message: CompleteTask) -> None:
"""Handle the CompleteTask message.
Uses atomic transactions to ensure stage updates and message pushes
are committed together, preventing orphaned states.
Retries on ConcurrencyError (optimistic lock failure) using
configurable retry settings.
"""
self.retry_on_concurrency_error(
lambda: self._handle_with_retry(message),
f"completing task {message.task_id}",
)
def _handle_with_retry(self, message: CompleteTask) -> None:
"""Inner handle logic to be retried."""
def on_task(stage: StageExecution, task: TaskExecution) -> None:
# Idempotency check - only complete tasks that are RUNNING
if task.status != WorkflowStatus.RUNNING:
logger.debug(
"Ignoring CompleteTask for %s (%s) - already %s",
task.name,
task.id,
task.status,
)
# Mark message as processed to prevent infinite reprocessing
if message.message_id:
with self.repository.transaction(self.queue) as txn:
txn.mark_message_processed(
message_id=message.message_id,
handler_type="CompleteTask",
execution_id=message.execution_id,
)
return
# Update task status
self.set_task_status(task, message.status)
task.end_time = self.current_time_millis()
logger.debug("Task %s completed with status %s", task.name, message.status)
# Record event if event recorder is configured
if self.event_recorder:
workflow_id = stage.execution.id if stage.execution else ""
self.set_event_context(workflow_id)
if message.status.is_failure:
error = "Task failed"
if task.task_exception_details:
error = task.task_exception_details.get("exception", str(task.task_exception_details))
self.event_recorder.record_task_failed(
task,
workflow_id=workflow_id,
error=error,
source_handler="CompleteTaskHandler",
)
elif message.status == WorkflowStatus.SKIPPED:
pass # Skipped tasks don't need completion events
else:
self.event_recorder.record_task_completed(
task,
workflow_id=workflow_id,
outputs=stage.outputs,
source_handler="CompleteTaskHandler",
)
# For REDIRECT status, the task initiated a jump to another stage.
# JumpToStageHandler handles the flow control, so we just complete
# the task and don't start the next task or complete the stage.
if message.status == WorkflowStatus.REDIRECT:
logger.debug(
"Task %s completed with REDIRECT - flow handled by JumpToStageHandler",
task.name,
)
# Atomic: store stage only (no next message)
with self.repository.transaction(self.queue) as txn:
txn.store_stage(stage)
if message.message_id:
txn.mark_message_processed(
message_id=message.message_id,
handler_type="CompleteTask",
execution_id=message.execution_id,
)
return
# Check for next task
next_task = stage.next_task(task)
# Atomic: store stage + push next message together
with self.repository.transaction(self.queue) as txn:
txn.store_stage(stage)
# Atomic deduplication
if message.message_id:
txn.mark_message_processed(
message_id=message.message_id,
handler_type="CompleteTask",
execution_id=message.execution_id,
)
if next_task is not None:
txn.push_message(
StartTask(
execution_type=message.execution_type,
execution_id=message.execution_id,
stage_id=message.stage_id,
task_id=next_task.id,
)
)
else:
# No more tasks - complete stage
txn.push_message(
CompleteStage(
execution_type=message.execution_type,
execution_id=message.execution_id,
stage_id=message.stage_id,
)
)
self.with_task(message, on_task)