Tasks
Built-in Tasks
Task interface definitions.
This module defines the Task interface and its variants (RetryableTask, SkippableTask) that all task implementations must follow.
- class stabilize.tasks.interface.Task[source]
Bases:
ABCBase interface for all tasks.
Tasks are the atomic units of work in a pipeline. Each task: - Receives the current stage context - Performs some work - Returns a TaskResult indicating status and any outputs
Example
- class DeployTask(Task):
- def execute(self, stage: StageExecution) -> TaskResult:
# Get inputs from context cluster = stage.context.get(“cluster”) image = stage.context.get(“image”)
# Do the work deployment_id = deploy(cluster, image)
# Return result with outputs return TaskResult.success(
outputs={“deploymentId”: deployment_id}
)
- abstractmethod execute(stage)[source]
Execute the task.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
TaskResult indicating status and any outputs
- Raises:
Exception – Any exception will be caught and handled by the runner
- Return type:
- on_timeout(stage)[source]
Called when the task times out.
Override to provide custom timeout handling. If None is returned, the default timeout behavior applies.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
Optional TaskResult to use instead of default timeout
- Return type:
TaskResult | None
- on_cancel(stage)[source]
Called when the execution is canceled.
Override to provide cleanup logic when execution is canceled.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
Optional TaskResult with cleanup results
- Return type:
TaskResult | None
- on_cleanup(stage)[source]
Called when stage enters terminal state.
Override to clean up resources when the stage completes, fails, or is canceled. This is called after the stage status is set but before the CompleteStage message is pushed.
Unlike on_cancel (which is called only on cancellation), on_cleanup is called for all terminal states: TERMINAL, CANCELED, STOPPED, SUCCEEDED, FAILED_CONTINUE, and SKIPPED.
Use this for: - Releasing external resources (containers, connections) - Cleaning up temporary files - Sending notifications - Recording metrics
- Parameters:
stage (StageExecution) – The stage execution context
- Return type:
None
Example
- def on_cleanup(self, stage: StageExecution) -> None:
container_id = stage.context.get(“container_id”) if container_id:
docker_client.remove(container_id, force=True)
- property aliases: list[str]
Alternative names for this task type.
Used for backward compatibility when task types are renamed.
- Returns:
List of alternative type names
- class stabilize.tasks.interface.RetryableTask[source]
Bases:
TaskA task that can be retried with timeout and backoff.
Retryable tasks return RUNNING status while waiting for some condition. They are re-executed after a backoff period until they succeed, fail, or timeout.
Example
- class WaitForDeployTask(RetryableTask):
- def get_timeout(self) -> timedelta:
return timedelta(minutes=30)
- def get_backoff_period(self, stage, duration) -> timedelta:
return timedelta(seconds=10)
- def execute(self, stage: StageExecution) -> TaskResult:
deployment_id = stage.context.get(“deploymentId”) status = check_deployment_status(deployment_id)
- if status == “complete”:
return TaskResult.success()
- elif status == “failed”:
return TaskResult.terminal(“Deployment failed”)
- else:
return TaskResult.running()
- abstractmethod get_timeout()[source]
Get the maximum time this task can run before timing out.
- Returns:
Maximum execution time
- Return type:
timedelta
- get_backoff_period(stage, duration)[source]
Get the backoff period before retrying.
Override to implement dynamic backoff based on how long the task has been running.
- Parameters:
stage (StageExecution) – The stage execution context
duration (timedelta) – How long the task has been running
- Returns:
Time to wait before retrying
- Return type:
timedelta
- get_dynamic_timeout(stage)[source]
Get dynamic timeout based on stage context.
Override to implement context-based timeouts.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
Timeout duration
- Return type:
timedelta
- get_dynamic_backoff_period(stage, duration)[source]
Get dynamic backoff based on stage context.
- Parameters:
stage (StageExecution) – The stage execution context
duration (timedelta) – How long the task has been running
- Returns:
Time to wait before retrying
- Return type:
timedelta
- class stabilize.tasks.interface.OverridableTimeoutRetryableTask[source]
Bases:
RetryableTaskA retryable task whose timeout can be overridden by the stage.
The stage can set a ‘stageTimeoutMs’ context value to override the default timeout.
- get_dynamic_timeout(stage)[source]
Get timeout, potentially overridden by stage context.
- Parameters:
stage (StageExecution)
- Return type:
timedelta
- class stabilize.tasks.interface.SkippableTask[source]
Bases:
TaskA task that can be conditionally skipped.
Override is_enabled() to control when the task should be skipped.
- is_enabled(stage)[source]
Check if this task is enabled.
Override to implement skip logic.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
True if task should execute, False to skip
- Return type:
bool
- execute(stage)[source]
Execute the task if enabled.
- Parameters:
stage (StageExecution)
- Return type:
- abstractmethod do_execute(stage)[source]
Perform the actual task execution.
- Parameters:
stage (StageExecution) – The stage execution context
- Returns:
TaskResult indicating status
- Return type:
- class stabilize.tasks.interface.CallableTask(func, name=None)[source]
Bases:
TaskA task that wraps a callable function.
Allows using simple functions as tasks without creating a class.
Example
- def my_task(stage: StageExecution) -> TaskResult:
return TaskResult.success(outputs={“result”: “done”})
task = CallableTask(my_task)
- Parameters:
func (Callable[[StageExecution], TaskResult])
name (str | None)
- execute(stage)[source]
Execute the wrapped function.
- Parameters:
stage (StageExecution)
- Return type:
- property name: str
Get the task name.
- class stabilize.tasks.interface.NoOpTask[source]
Bases:
TaskA task that does nothing.
Useful for testing or placeholder stages.
- execute(stage)[source]
Return success immediately.
- Parameters:
stage (StageExecution)
- Return type:
- class stabilize.tasks.interface.WaitTask[source]
Bases:
RetryableTaskA task that waits for a specified duration.
Reads ‘waitTime’ from stage context (in seconds).
- get_backoff_period(stage, duration)[source]
Check every second.
- Parameters:
stage (StageExecution)
duration (timedelta)
- Return type:
timedelta
- execute(stage)[source]
Wait for the specified time.
- Parameters:
stage (StageExecution)
- Return type:
TaskResult - result of task execution.
This module defines the TaskResult class that encapsulates the result of executing a task, including status, context updates, and outputs.
- class stabilize.tasks.result.TaskResult(status, context=<factory>, outputs=<factory>, target_stage_ref_id=None)[source]
Bases:
objectResult of a task execution.
Tasks return a TaskResult to indicate their status and provide data to downstream stages.
- Parameters:
status (WorkflowStatus)
context (dict[str, Any])
outputs (dict[str, Any])
target_stage_ref_id (str | None)
- status
The execution status after the task runs
- context
Data scoped to the current stage (merged into stage.context)
- Type:
dict[str, Any]
- outputs
Data available to downstream stages (merged into stage.outputs)
- Type:
dict[str, Any]
- target_stage_ref_id
For jump_to results, the ref_id of the stage to jump to
- Type:
str | None
- status: WorkflowStatus
- context: dict[str, Any]
- outputs: dict[str, Any]
- target_stage_ref_id: str | None = None
- classmethod success(outputs=None, context=None)[source]
Create a successful result.
- Parameters:
outputs (dict[str, Any] | None) – Values available to downstream stages
context (dict[str, Any] | None) – Values scoped to current stage
- Returns:
A TaskResult with SUCCEEDED status
- Return type:
- classmethod running(context=None)[source]
Create a running result (task will be re-executed).
Use this when a task needs to poll/wait for something. The task will be re-queued and executed again after a backoff period.
- Parameters:
context (dict[str, Any] | None) – Updated context values
- Returns:
A TaskResult with RUNNING status
- Return type:
- classmethod terminal(error, context=None)[source]
Create a terminal failure result.
The stage and pipeline will fail.
- Parameters:
error (str) – Error message
context (dict[str, Any] | None) – Additional context
- Returns:
A TaskResult with TERMINAL status
- Return type:
- classmethod failed_continue(error, outputs=None, context=None)[source]
Create a failed result that allows pipeline to continue.
The stage will be marked as failed but downstream stages will run.
- Parameters:
error (str) – Error message
outputs (dict[str, Any] | None) – Values available to downstream stages
context (dict[str, Any] | None) – Additional context
- Returns:
A TaskResult with FAILED_CONTINUE status
- Return type:
- classmethod skipped()[source]
Create a skipped result.
- Returns:
A TaskResult with SKIPPED status
- Return type:
- classmethod canceled(outputs=None)[source]
Create a canceled result.
- Parameters:
outputs (dict[str, Any] | None) – Final outputs to preserve
- Returns:
A TaskResult with CANCELED status
- Return type:
- classmethod stopped(outputs=None)[source]
Create a stopped result.
- Parameters:
outputs (dict[str, Any] | None) – Final outputs to preserve
- Returns:
A TaskResult with STOPPED status
- Return type:
- classmethod suspend(context=None)[source]
Create a suspended result (waiting for external signal).
The stage will be set to SUSPENDED status and will wait for a SignalStage message to resume (WCP-23/24).
- Parameters:
context (dict[str, Any] | None) – Context to preserve while suspended
- Returns:
A TaskResult with SUSPENDED status
- Return type:
- classmethod redirect(context=None)[source]
Create a redirect result.
Indicates a decision branch should be followed.
- Parameters:
context (dict[str, Any] | None) – Context for the redirect
- Returns:
A TaskResult with REDIRECT status
- Return type:
- classmethod jump_to(target_stage_ref_id, context=None, outputs=None)[source]
Create a jump result to redirect flow to a different stage.
The target stage will be reset to NOT_STARTED and re-executed with the provided context merged into its existing context.
This enables dynamic routing patterns like retry loops, conditional branching, and error recovery flows.
- Parameters:
target_stage_ref_id (str) – The ref_id of the stage to jump to
context (dict[str, Any] | None) – Context to merge into target stage
outputs (dict[str, Any] | None) – Outputs to preserve (available to target stage)
- Returns:
A TaskResult with REDIRECT status and target_stage_ref_id set
- Return type:
Example
- class RouterTask(Task):
- def execute(self, stage: StageExecution) -> TaskResult:
- if stage.context.get(“tests_passed”):
return TaskResult.success()
- else:
- return TaskResult.jump_to(
“implement_stage”, context={“retry_reason”: “tests failed”}
)
- classmethod builder(status)[source]
Create a builder for more complex results.
- Parameters:
status (WorkflowStatus) – The execution status
- Returns:
A TaskResultBuilder
- Return type:
- merge_outputs(other)[source]
Merge outputs from another result.
- Parameters:
other (TaskResult | None) – Result to merge outputs from
- Returns:
A new TaskResult with merged outputs
- Return type:
- class stabilize.tasks.result.TaskResultBuilder(status)[source]
Bases:
objectBuilder for TaskResult objects.
Provides a fluent API for constructing complex task results.
- Parameters:
status (WorkflowStatus)
- add_context(key, value)[source]
Add a context value.
- Parameters:
key (str)
value (Any)
- Return type:
Enterprise-ready ShellTask for executing shell commands.
This module provides a production-ready ShellTask with: - Working directory support - Environment variable injection - Shell selection (bash, sh, custom) - Stdin input support - Output size limits (prevent OOM) - Expected exit codes (for tools that return non-zero on success) - Secret masking in logs - Binary output mode - {key} placeholder substitution with upstream outputs - Cross-platform process tree cleanup (uses psutil when available) - Linux-specific PR_SET_PDEATHSIG for auto-cleanup when parent dies
- class stabilize.tasks.shell.ShellTask[source]
Bases:
TaskEnterprise-ready shell command execution.
Executes shell commands with full control over execution environment, input/output handling, and error management.
- Context Parameters:
command (str): The shell command to execute (required) timeout (int): Command timeout in seconds (default: 60) cwd (str): Working directory for command execution env (dict): Additional environment variables to set shell (bool|str): True for default shell, or path to shell executable stdin (str): Input to send to command’s stdin max_output_size (int): Max bytes for stdout/stderr (default: 10MB) expected_codes (list[int]): Exit codes to treat as success (default: [0]) secrets (list[str]): Context keys whose values should be masked in logs binary (bool): If True, capture output as bytes (default: False) continue_on_failure (bool): If True, return failed_continue instead of terminal restart_on_failure (bool): If True, raise TransientError on failure to trigger
automatic retry with backoff (for long-running services)
- Outputs:
stdout (str|bytes): Command standard output (stripped if text mode) stderr (str|bytes): Command standard error (stripped if text mode) returncode (int): Command exit code truncated (bool): True if output was truncated due to size limit stdout_b64 (str): Base64-encoded stdout (only if binary=True)
- Placeholder Substitution:
Any {key} in the command is replaced with stage.context[key]. This includes outputs from upstream stages.
Examples
# Basic command context={“command”: “ls -la”}
# With working directory context={“command”: “npm install”, “cwd”: “/app/frontend”}
# With environment variables context={“command”: “./deploy.sh”, “env”: {“AWS_REGION”: “us-east-1”}}
# With custom shell context={“command”: “source venv/bin/activate && pytest”, “shell”: “/bin/bash”}
# With stdin input context={“command”: “cat”, “stdin”: “Hello World”}
# Allow grep’s exit code 1 (no match) context={“command”: “grep pattern file.txt”, “expected_codes”: [0, 1]}
# Mask secrets in logs context={
“command”: “curl -H ‘Authorization: Bearer {token}’ https://api.example.com”, “token”: “secret123”, “secrets”: [“token”]
}
# Binary output context={“command”: “cat image.png”, “binary”: True}
# Using upstream output context={“command”: “echo ‘Previous output: {stdout}’”}
- execute(stage)[source]
Execute the shell command with all configured options.
- Parameters:
stage (StageExecution)
- Return type:
PythonTask for executing Python code in isolated subprocess.
This module provides a production-ready PythonTask with: - Inline script execution - Script file execution - Module + function execution - INPUT/RESULT variable convention for data passing - Full subprocess isolation with timeout support
- class stabilize.tasks.python.PythonTask[source]
Bases:
TaskExecute Python code in isolated subprocess.
Supports three execution modes: 1. Inline script: Pass Python code as a string 2. Script file: Load and execute a Python file 3. Module + function: Import a module and call a function
All modes run in a subprocess for isolation and hard timeout enforcement.
- Context Parameters:
# Script execution (choose one): script (str): Inline Python code to execute script_file (str): Path to Python script file module (str): Python module path (e.g., “myapp.tasks.validate”) function (str): Function name to call (requires module)
# Inputs: inputs (dict): Input variables, available as INPUT in script (optional) args (list): Command line arguments (optional)
# Execution: python_path (str): Python interpreter path (default: current interpreter) timeout (int): Execution timeout in seconds (default: 60) cwd (str): Working directory (optional) env (dict): Environment variables to add (optional) continue_on_failure (bool): Return failed_continue on error (default: False)
- Outputs:
stdout (str): Standard output from script stderr (str): Standard error from script exit_code (int): Process exit code result (any): Value of RESULT variable if set in script
- Notes:
Scripts access input data via the INPUT dict
Scripts set return value via the RESULT variable
RESULT must be JSON-serializable
Module mode: imports module.function and calls with INPUT as argument
Upstream stage outputs are automatically available in INPUT
- Examples:
# Inline script context = {
“script”: ‘’’
result = sum(INPUT[“numbers”]) RESULT = {“sum”: result, “count”: len(INPUT[“numbers”])} ‘’’,
“inputs”: {“numbers”: [1, 2, 3, 4, 5]}
}
# Script file context = {
“script_file”: “/path/to/script.py”, “inputs”: {“config”: {“debug”: True}}
}
# Module + function context = {
“module”: “myapp.validators”, “function”: “validate_input”, “inputs”: {“data”: {“name”: “test”}}
}
- SCRIPT_WRAPPER = 'import json\nimport sys\nimport base64\n\nINPUT = json.loads(base64.b64decode(\'{inputs_json_b64}\').decode())\n\n# User script\n{script}\n\n# Output result if RESULT variable was set\nif \'RESULT\' in dir():\n print("__PYTHONTASK_RESULT_START__")\n print(json.dumps(RESULT))\n print("__PYTHONTASK_RESULT_END__")\n'
- MODULE_WRAPPER = 'import json\nimport sys\nimport base64\n\nINPUT = json.loads(base64.b64decode(\'{inputs_json_b64}\').decode())\n\nfrom {module} import {function}\nRESULT = {function}(INPUT)\n\n# Output result\nprint("__PYTHONTASK_RESULT_START__")\nprint(json.dumps(RESULT))\nprint("__PYTHONTASK_RESULT_END__")\n'
- RESULT_START_MARKER = '__PYTHONTASK_RESULT_START__'
- RESULT_END_MARKER = '__PYTHONTASK_RESULT_END__'
- execute(stage)[source]
Execute Python code based on context parameters.
- Parameters:
stage (StageExecution)
- Return type:
HTTP task package for making HTTP requests.
- class stabilize.tasks.http.HTTPTask[source]
Bases:
TaskMake HTTP requests using Python’s stdlib urllib.
Supports all standard HTTP methods, file upload/download, authentication, retries, and SSL/TLS configuration with zero external dependencies.
- Context Parameters:
url (str): Request URL (required) method (str): HTTP method - GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS
(default: GET)
- Request Body (mutually exclusive):
body (str|bytes): Raw request body json (dict): JSON body (auto-serialized, sets Content-Type) form (dict): Form-encoded body (application/x-www-form-urlencoded)
- Headers & Auth:
headers (dict): Custom request headers auth (tuple|list): Basic auth as [username, password] bearer_token (str): Bearer token for Authorization header
- File Upload (multipart/form-data):
upload_file (str): Path to file to upload upload_field (str): Form field name (default: “file”) upload_filename (str): Override filename in upload upload_form (dict): Additional form fields with upload
- File Download:
download_to (str): Path to save response body (streams large files)
- Timeouts & Retries:
timeout (int|float): Request timeout in seconds (default: 30) retries (int): Number of retries on transient failure (default: 0) retry_delay (float): Delay between retries in seconds (default: 1.0) retry_on_status (list[int]): Status codes to retry (default: [502, 503, 504])
- SSL/TLS:
verify_ssl (bool): Verify SSL certificates (default: True) ca_cert (str): Path to CA certificate bundle client_cert (str): Path to client certificate for mTLS client_key (str): Path to client private key for mTLS
- Response Handling:
expected_status (int|list[int]): Expected status code(s), fail if mismatch max_response_size (int): Max response body in bytes (default: 10MB) parse_json (bool): Auto-parse JSON response body (default: False)
- Error Handling:
continue_on_failure (bool): Return failed_continue instead of terminal secrets (list[str]): Context keys to mask in logs
- Outputs:
status_code (int): HTTP response status headers (dict): Response headers body (str): Response body (or file path if download_to) body_json (dict|list|None): Parsed JSON (if parse_json=True and valid) elapsed_ms (int): Request duration in milliseconds url (str): Final URL after redirects content_type (str): Response Content-Type header content_length (int): Response body size in bytes
Examples
# Simple GET context = {“url”: “https://api.example.com/users”}
# POST with JSON context = {
“url”: “https://api.example.com/users”, “method”: “POST”, “json”: {“name”: “John”, “email”: “john@example.com”}, “parse_json”: True,
}
# With authentication context = {
“url”: “https://api.example.com/private”, “bearer_token”: “my-api-token”,
}
# File upload context = {
“url”: “https://api.example.com/upload”, “method”: “POST”, “upload_file”: “/path/to/file.pdf”, “upload_field”: “document”,
}
# File download context = {
“url”: “https://example.com/large-file.zip”, “download_to”: “/tmp/downloaded.zip”,
}
# With retries context = {
“url”: “https://api.example.com/flaky”, “retries”: 3, “retry_delay”: 2.0,
}
- execute(stage)[source]
Execute HTTP request.
- Parameters:
stage (StageExecution)
- Return type:
DockerTask for executing Docker commands.
This module provides a production-ready DockerTask with: - Container lifecycle management (run, exec, stop, rm) - Image management (build, pull, images) - Volume and network support - Environment variable injection - Detached mode support
- class stabilize.tasks.docker.DockerTask[source]
Bases:
TaskExecute Docker commands.
Supports container lifecycle, image management, and inspection commands.
- Context Parameters:
- action (str): Action to perform (default: run)
run: Run a container
exec: Execute command in running container
build: Build an image
pull: Pull an image
ps: List containers
images: List images
logs: Get container logs
stop: Stop a container
rm: Remove a container
- For ‘run’ action:
image (str): Docker image name (required) command (str|list): Command to run in container (optional) name (str): Container name (optional) volumes (list[str]): Volume mounts as “host:container” (optional) ports (list[str]): Port mappings as “host:container” (optional) environment (dict): Environment variables (optional) workdir (str): Working directory in container (optional) network (str): Docker network to connect (optional) remove (bool): Remove container after run (default: True) detach (bool): Run in detached mode (default: False) entrypoint (str|list): Override container entrypoint (optional) user (str): Run as user, e.g., “1000:1000” or “root” (optional) hostname (str): Container hostname (optional) privileged (bool): Run in privileged mode (default: False) cap_add (list[str]): Add Linux capabilities (optional) cap_drop (list[str]): Drop Linux capabilities (optional) memory (str): Memory limit, e.g., “512m”, “2g” (optional) memory_swap (str): Memory + swap limit (optional) cpus (str): CPU limit, e.g., “0.5”, “2” (optional) gpus (str): GPU access, e.g., “all”, “device=0” (optional) shm_size (str): Shared memory size (optional) tmpfs (list[str]): tmpfs mounts (optional) read_only (bool): Read-only root filesystem (default: False) security_opt (list[str]): Security options (optional) ulimit (dict): Ulimit settings as {name: value} (optional) labels (dict): Container labels (optional) dns (list[str]): Custom DNS servers (optional) extra_hosts (list[str]): Add host mappings as “host:ip” (optional) init (bool): Run init inside container (default: False) platform (str): Target platform, e.g., “linux/amd64” (optional) pull (str): Pull policy: “always”, “never”, “missing” (optional) restart (str): Restart policy: “no”, “always”, “on-failure” (optional) stdin_open (bool): Keep stdin open (default: False) tty (bool): Allocate pseudo-TTY (default: False)
- For ‘exec’ action:
name (str): Container name (required) command (str|list): Command to execute (required)
- For ‘build’ action:
tag (str): Image tag (optional) dockerfile (str): Dockerfile path (default: Dockerfile) context (str): Build context path (default: .) build_args (dict): Build arguments (optional) no_cache (bool): Disable build cache (default: False)
- Common:
timeout (int): Command timeout in seconds (default: 14400 = 4 hours) continue_on_failure (bool): Return failed_continue on error
- Outputs:
stdout (str): Command standard output stderr (str): Command standard error exit_code (int): Command exit code container_id (str): Container ID (for run with detach) image_id (str): Image ID (for build)
Examples
# Run container context = {
“action”: “run”, “image”: “alpine:latest”, “command”: “echo Hello”,
}
# Run with environment and volumes context = {
“action”: “run”, “image”: “python:3.11”, “volumes”: [“/app:/app”], “environment”: {“DEBUG”: “true”}, “command”: “python /app/script.py”,
}
# Build image context = {
“action”: “build”, “tag”: “myapp:latest”, “context”: “./docker”,
}
- SUPPORTED_ACTIONS = frozenset({'build', 'exec', 'images', 'logs', 'ps', 'pull', 'rm', 'run', 'stop'})
- execute(stage)[source]
Execute Docker command.
- Parameters:
stage (StageExecution)
- Return type:
Control-Flow Pattern Tasks
SubWorkflowTask - executes a child workflow and waits for completion.
Implements WCP-22: Recursion. A task that starts a child workflow and polls until it completes. Supports recursion depth tracking to prevent infinite recursion.
- class stabilize.tasks.sub_workflow.SubWorkflowTask[source]
Bases:
RetryableTaskTask that starts and monitors a child workflow execution.
The child workflow is started on the first execution. Subsequent executions poll the child’s status until it completes.
- Context keys:
_sub_workflow_config: dict with workflow configuration _sub_workflow_id: set after child is started (for polling) _recursion_depth: current recursion depth (incremented per level) _max_recursion_depth: maximum allowed depth (default: 10)
- Outputs:
sub_workflow_id: ID of the child workflow sub_workflow_status: Final status of the child workflow sub_workflow_outputs: Merged outputs from the child workflow
- property aliases: list[str]
Alternative names for this task type.
Used for backward compatibility when task types are renamed.
- Returns:
List of alternative type names
- get_backoff_period(stage=None, duration=None)[source]
Poll every 5 seconds.
- Parameters:
stage (Any)
duration (timedelta | None)
- Return type:
timedelta
Stage Builders
Multi-instance stage builder.
Provides builder methods for creating multi-instance stage patterns: - WCP-12: Multiple Instances without Synchronization - WCP-13: Multiple Instances with a priori Design-Time Knowledge - WCP-14: Multiple Instances with a priori Run-Time Knowledge - WCP-15: Multiple Instances without a priori Run-Time Knowledge
- Usage:
from stabilize.stages.multi_instance_builder import MultiInstanceBuilder
# WCP-13: 6 reviewers, wait for all stages = MultiInstanceBuilder.create_fixed(
parent_stage=stage, count=6, instance_type=”review”, instance_name_prefix=”Review”,
)
# WCP-14: Count from context stages = MultiInstanceBuilder.create_from_context(
parent_stage=stage, count_key=”num_reviewers”, instance_type=”review”, instance_name_prefix=”Review”,
)
- class stabilize.stages.multi_instance_builder.MultiInstanceBuilder[source]
Bases:
objectBuilder for multi-instance stage patterns.
- static create_fixed(parent_stage, count, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, join_threshold=0, cancel_remaining=False, instance_contexts=None)[source]
Create N fixed instances with optional synchronization (WCP-12/13).
- Parameters:
parent_stage (StageExecution) – The MI parent stage
count (int) – Number of instances to create
instance_type (str) – Type for instance stages (defaults to parent type)
instance_name_prefix (str) – Prefix for instance stage names
sync_on_complete (bool) – Whether to synchronize (WCP-12=False, WCP-13=True)
join_threshold (int) – N-of-M threshold (0 = all must complete)
cancel_remaining (bool) – Cancel remaining after threshold reached
instance_contexts (list[dict[str, Any]] | None) – Per-instance context overrides
- Returns:
List of created stages (instances + optional join stage)
- Return type:
list[StageExecution]
- static create_from_context(parent_stage, count_key, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, join_threshold=0, cancel_remaining=False)[source]
Create instances with count from context (WCP-14).
The count is read from parent_stage.context[count_key] at build time.
- Parameters:
parent_stage (StageExecution) – The MI parent stage
count_key (str) – Context key holding the instance count
instance_type (str) – Type for instance stages
instance_name_prefix (str) – Prefix for instance stage names
sync_on_complete (bool) – Whether to synchronize
join_threshold (int) – N-of-M threshold
cancel_remaining (bool) – Cancel remaining after threshold
- Returns:
List of created stages
- Return type:
list[StageExecution]
- static create_from_collection(parent_stage, collection_key, instance_type='', instance_name_prefix='Instance', sync_on_complete=True, item_context_key='item')[source]
Create one instance per item in a context collection (WCP-14 variant).
- Parameters:
parent_stage (StageExecution) – The MI parent stage
collection_key (str) – Context key holding a list of items
instance_type (str) – Type for instance stages
instance_name_prefix (str) – Prefix for instance stage names
sync_on_complete (bool) – Whether to synchronize
item_context_key (str) – Context key to set the item on each instance
- Returns:
List of created stages
- Return type:
list[StageExecution]
- static create_dynamic(parent_stage, instance_type='', instance_name_prefix='Instance', initial_count=0)[source]
Create a dynamic MI setup (WCP-15).
Instances can be added during execution via AddMultiInstance messages.
- Parameters:
parent_stage (StageExecution) – The MI parent stage
instance_type (str) – Type for instance stages
instance_name_prefix (str) – Prefix for instance stage names
initial_count (int) – Optional initial instances to create
- Returns:
List of initially created stages (more can be added later)
- Return type:
list[StageExecution]
Structured loop builder for WCP-21.
Provides builder methods for creating structured loop patterns using the existing jump_to mechanism internally.
- Usage:
from stabilize.stages.loop_builder import LoopBuilder
# While loop: check condition, then execute body, repeat stages = LoopBuilder.while_loop(
condition=”iteration_count < max_iterations”, body_stages=[stage_a, stage_b], loop_ref_prefix=”retry_loop”,
)
# Repeat-until loop: execute body, then check condition stages = LoopBuilder.repeat_until(
condition=”tests_passed == True”, body_stages=[stage_a, stage_b], loop_ref_prefix=”test_loop”,
)
- class stabilize.stages.loop_builder.LoopBuilder[source]
Bases:
objectBuilder for structured loop patterns using jump_to internally.
- static while_loop(condition, body_stages, loop_ref_prefix='loop', max_iterations=100, context=None)[source]
Create a while-loop pattern: check condition, then execute body.
Creates stages: 1. Condition check stage (evaluates condition, jumps back or continues) 2. Body stages (user-provided) 3. Loop-back stage (jumps back to condition check)
- Parameters:
condition (str) – Expression to evaluate (loop continues while True)
body_stages (list[StageExecution]) – Stages to execute in each iteration
loop_ref_prefix (str) – Prefix for generated ref_ids
max_iterations (int) – Safety limit on loop iterations
context (dict[str, Any] | None) – Initial context for the loop
- Returns:
List of stages forming the loop structure
- Return type:
list[StageExecution]
- static repeat_until(condition, body_stages, loop_ref_prefix='loop', max_iterations=100, context=None)[source]
Create a repeat-until pattern: execute body, then check condition.
Creates stages: 1. Body stages (user-provided) 2. Condition check stage (evaluates condition, loops back or exits)
- Parameters:
condition (str) – Expression to evaluate (loop exits when True)
body_stages (list[StageExecution]) – Stages to execute in each iteration
loop_ref_prefix (str) – Prefix for generated ref_ids
max_iterations (int) – Safety limit on loop iterations
context (dict[str, Any] | None) – Initial context for the loop
- Returns:
List of stages forming the loop structure
- Return type:
list[StageExecution]