Tasks

Built-in Tasks

Task interface definitions.

This module defines the Task interface and its variants (RetryableTask, SkippableTask) that all task implementations must follow.

class stabilize.tasks.interface.Task[source]

Bases: ABC

Base interface for all tasks.

Tasks are the atomic units of work in a pipeline. Each task: - Receives the current stage context - Performs some work - Returns a TaskResult indicating status and any outputs

Example

class DeployTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:

# Get inputs from context cluster = stage.context.get(“cluster”) image = stage.context.get(“image”)

# Do the work deployment_id = deploy(cluster, image)

# Return result with outputs return TaskResult.success(

outputs={“deploymentId”: deployment_id}

)

abstractmethod execute(stage)[source]

Execute the task.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

TaskResult indicating status and any outputs

Raises:

Exception – Any exception will be caught and handled by the runner

Return type:

TaskResult

on_timeout(stage)[source]

Called when the task times out.

Override to provide custom timeout handling. If None is returned, the default timeout behavior applies.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

Optional TaskResult to use instead of default timeout

Return type:

TaskResult | None

on_cancel(stage)[source]

Called when the execution is canceled.

Override to provide cleanup logic when execution is canceled.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

Optional TaskResult with cleanup results

Return type:

TaskResult | None

on_cleanup(stage)[source]

Called when stage enters terminal state.

Override to clean up resources when the stage completes, fails, or is canceled. This is called after the stage status is set but before the CompleteStage message is pushed.

Unlike on_cancel (which is called only on cancellation), on_cleanup is called for all terminal states: TERMINAL, CANCELED, STOPPED, SUCCEEDED, FAILED_CONTINUE, and SKIPPED.

Use this for: - Releasing external resources (containers, connections) - Cleaning up temporary files - Sending notifications - Recording metrics

Parameters:

stage (StageExecution) – The stage execution context

Return type:

None

Example

def on_cleanup(self, stage: StageExecution) -> None:

container_id = stage.context.get(“container_id”) if container_id:

docker_client.remove(container_id, force=True)

property aliases: list[str]

Alternative names for this task type.

Used for backward compatibility when task types are renamed.

Returns:

List of alternative type names

class stabilize.tasks.interface.RetryableTask[source]

Bases: Task

A task that can be retried with timeout and backoff.

Retryable tasks return RUNNING status while waiting for some condition. They are re-executed after a backoff period until they succeed, fail, or timeout.

Example

class WaitForDeployTask(RetryableTask):
def get_timeout(self) -> timedelta:

return timedelta(minutes=30)

def get_backoff_period(self, stage, duration) -> timedelta:

return timedelta(seconds=10)

def execute(self, stage: StageExecution) -> TaskResult:

deployment_id = stage.context.get(“deploymentId”) status = check_deployment_status(deployment_id)

if status == “complete”:

return TaskResult.success()

elif status == “failed”:

return TaskResult.terminal(“Deployment failed”)

else:

return TaskResult.running()

abstractmethod get_timeout()[source]

Get the maximum time this task can run before timing out.

Returns:

Maximum execution time

Return type:

timedelta

get_backoff_period(stage, duration)[source]

Get the backoff period before retrying.

Override to implement dynamic backoff based on how long the task has been running.

Parameters:
  • stage (StageExecution) – The stage execution context

  • duration (timedelta) – How long the task has been running

Returns:

Time to wait before retrying

Return type:

timedelta

get_dynamic_timeout(stage)[source]

Get dynamic timeout based on stage context.

Override to implement context-based timeouts.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

Timeout duration

Return type:

timedelta

get_dynamic_backoff_period(stage, duration)[source]

Get dynamic backoff based on stage context.

Parameters:
  • stage (StageExecution) – The stage execution context

  • duration (timedelta) – How long the task has been running

Returns:

Time to wait before retrying

Return type:

timedelta

class stabilize.tasks.interface.OverridableTimeoutRetryableTask[source]

Bases: RetryableTask

A retryable task whose timeout can be overridden by the stage.

