# Source code for stabilize.tasks.result

"""
TaskResult - result of task execution.

This module defines the TaskResult class that encapsulates the result
of executing a task, including status, context updates, and outputs.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from stabilize.models.status import WorkflowStatus


@dataclass
class TaskResult:
    """
    Result of a task execution.

    Tasks return a TaskResult to indicate their status and provide data
    to downstream stages. The factory classmethods below (``success``,
    ``running``, ``terminal``, ...) cover the common cases; ``builder``
    offers a fluent alternative for incrementally assembled results.

    Attributes:
        status: The execution status after the task runs
        context: Data scoped to the current stage (merged into stage.context)
        outputs: Data available to downstream stages (merged into stage.outputs)
        target_stage_ref_id: For jump_to results, the ref_id of the stage to jump to
    """

    status: WorkflowStatus
    context: dict[str, Any] = field(default_factory=dict)
    outputs: dict[str, Any] = field(default_factory=dict)
    target_stage_ref_id: str | None = field(default=None)

    # ========== Factory Methods ==========

    @classmethod
    def success(
        cls,
        outputs: dict[str, Any] | None = None,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a successful result.

        Args:
            outputs: Values available to downstream stages
            context: Values scoped to current stage

        Returns:
            A TaskResult with SUCCEEDED status
        """
        return cls(
            status=WorkflowStatus.SUCCEEDED,
            context=context or {},
            outputs=outputs or {},
        )

    @classmethod
    def running(
        cls,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a running result (task will be re-executed).

        Use this when a task needs to poll/wait for something.
        The task will be re-queued and executed again after a backoff period.

        Args:
            context: Updated context values

        Returns:
            A TaskResult with RUNNING status
        """
        return cls(
            status=WorkflowStatus.RUNNING,
            context=context or {},
        )

    @classmethod
    def terminal(
        cls,
        error: str,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a terminal failure result.

        The stage and pipeline will fail.

        Args:
            error: Error message; stored under the ``"error"`` context key,
                replacing any ``"error"`` entry already present in ``context``
            context: Additional context (not modified by this call)

        Returns:
            A TaskResult with TERMINAL status
        """
        # Build a fresh dict rather than writing "error" into the caller's
        # mapping: the caller may still be using that dict afterwards.
        return cls(
            status=WorkflowStatus.TERMINAL,
            context={**(context or {}), "error": error},
        )

    @classmethod
    def failed_continue(
        cls,
        error: str,
        outputs: dict[str, Any] | None = None,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a failed result that allows pipeline to continue.

        The stage will be marked as failed but downstream stages will run.

        Args:
            error: Error message; stored under the ``"error"`` context key,
                replacing any ``"error"`` entry already present in ``context``
            outputs: Values available to downstream stages
            context: Additional context (not modified by this call)

        Returns:
            A TaskResult with FAILED_CONTINUE status
        """
        # Same reasoning as in terminal(): never mutate the caller's dict.
        return cls(
            status=WorkflowStatus.FAILED_CONTINUE,
            context={**(context or {}), "error": error},
            outputs=outputs or {},
        )

    @classmethod
    def skipped(cls) -> TaskResult:
        """
        Create a skipped result.

        Returns:
            A TaskResult with SKIPPED status
        """
        return cls(status=WorkflowStatus.SKIPPED)

    @classmethod
    def canceled(
        cls,
        outputs: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a canceled result.

        Args:
            outputs: Final outputs to preserve

        Returns:
            A TaskResult with CANCELED status
        """
        return cls(
            status=WorkflowStatus.CANCELED,
            outputs=outputs or {},
        )

    @classmethod
    def stopped(
        cls,
        outputs: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a stopped result.

        Args:
            outputs: Final outputs to preserve

        Returns:
            A TaskResult with STOPPED status
        """
        return cls(
            status=WorkflowStatus.STOPPED,
            outputs=outputs or {},
        )

    @classmethod
    def suspend(
        cls,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a suspended result (waiting for external signal).

        The stage will be set to SUSPENDED status and will wait for
        a SignalStage message to resume (WCP-23/24).

        Args:
            context: Context to preserve while suspended

        Returns:
            A TaskResult with SUSPENDED status
        """
        return cls(
            status=WorkflowStatus.SUSPENDED,
            context=context or {},
        )

    @classmethod
    def redirect(
        cls,
        context: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a redirect result.

        Indicates a decision branch should be followed.

        Args:
            context: Context for the redirect

        Returns:
            A TaskResult with REDIRECT status
        """
        return cls(
            status=WorkflowStatus.REDIRECT,
            context=context or {},
        )

    @classmethod
    def jump_to(
        cls,
        target_stage_ref_id: str,
        context: dict[str, Any] | None = None,
        outputs: dict[str, Any] | None = None,
    ) -> TaskResult:
        """
        Create a jump result to redirect flow to a different stage.

        The target stage will be reset to NOT_STARTED and re-executed
        with the provided context merged into its existing context.
        This enables dynamic routing patterns like retry loops,
        conditional branching, and error recovery flows.

        Args:
            target_stage_ref_id: The ref_id of the stage to jump to
            context: Context to merge into target stage
            outputs: Outputs to preserve (available to target stage)

        Returns:
            A TaskResult with REDIRECT status and target_stage_ref_id set

        Example:
            class RouterTask(Task):
                def execute(self, stage: StageExecution) -> TaskResult:
                    if stage.context.get("tests_passed"):
                        return TaskResult.success()
                    else:
                        return TaskResult.jump_to(
                            "implement_stage",
                            context={"retry_reason": "tests failed"}
                        )
        """
        return cls(
            status=WorkflowStatus.REDIRECT,
            context=context or {},
            outputs=outputs or {},
            target_stage_ref_id=target_stage_ref_id,
        )

    # ========== Builder Pattern ==========

    @classmethod
    def builder(cls, status: WorkflowStatus) -> TaskResultBuilder:
        """
        Create a builder for more complex results.

        Args:
            status: The execution status

        Returns:
            A TaskResultBuilder
        """
        return TaskResultBuilder(status)

    # ========== Utility Methods ==========

    def merge_outputs(self, other: TaskResult | None) -> TaskResult:
        """
        Merge context and outputs from another result.

        Entries from ``other`` win on key collisions. The status and the
        jump target (``target_stage_ref_id``) are taken from this result,
        so merging extra data into a jump result keeps its destination.
        Neither input result is modified.

        Args:
            other: Result to merge from; ``None`` means nothing to merge

        Returns:
            ``self`` when ``other`` is None, otherwise a new TaskResult
            with merged context and outputs
        """
        if other is None:
            return self

        return TaskResult(
            status=self.status,
            context={**self.context, **other.context},
            outputs={**self.outputs, **other.outputs},
            # Without this the merged copy of a jump_to result would still
            # carry this result's status but no longer name a target stage.
            target_stage_ref_id=self.target_stage_ref_id,
        )
class TaskResultBuilder:
    """
    Builder for TaskResult objects.

    Provides a fluent API for constructing complex task results: every
    setter returns the builder itself so calls can be chained, and
    ``build()`` produces the final TaskResult.

    The builder keeps its own copies of the dictionaries it is given, so
    neither the caller's dicts nor previously built results are affected
    by later builder calls.
    """

    def __init__(self, status: WorkflowStatus) -> None:
        """
        Start a builder for a result with the given status.

        Args:
            status: The execution status of the result being built
        """
        self._status = status
        self._context: dict[str, Any] = {}
        self._outputs: dict[str, Any] = {}
        self._target_stage_ref_id: str | None = None

    def context(self, context: dict[str, Any]) -> TaskResultBuilder:
        """
        Set the context, replacing anything added so far.

        Args:
            context: New context values (copied, not stored by reference)

        Returns:
            This builder, for chaining
        """
        # Copy so a later add_context() cannot write into the caller's dict.
        self._context = dict(context or {})
        return self

    def outputs(self, outputs: dict[str, Any]) -> TaskResultBuilder:
        """
        Set the outputs, replacing anything added so far.

        Args:
            outputs: New output values (copied, not stored by reference)

        Returns:
            This builder, for chaining
        """
        # Copy so a later add_output() cannot write into the caller's dict.
        self._outputs = dict(outputs or {})
        return self

    def add_context(self, key: str, value: Any) -> TaskResultBuilder:
        """
        Add a context value.

        Args:
            key: Context key
            value: Value to store under ``key``

        Returns:
            This builder, for chaining
        """
        self._context[key] = value
        return self

    def add_output(self, key: str, value: Any) -> TaskResultBuilder:
        """
        Add an output value.

        Args:
            key: Output key
            value: Value to store under ``key``

        Returns:
            This builder, for chaining
        """
        self._outputs[key] = value
        return self

    def target_stage_ref_id(self, target_stage_ref_id: str | None) -> TaskResultBuilder:
        """
        Set the ref_id of the stage to jump to.

        Args:
            target_stage_ref_id: ref_id of the target stage, or None to clear it

        Returns:
            This builder, for chaining
        """
        self._target_stage_ref_id = target_stage_ref_id
        return self

    def build(self) -> TaskResult:
        """
        Build the TaskResult.

        Returns:
            A new TaskResult holding copies of the builder's current
            context and outputs, so the builder can keep being used
            without altering results it already produced
        """
        return TaskResult(
            status=self._status,
            context=dict(self._context),
            outputs=dict(self._outputs),
            target_stage_ref_id=self._target_stage_ref_id,
        )