Tasks
Tasks are the building blocks of Stabilize workflows.
The Task Interface
All tasks must implement the Task interface:
from stabilize import Task, TaskResult, StageExecution
class MyTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
# Access inputs
data = stage.context.get("data")
# Do work...
result = process(data)
# Return success
return TaskResult.success(outputs={"result": result})
Built-in Tasks
Stabilize comes with robust built-in tasks.
ShellTask
Executes shell commands. Safely handles stdout/stderr capturing and timeouts.
TaskExecution.create("shell", "shell")
# Context: {"command": "ls -la", "cwd": "/tmp"}
PythonTask
Executes Python code. Can run inline scripts or module functions.
TaskExecution.create("python", "python")
# Context: {"script": "RESULT = {'value': 42}"}
HTTPTask
Makes HTTP requests. Supports all methods, JSON, auth, and retries.
TaskExecution.create("http", "http")
# Context: {"url": "https://api.com", "method": "POST", "json": {...}}
DockerTask
Runs Docker containers. Supports volumes, env vars, and resource limits.
TaskExecution.create("docker", "docker")
# Context: {"action": "run", "image": "alpine", "command": "echo hi"}
Advanced Task Features
Skippable Tasks
Implement SkippableTask to conditionally skip execution logic.
class MySkippable(SkippableTask):
def is_enabled(self, stage: StageExecution) -> bool:
return stage.context.get("feature_flag") == True
Retryable Tasks
Implement RetryableTask for polling or unreliable operations. The engine handles backoff and timeouts.
Task Cleanup
Implement on_cleanup to release resources when a stage enters a terminal state:
class MyTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
# Acquire resources
temp_file = create_temp_file()
stage.context["temp_file"] = temp_file
# Do work...
return TaskResult.success()
def on_cleanup(self, stage: StageExecution) -> None:
"""Called automatically on terminal state (success, failure, or cancel)."""
temp_file = stage.context.get("temp_file")
if temp_file:
cleanup_temp_file(temp_file)
The on_cleanup method is:
Called automatically when stage enters terminal state
Called even after process crash (on recovery)
Timeout-protected (30 seconds default)
For more complex cleanup, use the finalizer registry:
from stabilize.finalizers import get_finalizer_registry
class MyTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
# Register cleanup callback
registry = get_finalizer_registry()
registry.register(
stage.id,
"cleanup_external_resource",
lambda: cleanup_external_api(stage.context["resource_id"])
)
# Do work...
return TaskResult.success()