The stage can set a ‘stageTimeoutMs’ context value to override the default timeout.

get_dynamic_timeout(stage)[source]

Get timeout, potentially overridden by stage context.

Parameters:

stage (StageExecution)

Return type:

timedelta

class stabilize.tasks.interface.SkippableTask[source]

Bases: Task

A task that can be conditionally skipped.

Override is_enabled() to control when the task should be skipped.

is_enabled(stage)[source]

Check if this task is enabled.

Override to implement skip logic.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

True if task should execute, False to skip

Return type:

bool

execute(stage)[source]

Execute the task if enabled.

Parameters:

stage (StageExecution)

Return type:

TaskResult

abstractmethod do_execute(stage)[source]

Perform the actual task execution.

Parameters:

stage (StageExecution) – The stage execution context

Returns:

TaskResult indicating status

Return type:

TaskResult

class stabilize.tasks.interface.CallableTask(func, name=None)[source]

Bases: Task

A task that wraps a callable function.

Allows using simple functions as tasks without creating a class.

Example

def my_task(stage: StageExecution) -> TaskResult:

return TaskResult.success(outputs={“result”: “done”})

task = CallableTask(my_task)

Parameters:
execute(stage)[source]

Execute the wrapped function.

Parameters:

stage (StageExecution)

Return type:

TaskResult

property name: str

Get the task name.

class stabilize.tasks.interface.NoOpTask[source]

Bases: Task

A task that does nothing.

Useful for testing or placeholder stages.

execute(stage)[source]

Return success immediately.

Parameters:

stage (StageExecution)

Return type:

TaskResult

class stabilize.tasks.interface.WaitTask[source]

Bases: RetryableTask

A task that waits for a specified duration.

Reads ‘waitTime’ from stage context (in seconds).

get_timeout()[source]

Wait tasks have a long timeout.

Return type:

timedelta

get_backoff_period(stage, duration)[source]

Check every second.

Parameters:
Return type:

timedelta

execute(stage)[source]

Wait for the specified time.

Parameters:

stage (StageExecution)

Return type:

TaskResult

TaskResult - result of task execution.

This module defines the TaskResult class that encapsulates the result of executing a task, including status, context updates, and outputs.

class stabilize.tasks.result.TaskResult(status, context=<factory>, outputs=<factory>, target_stage_ref_id=None)[source]

Bases: object

Result of a task execution.

Tasks return a TaskResult to indicate their status and provide data to downstream stages.

Parameters:
  • status (WorkflowStatus)

  • context (dict[str, Any])

  • outputs (dict[str, Any])

  • target_stage_ref_id (str | None)

status

The execution status after the task runs

Type:

stabilize.models.status.WorkflowStatus

context

Data scoped to the current stage (merged into stage.context)

Type:

dict[str, Any]

outputs

Data available to downstream stages (merged into stage.outputs)

Type:

dict[str, Any]

target_stage_ref_id

For jump_to results, the ref_id of the stage to jump to

Type:

str | None

status: WorkflowStatus
context: dict[str, Any]
outputs: dict[str, Any]
target_stage_ref_id: str | None = None
classmethod success(outputs=None, context=None)[source]

Create a successful result.

Parameters:
  • outputs (dict[str, Any] | None) – Values available to downstream stages

  • context (dict[str, Any] | None) – Values scoped to current stage

Returns:

A TaskResult with SUCCEEDED status

Return type:

TaskResult

classmethod running(context=None)[source]

Create a running result (task will be re-executed).

Use this when a task needs to poll/wait for something. The task will be re-queued and executed again after a backoff period.

Parameters:

context (dict[str, Any] | None) – Updated context values

Returns:

A TaskResult with RUNNING status

Return type:

TaskResult

classmethod terminal(error, context=None)[source]

Create a terminal failure result.

The stage and pipeline will fail.

Parameters:
  • error (str) – Error message

  • context (dict[str, Any] | None) – Additional context

Returns:

A TaskResult with TERMINAL status

Return type:

TaskResult

