"""
TaskExecution model.
A task is the smallest unit of work within a stage. Each stage contains
one or more tasks that execute sequentially.
"""
from __future__ import annotations
import weakref
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from stabilize.models.status import WorkflowStatus
if TYPE_CHECKING:
from stabilize.models.snapshot import TaskStateSnapshot
from stabilize.models.stage import StageExecution
def _generate_task_id() -> str:
"""Generate a unique task ID using ULID."""
from ulid import ULID
return str(ULID())
[docs]
@dataclass
class TaskExecution:
"""
Represents a single task within a stage execution.
Tasks are the atomic units of work in a pipeline. They execute sequentially
within a stage, and each task produces a result that can include:
- Status updates
- Context modifications (stage-scoped)
- Output values (pipeline-scoped)
Attributes:
id: Unique identifier for this task execution
name: Human-readable name for this task
implementing_class: Fully qualified class name of the task implementation
status: Current execution status
start_time: Epoch milliseconds when task started
end_time: Epoch milliseconds when task completed
stage_start: True if this is the first task in the stage
stage_end: True if this is the last task in the stage
loop_start: True if this task starts a loop
loop_end: True if this task ends a loop
task_exception_details: Exception information if task failed
"""
id: str = field(default_factory=_generate_task_id)
name: str = ""
implementing_class: str = ""
status: WorkflowStatus = WorkflowStatus.NOT_STARTED
start_time: int | None = None
end_time: int | None = None
stage_start: bool = False
stage_end: bool = False
loop_start: bool = False
loop_end: bool = False
task_exception_details: dict[str, Any] = field(default_factory=dict)
version: int = 0
# Back-reference to parent stage (set after construction)
# Can be weakref (default) or strong ref (for standalone tasks)
_stage: weakref.ReferenceType[StageExecution] | StageExecution | None = field(default=None, repr=False)
@property
def stage(self) -> StageExecution | None:
"""Get the parent stage for this task."""
if self._stage is None:
return None
if isinstance(self._stage, weakref.ReferenceType):
return self._stage()
return self._stage
@stage.setter
def stage(self, value: StageExecution) -> None:
"""Set the parent stage for this task (weakref by default)."""
self._stage = weakref.ref(value)
[docs]
def set_stage_strong(self, value: StageExecution) -> None:
"""Set the parent stage for this task as a strong reference."""
self._stage = value
[docs]
def has_stage(self) -> bool:
"""Check if this task is attached to a stage.
Returns True if the task has a valid stage reference that hasn't
been garbage collected. This is useful for safely checking before
accessing stage data when using weak references.
Returns:
True if stage reference is valid, False otherwise.
"""
if self._stage is None:
return False
if isinstance(self._stage, weakref.ReferenceType):
return self._stage() is not None
return True
@property
def is_stage_start(self) -> bool:
"""Check if this task starts the stage."""
return self.stage_start
@property
def is_stage_end(self) -> bool:
"""Check if this task ends the stage."""
return self.stage_end
@property
def is_loop_start(self) -> bool:
"""Check if this task starts a loop."""
return self.loop_start
@property
def is_loop_end(self) -> bool:
"""Check if this task ends a loop."""
return self.loop_end
[docs]
def cleanup(self) -> None:
"""Explicitly break circular references."""
self._stage = None
[docs]
def set_exception_details(self, exception: dict[str, Any]) -> None:
"""Store exception details for this task."""
self.task_exception_details["exception"] = exception
# ========== Phase-Version State Tracking ==========
@property
def phase_version(self) -> tuple[str, int]:
"""Return (status_name, version) for optimistic locking with phase check.
This combines the current status and version number into a single
tuple for phase-aware optimistic locking.
Returns:
Tuple of (status_name, version) for comparison.
"""
return (self.status.name, self.version)
[docs]
def state_snapshot(self) -> TaskStateSnapshot:
"""Return frozen copy of current state for read-only use.
Creates an immutable snapshot of the current task state that
can be safely passed to external systems, cached, or logged
without risk of mutation.
Returns:
Frozen TaskStateSnapshot with current state.
"""
from stabilize.models.snapshot import TaskStateSnapshot, _freeze_dict
return TaskStateSnapshot(
id=self.id,
name=self.name,
status=self.status,
version=self.version,
implementing_class=self.implementing_class,
start_time=self.start_time,
end_time=self.end_time,
exception_details=_freeze_dict(self.task_exception_details),
)
[docs]
@classmethod
def create(
cls,
name: str,
implementing_class: str,
stage_start: bool = False,
stage_end: bool = False,
) -> TaskExecution:
"""
Factory method to create a new task execution.
Args:
name: Human-readable task name
implementing_class: Class name or callable reference for task
stage_start: Whether this is the first task
stage_end: Whether this is the last task
Returns:
A new TaskExecution instance
"""
return cls(
name=name,
implementing_class=implementing_class,
stage_start=stage_start,
stage_end=stage_end,
)