Source code for stabilize.events.projections.timeline

"""
Workflow timeline projection.

Builds a human-readable timeline of workflow execution events,
showing stages, tasks, and their durations.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

from stabilize.events.base import EntityType, Event, EventType
from stabilize.events.projections.base import Projection


[docs] @dataclass class TimelineEntry: """A single entry in the workflow timeline.""" timestamp: datetime event_type: str entity_type: str entity_id: str entity_name: str | None = None status: str | None = None duration_ms: int | None = None details: dict[str, Any] = field(default_factory=dict)
[docs] def to_dict(self) -> dict[str, Any]: """Convert to dictionary for serialization.""" return { "timestamp": self.timestamp.isoformat(), "event_type": self.event_type, "entity_type": self.entity_type, "entity_id": self.entity_id, "entity_name": self.entity_name, "status": self.status, "duration_ms": self.duration_ms, "details": self.details, }
[docs] @dataclass class WorkflowTimeline: """Complete timeline for a workflow execution.""" workflow_id: str entries: list[TimelineEntry] = field(default_factory=list) total_duration_ms: int | None = None status: str | None = None start_time: datetime | None = None end_time: datetime | None = None
[docs] def to_dict(self) -> dict[str, Any]: """Convert to dictionary for serialization.""" return { "workflow_id": self.workflow_id, "entries": [e.to_dict() for e in self.entries], "total_duration_ms": self.total_duration_ms, "status": self.status, "start_time": self.start_time.isoformat() if self.start_time else None, "end_time": self.end_time.isoformat() if self.end_time else None, }
[docs] class WorkflowTimelineProjection(Projection): """ Builds a human-readable timeline from workflow events. The timeline shows: - Workflow start/end with status - Stage start/end with duration - Task start/end with duration - Any failures or skips """ def __init__(self, workflow_id: str) -> None: """ Initialize the timeline projection. Args: workflow_id: The workflow ID to build timeline for. """ self._workflow_id = workflow_id self._timeline = WorkflowTimeline(workflow_id=workflow_id) self._start_times: dict[str, datetime] = {} self._processed_sequences: set[int] = set() @property def name(self) -> str: return f"workflow-timeline-{self._workflow_id}"
[docs] def apply(self, event: Event) -> None: """Apply an event to the timeline.""" if event.workflow_id != self._workflow_id: return if event.sequence > 0 and event.sequence in self._processed_sequences: return if event.sequence > 0: self._processed_sequences.add(event.sequence) entry = self._create_entry(event) if entry: self._timeline.entries.append(entry) # Update workflow-level state self._update_workflow_state(event)
def _create_entry(self, event: Event) -> TimelineEntry | None: """Create a timeline entry from an event.""" # Extract common fields entity_name = event.data.get("name") or event.data.get("ref_id") status = event.data.get("status") duration_ms = event.data.get("duration_ms") # Track start times for duration calculation if event.event_type in { EventType.WORKFLOW_STARTED, EventType.STAGE_STARTED, EventType.TASK_STARTED, }: self._start_times[event.entity_id] = event.timestamp # Calculate duration for completion events if event.event_type in { EventType.WORKFLOW_COMPLETED, EventType.WORKFLOW_FAILED, EventType.STAGE_COMPLETED, EventType.STAGE_FAILED, EventType.TASK_COMPLETED, EventType.TASK_FAILED, }: if duration_ms is None and event.entity_id in self._start_times: start = self._start_times[event.entity_id] delta = event.timestamp - start duration_ms = int(delta.total_seconds() * 1000) # Build details dict details = {} if event.event_type == EventType.WORKFLOW_CREATED: details = { "application": event.data.get("application"), "type": event.data.get("type"), "stage_count": event.data.get("stage_count"), } elif event.event_type == EventType.STAGE_STARTED: details = { "type": event.data.get("type"), "is_synthetic": event.data.get("is_synthetic"), "task_count": event.data.get("task_count"), } elif event.event_type in {EventType.STAGE_FAILED, EventType.TASK_FAILED}: details = {"error": event.data.get("error")} elif event.event_type == EventType.STAGE_SKIPPED: details = {"reason": event.data.get("reason")} # Skip internal events that don't need timeline entries if event.event_type in { EventType.STATUS_CHANGED, EventType.CONTEXT_UPDATED, EventType.OUTPUTS_UPDATED, }: return None return TimelineEntry( timestamp=event.timestamp, event_type=event.event_type.value, entity_type=event.entity_type.value, entity_id=event.entity_id, entity_name=entity_name, status=status, duration_ms=duration_ms, details=details, ) def _update_workflow_state(self, event: Event) -> None: """Update workflow-level state from event.""" if event.entity_type != EntityType.WORKFLOW: return if event.event_type == EventType.WORKFLOW_STARTED: self._timeline.start_time = event.timestamp elif event.event_type in { EventType.WORKFLOW_COMPLETED, EventType.WORKFLOW_FAILED, EventType.WORKFLOW_CANCELED, }: self._timeline.end_time = event.timestamp self._timeline.status = event.data.get("status") # Calculate total duration if self._timeline.start_time and self._timeline.end_time: delta = self._timeline.end_time - self._timeline.start_time self._timeline.total_duration_ms = int(delta.total_seconds() * 1000)
[docs] def get_state(self) -> WorkflowTimeline: """Get the current timeline.""" return self._timeline
[docs] def reset(self) -> None: """Reset the projection.""" self._timeline = WorkflowTimeline(workflow_id=self._workflow_id) self._start_times.clear() self._processed_sequences.clear()
[docs] def get_stages(self) -> list[TimelineEntry]: """Get only stage-related entries.""" return [e for e in self._timeline.entries if e.entity_type == "stage"]
[docs] def get_tasks(self) -> list[TimelineEntry]: """Get only task-related entries.""" return [e for e in self._timeline.entries if e.entity_type == "task"]
[docs] def get_failures(self) -> list[TimelineEntry]: """Get only failure events.""" return [ e for e in self._timeline.entries if e.event_type in {"stage.failed", "task.failed", "workflow.failed", "workflow.canceled"} ]