classmethod failed_continue(error, outputs=None, context=None)[source]

Create a failed result that allows pipeline to continue.

The stage will be marked as failed but downstream stages will run.

Parameters:
  • error (str) – Error message

  • outputs (dict[str, Any] | None) – Values available to downstream stages

  • context (dict[str, Any] | None) – Additional context

Returns:

A TaskResult with FAILED_CONTINUE status

Return type:

TaskResult

classmethod skipped()[source]

Create a skipped result.

Returns:

A TaskResult with SKIPPED status

Return type:

TaskResult

classmethod canceled(outputs=None)[source]

Create a canceled result.

Parameters:

outputs (dict[str, Any] | None) – Final outputs to preserve

Returns:

A TaskResult with CANCELED status

Return type:

TaskResult

classmethod stopped(outputs=None)[source]

Create a stopped result.

Parameters:

outputs (dict[str, Any] | None) – Final outputs to preserve

Returns:

A TaskResult with STOPPED status

Return type:

TaskResult

classmethod suspend(context=None)[source]

Create a suspended result (waiting for external signal).

The stage will be set to SUSPENDED status and will wait for a SignalStage message to resume (WCP-23/24).

Parameters:

context (dict[str, Any] | None) – Context to preserve while suspended

Returns:

A TaskResult with SUSPENDED status

Return type:

TaskResult

classmethod redirect(context=None)[source]

Create a redirect result.

Indicates a decision branch should be followed.

Parameters:

context (dict[str, Any] | None) – Context for the redirect

Returns:

A TaskResult with REDIRECT status

Return type:

TaskResult

classmethod jump_to(target_stage_ref_id, context=None, outputs=None)[source]

Create a jump result to redirect flow to a different stage.

The target stage will be reset to NOT_STARTED and re-executed with the provided context merged into its existing context.

This enables dynamic routing patterns like retry loops, conditional branching, and error recovery flows.

Parameters:
  • target_stage_ref_id (str) – The ref_id of the stage to jump to

  • context (dict[str, Any] | None) – Context to merge into target stage

  • outputs (dict[str, Any] | None) – Outputs to preserve (available to target stage)

Returns:

A TaskResult with REDIRECT status and target_stage_ref_id set

Return type:

TaskResult

Example

class RouterTask(Task):
def execute(self, stage: StageExecution) -> TaskResult:
if stage.context.get(“tests_passed”):

return TaskResult.success()

else:
return TaskResult.jump_to(

“implement_stage”, context={“retry_reason”: “tests failed”}

)

classmethod builder(status)[source]

Create a builder for more complex results.

Parameters:

status (WorkflowStatus) – The execution status

Returns:

A TaskResultBuilder

Return type:

TaskResultBuilder

merge_outputs(other)[source]

Merge outputs from another result.

Parameters:

other (TaskResult | None) – Result to merge outputs from

Returns:

A new TaskResult with merged outputs

Return type:

TaskResult

class stabilize.tasks.result.TaskResultBuilder(status)[source]

Bases: object

Builder for TaskResult objects.

Provides a fluent API for constructing complex task results.

Parameters:

status (WorkflowStatus)

context(context)[source]

Set the context.

Parameters:

context (dict[str, Any])

Return type:

TaskResultBuilder

outputs(outputs)[source]

Set the outputs.

Parameters:

outputs (dict[str, Any])

Return type:

TaskResultBuilder

add_context(key, value)[source]

Add a context value.

Parameters:
  • key (str)

  • value (Any)

Return type:

TaskResultBuilder

add_output(key, value)[source]

Add an output value.

Parameters:
  • key (str)

  • value (Any)

Return type:

TaskResultBuilder

build()[source]

Build the TaskResult.

Return type:

TaskResult

Enterprise-ready ShellTask for executing shell commands.

This module provides a production-ready ShellTask with: - Working directory support - Environment variable injection - Shell selection (bash, sh, custom) - Stdin input support - Output size limits (prevent OOM) - Expected exit codes (for tools that return non-zero on success) - Secret masking in logs - Binary output mode - {key} placeholder substitution with upstream outputs - Cross-platform process tree cleanup (uses psutil when available) - Linux-specific PR_SET_PDEATHSIG for auto-cleanup when parent dies

class stabilize.tasks.shell.ShellTask[source]

Bases: Task

Enterprise-ready shell command execution.

Executes shell commands with full control over execution environment, input/output handling, and error management.

Context Parameters:

command (str): The shell command to execute (required) timeout (int): Command timeout in seconds (default: 60) cwd (str): Working directory for command execution env (dict): Additional environment variables to set shell (bool|str): True for default shell, or path to shell executable stdin (str): Input to send to command’s stdin max_output_size (int): Max bytes for stdout/stderr (default: 10MB) expected_codes (list[int]): Exit codes to treat as success (default: [0]) secrets (list[str]): Context keys whose values should be masked in logs binary (bool): If True, capture output as bytes (default: False) continue_on_failure (bool): If True, return failed_continue instead of terminal restart_on_failure (bool): If True, raise TransientError on failure to trigger

automatic retry with backoff (for long-running services)

Outputs:

stdout (str|bytes): Command standard output (stripped if text mode) stderr (str|bytes): Command standard error (stripped if text mode) returncode (int): Command exit code truncated (bool): True if output was truncated due to size limit stdout_b64 (str): Base64-encoded stdout (only if binary=True)

Placeholder Substitution:

Any {key} in the command is replaced with stage.context[key]. This includes outputs from upstream stages.

Examples

# Basic command context={“command”: “ls -la”}

# With working directory context={“command”: “npm install”, “cwd”: “/app/frontend”}

# With environment variables context={“command”: “./deploy.sh”, “env”: {“AWS_REGION”: “us-east-1”}}

# With custom shell context={“command”: “source venv/bin/activate && pytest”, “shell”: “/bin/bash”}

# With stdin input context={“command”: “cat”, “stdin”: “Hello World”}

# Allow grep’s exit code 1 (no match) context={“command”: “grep pattern file.txt”, “expected_codes”: [0, 1]}

# Mask secrets in logs context={

“command”: “curl -H ‘Authorization: Bearer {token}’ https://api.example.com”, “token”: “secret123”, “secrets”: [“token”]

}

# Binary output context={“command”: “cat image.png”, “binary”: True}

# Using upstream output context={“command”: “echo ‘Previous output: {stdout}’”}

execute(stage)[source]

Execute the shell command with all configured options.

Parameters:

stage (StageExecution)

Return type:

TaskResult

PythonTask for executing Python code in isolated subprocess.

This module provides a production-ready PythonTask with: - Inline script execution - Script file execution - Module + function execution - INPUT/RESULT variable convention for data passing - Full subprocess isolation with timeout support

class stabilize.tasks.python.PythonTask[source]

Bases: Task

Execute Python code in isolated subprocess.

Supports three execution modes: 1. Inline script: Pass Python code as a string 2. Script file: Load and execute a Python file 3. Module + function: Import a module and call a function

All modes run in a subprocess for isolation and hard timeout enforcement.

Context Parameters:

# Script execution (choose one): script (str): Inline Python code to execute script_file (str): Path to Python script file module (str): Python module path (e.g., “myapp.tasks.validate”) function (str): Function name to call (requires module)

# Inputs: inputs (dict): Input variables, available as INPUT in script (optional) args (list): Command line arguments (optional)

# Execution: python_path (str): Python interpreter path (default: current interpreter) timeout (int): Execution timeout in seconds (default: 60) cwd (str): Working directory (optional) env (dict): Environment variables to add (optional) continue_on_failure (bool): Return failed_continue on error (default: False)

Outputs:

stdout (str): Standard output from script stderr (str): Standard error from script exit_code (int): Process exit code result (any): Value of RESULT variable if set in script

Notes:
  • Scripts access input data via the INPUT dict

  • Scripts set return value via the RESULT variable

  • RESULT must be JSON-serializable

  • Module mode: imports module.function and calls with INPUT as argument

  • Upstream stage outputs are automatically available in INPUT

Examples:

# Inline script context = {

“script”: ‘’’

result = sum(INPUT[“numbers”]) RESULT = {“sum”: result, “count”: len(INPUT[“numbers”])} ‘’’,

“inputs”: {“numbers”: [1, 2, 3, 4, 5]}

}

# Script file context = {

“script_file”: “/path/to/script.py”, “inputs”: {“config”: {“debug”: True}}

}

# Module + function context = {

“module”: “myapp.validators”, “function”: “validate_input”, “inputs”: {“data”: {“name”: “test”}}

}

SCRIPT_WRAPPER = 'import json\nimport sys\nimport base64\n\nINPUT = json.loads(base64.b64decode(\'{inputs_json_b64}\').decode())\n\n# User script\n{script}\n\n# Output result if RESULT variable was set\nif \'RESULT\' in dir():\n    print("__PYTHONTASK_RESULT_START__")\n    print(json.dumps(RESULT))\n    print("__PYTHONTASK_RESULT_END__")\n'
MODULE_WRAPPER = 'import json\nimport sys\nimport base64\n\nINPUT = json.loads(base64.b64decode(\'{inputs_json_b64}\').decode())\n\nfrom {module} import {function}\nRESULT = {function}(INPUT)\n\n# Output result\nprint("__PYTHONTASK_RESULT_START__")\nprint(json.dumps(RESULT))\nprint("__PYTHONTASK_RESULT_END__")\n'
RESULT_START_MARKER = '__PYTHONTASK_RESULT_START__'
RESULT_END_MARKER = '__PYTHONTASK_RESULT_END__'
execute(stage)[source]

Execute Python code based on context parameters.

Parameters:

stage (StageExecution)

Return type:

TaskResult

HTTP task package for making HTTP requests.

class stabilize.tasks.http.HTTPTask[source]

Bases: Task

Make HTTP requests using Python’s stdlib urllib.

Supports all standard HTTP methods, file upload/download, authentication, retries, and SSL/TLS configuration with zero external dependencies.

Context Parameters:

url (str): Request URL (required) method (str): HTTP method - GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS

(default: GET)

Request Body (mutually exclusive):

body (str|bytes): Raw request body json (dict): JSON body (auto-serialized, sets Content-Type) form (dict): Form-encoded body (application/x-www-form-urlencoded)

Headers & Auth:

headers (dict): Custom request headers auth (tuple|list): Basic auth as [username, password] bearer_token (str): Bearer token for Authorization header

File Upload (multipart/form-data):

upload_file (str): Path to file to upload upload_field (str): Form field name (default: “file”) upload_filename (str): Override filename in upload upload_form (dict): Additional form fields with upload

File Download:

download_to (str): Path to save response body (streams large files)

Timeouts & Retries:

timeout (int|float): Request timeout in seconds (default: 30) retries (int): Number of retries on transient failure (default: 0) retry_delay (float): Delay between retries in seconds (default: 1.0) retry_on_status (list[int]): Status codes to retry (default: [502, 503, 504])

SSL/TLS:

verify_ssl (bool): Verify SSL certificates (default: True) ca_cert (str): Path to CA certificate bundle client_cert (str): Path to client certificate for mTLS client_key (str): Path to client private key for mTLS

Response Handling:

expected_status (int|list[int]): Expected status code(s), fail if mismatch max_response_size (int): Max response body in bytes (default: 10MB) parse_json (bool): Auto-parse JSON response body (default: False)

Error Handling:

continue_on_failure (bool): Return failed_continue instead of terminal secrets (list[str]): Context keys to mask in logs

Outputs:

status_code (int): HTTP response status headers (dict): Response headers body (str): Response body (or file path if download_to) body_json (dict|list|None): Parsed JSON (if parse_json=True and valid) elapsed_ms (int): Request duration in milliseconds url (str): Final URL after redirects content_type (str): Response Content-Type header content_length (int): Response body size in bytes

Examples

# Simple GET context = {“url”: “https://api.example.com/users”}

# POST with JSON context = {

“url”: “https://api.example.com/users”, “method”: “POST”, “json”: {“name”: “John”, “email”: “john@example.com”}, “parse_json”: True,

}

# With authentication context = {

“url”: “https://api.example.com/private”, “bearer_token”: “my-api-token”,

}

# File upload context = {

“url”: “https://api.example.com/upload”, “method”: “POST”, “upload_file”: “/path/to/file.pdf”, “upload_field”: “document”,

}

# File download context = {

“url”: “https://example.com/large-file.zip”, “download_to”: “/tmp/downloaded.zip”,

}

# With retries context = {

“url”: “https://api.example.com/flaky”, “retries”: 3, “retry_delay”: 2.0,

}

execute(stage)[source]

Execute HTTP request.

Parameters:

stage (StageExecution)

Return type:

TaskResult

DockerTask for executing Docker commands.

This module provides a production-ready DockerTask with: - Container lifecycle management (run, exec, stop, rm) - Image management (build, pull, images) - Volume and network support - Environment variable injection - Detached mode support

class stabilize.tasks.docker.DockerTask[source]

Bases: Task

Execute Docker commands.

Supports container lifecycle, image management, and inspection commands.

Context Parameters:
action (str): Action to perform (default: run)
  • run: Run a container

  • exec: Execute command in running container

  • build: Build an image

  • pull: Pull an image

  • ps: List containers

  • images: List images

  • logs: Get container logs

  • stop: Stop a container

  • rm: Remove a container

For ‘run’ action:

image (str): Docker image name (required) command (str|list): Command to run in container (optional) name (str): Container name (optional) volumes (list[str]): Volume mounts as “host:container” (optional) ports (list[str]): Port mappings as “host:container” (optional) environment (dict): Environment variables (optional) workdir (str): Working directory in container (optional) network (str): Docker network to connect (optional) remove (bool): Remove container after run (default: True) detach (bool): Run in detached mode (default: False) entrypoint (str|list): Override container entrypoint (optional) user (str): Run as user, e.g., “1000:1000” or “root” (optional) hostname (str): Container hostname (optional) privileged (bool): Run in privileged mode (default: False) cap_add (list[str]): Add Linux capabilities (optional) cap_drop (list[str]): Drop Linux capabilities (optional) memory (str): Memory limit, e.g., “512m”, “2g” (optional) memory_swap (str): Memory + swap limit (optional) cpus (str): CPU limit, e.g., “0.5”, “2” (optional) gpus (str): GPU access, e.g., “all”, “device=0” (optional) shm_size (str): Shared memory size (optional) tmpfs (list[str]): tmpfs mounts (optional) read_only (bool): Read-only root filesystem (default: False) security_opt (list[str]): Security options (optional) ulimit (dict): Ulimit settings as {name: value} (optional) labels (dict): Container labels (optional) dns (list[str]): Custom DNS servers (optional) extra_hosts (list[str]): Add host mappings as “host:ip” (optional) init (bool): Run init inside container (default: False) platform (str): Target platform, e.g., “linux/amd64” (optional) pull (str): Pull policy: “always”, “never”, “missing” (optional) restart (str): Restart policy: “no”, “always”, “on-failure” (optional) stdin_open (bool): Keep stdin open (default: False) tty (bool): Allocate pseudo-TTY (default: False)

For ‘exec’ action:

name (str): Container name (required) command (str|list): Command to execute (required)

For ‘build’ action:

tag (str): Image tag (optional) dockerfile (str): Dockerfile path (default: Dockerfile) context (str): Build context path (default: .) build_args (dict): Build arguments (optional) no_cache (bool): Disable build cache (default: False)

Common:

timeout (int): Command timeout in seconds (default: 14400 = 4 hours) continue_on_failure (bool): Return failed_continue on error

Outputs:

stdout (str): Command standard output stderr (str): Command standard error exit_code (int): Command exit code container_id (str): Container ID (for run with detach) image_id (str): Image ID (for build)

Examples

# Run container context = {

“action”: “run”, “image”: “alpine:latest”, “command”: “echo Hello”,

}

# Run with environment and volumes context = {

“action”: “run”, “image”: “python:3.11”, “volumes”: [“/app:/app”], “environment”: {“DEBUG”: “true”}, “command”: “python /app/script.py”,

}

# Build image context = {

“action”: “build”, “tag”: “myapp:latest”, “context”: “./docker”,

}

SUPPORTED_ACTIONS = frozenset({'build', 'exec', 'images', 'logs', 'ps', 'pull', 'rm', 'run', 'stop'})
execute(stage)[source]

Execute Docker command.

Parameters:

stage (StageExecution)

Return type:

TaskResult

Control-Flow Pattern Tasks

SubWorkflowTask - executes a child workflow and waits for completion.

Implements WCP-22: Recursion. A task that starts a child workflow and polls until it completes. Supports recursion depth tracking to prevent infinite recursion.

class stabilize.tasks.sub_workflow.SubWorkflowTask[source]

Bases: RetryableTask

Task that starts and monitors a child workflow execution.

The child workflow is started on the first execution. Subsequent executions poll the child’s status until it completes.

Context keys:

_sub_workflow_config: dict with workflow configuration _sub_workflow_id: set after child is started (for polling) _recursion_depth: current recursion depth (incremented per level) _max_recursion_depth: maximum allowed depth (default: 10)

Outputs:

sub_workflow_id: ID of the child workflow sub_workflow_status: Final status of the child workflow sub_workflow_outputs: Merged outputs from the child workflow

property aliases: list[str]

Alternative names for this task type.

Used for backward compatibility when task types are renamed.

Returns:

List of alternative type names

get_timeout()[source]

Default timeout: 30 minutes.

Return type:

timedelta

get_backoff_period(stage=None, duration=None)[source]

Poll every 5 seconds.

Parameters:
  • stage (Any)

  • duration (timedelta | None)

Return type:

timedelta

execute(stage)[source]

Execute or poll the sub-workflow.

Parameters:

stage (Any)

Return type:

TaskResult

Stage Builders

Multi-instance stage builder.

Provides builder methods for creating multi-instance stage patterns: - WCP-12: Multiple Instances without Synchronization - WCP-13: Multiple Instances with a priori Design-Time Knowledge - WCP-14: Multiple Instances with a priori Run-Time Knowledge - WCP-15: Multiple Instances without a priori Run-Time Knowledge

Usage:

from stabilize.stages.multi_instance_builder import MultiInstanceBuilder

# WCP-13: 6 reviewers, wait for all stages = MultiInstanceBuilder.create_fixed(

parent_stage=stage, count=6, instance_type=”review”, instance_name_prefix=”Review”,

)

# WCP-14: Count from context stages = MultiInstanceBuilder.create_from_context(

parent_stage=stage, count_key=”num_reviewers”, instance_type=”review”, instance_name_prefix=”Review”,

)

class stabilize.stages.multi_instance_builder.MultiInstanceBuilder[source]

Bases: object

Builder for multi-instance stage patterns.

static create_fixed(parent_stage, count, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, join_threshold=0, cancel_remaining=False, instance_contexts=None)[source]

Create N fixed instances with optional synchronization (WCP-12/13).

Parameters:
  • parent_stage (StageExecution) – The MI parent stage

  • count (int) – Number of instances to create

  • instance_type (str) – Type for instance stages (defaults to parent type)

  • instance_name_prefix (str) – Prefix for instance stage names

  • sync_on_complete (bool) – Whether to synchronize (WCP-12=False, WCP-13=True)

  • join_threshold (int) – N-of-M threshold (0 = all must complete)

  • cancel_remaining (bool) – Cancel remaining after threshold reached

  • instance_contexts (list[dict[str, Any]] | None) – Per-instance context overrides

Returns:

List of created stages (instances + optional join stage)

Return type:

list[StageExecution]

static create_from_context(parent_stage, count_key, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, join_threshold=0, cancel_remaining=False)[source]

Create instances with count from context (WCP-14).

The count is read from parent_stage.context[count_key] at build time.

Parameters:
  • parent_stage (StageExecution) – The MI parent stage

  • count_key (str) – Context key holding the instance count

  • instance_type (str) – Type for instance stages

  • instance_name_prefix (str) – Prefix for instance stage names

  • sync_on_complete (bool) – Whether to synchronize

  • join_threshold (int) – N-of-M threshold

  • cancel_remaining (bool) – Cancel remaining after threshold

Returns:

List of created stages

Return type:

list[StageExecution]

static create_from_collection(parent_stage, collection_key, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, item_context_key='item')[source]

Create one instance per item in a context collection (WCP-14 variant).

Parameters:
  • parent_stage (StageExecution) – The MI parent stage

  • collection_key (str) – Context key holding a list of items

  • instance_type (str) – Type for instance stages

  • instance_name_prefix (str) – Prefix for instance stage names

  • sync_on_complete (bool) – Whether to synchronize

  • item_context_key (str) – Context key to set the item on each instance

Returns:

List of created stages

Return type:

list[StageExecution]

static create_dynamic(parent_stage, instance_type='', instance_name_prefix='Instance', initial_count=0)[source]

Create a dynamic MI setup (WCP-15).

Instances can be added during execution via AddMultiInstance messages.

Parameters:
  • parent_stage (StageExecution) – The MI parent stage

  • instance_type (str) – Type for instance stages

  • instance_name_prefix (str) – Prefix for instance stage names

  • initial_count (int) – Optional initial instances to create

Returns:

List of initially created stages (more can be added later)

Return type:

list[StageExecution]

Structured loop builder for WCP-21.

Provides builder methods for creating structured loop patterns using the existing jump_to mechanism internally.

Usage:

from stabilize.stages.loop_builder import LoopBuilder

# While loop: check condition, then execute body, repeat stages = LoopBuilder.while_loop(

condition=”iteration_count < max_iterations”, body_stages=[stage_a, stage_b], loop_ref_prefix=”retry_loop”,

)

# Repeat-until loop: execute body, then check condition stages = LoopBuilder.repeat_until(

condition=”tests_passed == True”, body_stages=[stage_a, stage_b], loop_ref_prefix=”test_loop”,

)

class stabilize.stages.loop_builder.LoopBuilder[source]

Bases: object

Builder for structured loop patterns using jump_to internally.

static while_loop(condition, body_stages, loop_ref_prefix='loop', max_iterations=100, context=None)[source]

Create a while-loop pattern: check condition, then execute body.

Creates stages: 1. Condition check stage (evaluates condition, jumps back or continues) 2. Body stages (user-provided) 3. Loop-back stage (jumps back to condition check)

Parameters:
  • condition (str) – Expression to evaluate (loop continues while True)

  • body_stages (list[StageExecution]) – Stages to execute in each iteration

  • loop_ref_prefix (str) – Prefix for generated ref_ids

  • max_iterations (int) – Safety limit on loop iterations

  • context (dict[str, Any] | None) – Initial context for the loop

Returns:

List of stages forming the loop structure

Return type:

list[StageExecution]

static repeat_until(condition, body_stages, loop_ref_prefix='loop', max_iterations=100, context=None)[source]

Create a repeat-until pattern: execute body, then check condition.

Creates stages: 1. Body stages (user-provided) 2. Condition check stage (evaluates condition, loops back or exits)

Parameters:
  • condition (str) – Expression to evaluate (loop exits when True)

  • body_stages (list[StageExecution]) – Stages to execute in each iteration

  • loop_ref_prefix (str) – Prefix for generated ref_ids

  • max_iterations (int) – Safety limit on loop iterations

  • context (dict[str, Any] | None) – Initial context for the loop

Returns:

List of stages forming the loop structure

Return type:

list[StageExecution]