"""
DockerTask for executing Docker commands.
This module provides a production-ready DockerTask with:
- Container lifecycle management (run, exec, stop, rm)
- Image management (build, pull, images)
- Volume and network support
- Environment variable injection
- Detached mode support
"""
from __future__ import annotations
import logging
import os
import shlex
import subprocess
from typing import TYPE_CHECKING, Any
from stabilize.tasks.interface import Task
from stabilize.tasks.result import TaskResult
if TYPE_CHECKING:
from stabilize.models.stage import StageExecution
logger = logging.getLogger(__name__)
_DANGEROUS_CAPABILITIES = frozenset(
{
"SYS_ADMIN",
"SYS_PTRACE",
"SYS_RAWIO",
"SYS_MODULE",
"DAC_READ_SEARCH",
"NET_ADMIN",
"NET_RAW",
"SYS_BOOT",
"ALL",
}
)
[docs]
class DockerTask(Task):
"""
Execute Docker commands.
Supports container lifecycle, image management, and inspection commands.
Context Parameters:
action (str): Action to perform (default: run)
- run: Run a container
- exec: Execute command in running container
- build: Build an image
- pull: Pull an image
- ps: List containers
- images: List images
- logs: Get container logs
- stop: Stop a container
- rm: Remove a container
For 'run' action:
image (str): Docker image name (required)
command (str|list): Command to run in container (optional)
name (str): Container name (optional)
volumes (list[str]): Volume mounts as "host:container" (optional)
ports (list[str]): Port mappings as "host:container" (optional)
environment (dict): Environment variables (optional)
workdir (str): Working directory in container (optional)
network (str): Docker network to connect (optional)
remove (bool): Remove container after run (default: True)
detach (bool): Run in detached mode (default: False)
entrypoint (str|list): Override container entrypoint (optional)
user (str): Run as user, e.g., "1000:1000" or "root" (optional)
hostname (str): Container hostname (optional)
privileged (bool): Run in privileged mode (default: False)
cap_add (list[str]): Add Linux capabilities (optional)
cap_drop (list[str]): Drop Linux capabilities (optional)
memory (str): Memory limit, e.g., "512m", "2g" (optional)
memory_swap (str): Memory + swap limit (optional)
cpus (str): CPU limit, e.g., "0.5", "2" (optional)
gpus (str): GPU access, e.g., "all", "device=0" (optional)
shm_size (str): Shared memory size (optional)
tmpfs (list[str]): tmpfs mounts (optional)
read_only (bool): Read-only root filesystem (default: False)
security_opt (list[str]): Security options (optional)
ulimit (dict): Ulimit settings as {name: value} (optional)
labels (dict): Container labels (optional)
dns (list[str]): Custom DNS servers (optional)
extra_hosts (list[str]): Add host mappings as "host:ip" (optional)
init (bool): Run init inside container (default: False)
platform (str): Target platform, e.g., "linux/amd64" (optional)
pull (str): Pull policy: "always", "never", "missing" (optional)
restart (str): Restart policy: "no", "always", "on-failure" (optional)
stdin_open (bool): Keep stdin open (default: False)
tty (bool): Allocate pseudo-TTY (default: False)
For 'exec' action:
name (str): Container name (required)
command (str|list): Command to execute (required)
For 'build' action:
tag (str): Image tag (optional)
dockerfile (str): Dockerfile path (default: Dockerfile)
context (str): Build context path (default: .)
build_args (dict): Build arguments (optional)
no_cache (bool): Disable build cache (default: False)
Common:
timeout (int): Command timeout in seconds (default: 14400 = 4 hours)
continue_on_failure (bool): Return failed_continue on error
Outputs:
stdout (str): Command standard output
stderr (str): Command standard error
exit_code (int): Command exit code
container_id (str): Container ID (for run with detach)
image_id (str): Image ID (for build)
Examples:
# Run container
context = {
"action": "run",
"image": "alpine:latest",
"command": "echo Hello",
}
# Run with environment and volumes
context = {
"action": "run",
"image": "python:3.11",
"volumes": ["/app:/app"],
"environment": {"DEBUG": "true"},
"command": "python /app/script.py",
}
# Build image
context = {
"action": "build",
"tag": "myapp:latest",
"context": "./docker",
}
"""
SUPPORTED_ACTIONS = frozenset(
{
"run",
"exec",
"build",
"pull",
"ps",
"images",
"logs",
"stop",
"rm",
}
)
[docs]
def execute(self, stage: StageExecution) -> TaskResult:
"""Execute Docker command."""
action = stage.context.get("action", "run")
timeout = stage.context.get("timeout", 14400) # 4 hours for long-running workflows
continue_on_failure = stage.context.get("continue_on_failure", False)
if action not in self.SUPPORTED_ACTIONS:
return TaskResult.terminal(
error=f"Unsupported action '{action}'. Supported: {sorted(self.SUPPORTED_ACTIONS)}"
)
# Check Docker availability
try:
subprocess.run(
["docker", "version"],
capture_output=True,
timeout=10,
check=True,
)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
return TaskResult.terminal(error="Docker is not available. Ensure Docker is installed and running.")
# Build command based on action
try:
cmd = self._build_command(action, stage.context)
except ValueError as e:
return TaskResult.terminal(error=str(e))
logger.debug("DockerTask executing: %s", " ".join(cmd))
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
)
max_output_size = stage.context.get("max_output_size", 10 * 1024 * 1024)
stdout = result.stdout
stderr = result.stderr
truncated = False
if len(stdout) > max_output_size:
stdout = stdout[:max_output_size]
truncated = True
if len(stderr) > max_output_size:
stderr = stderr[:max_output_size]
truncated = True
outputs: dict[str, Any] = {
"stdout": stdout.strip(),
"stderr": stderr.strip(),
"exit_code": result.returncode,
"truncated": truncated,
}
# Extract container/image ID for relevant actions
if action == "run" and stage.context.get("detach"):
outputs["container_id"] = result.stdout.strip()[:12]
elif action == "build" and result.returncode == 0:
for line in result.stdout.split("\n"):
if "Successfully built" in line:
outputs["image_id"] = line.split()[-1]
break
if result.returncode == 0:
logger.debug("DockerTask success")
return TaskResult.success(outputs=outputs)
else:
error_msg = f"Docker command failed with exit code {result.returncode}"
if continue_on_failure:
return TaskResult.failed_continue(error=error_msg, outputs=outputs)
return TaskResult.terminal(error=error_msg, context=outputs)
except subprocess.TimeoutExpired:
error_msg = f"Docker command timed out after {timeout}s"
if continue_on_failure:
return TaskResult.failed_continue(error=error_msg)
return TaskResult.terminal(error=error_msg)
def _build_command(self, action: str, context: dict[str, Any]) -> list[str]:
"""Build Docker command based on action and context."""
if action == "run":
return self._build_run_command(context)
elif action == "exec":
return self._build_exec_command(context)
elif action == "build":
return self._build_build_command(context)
elif action == "pull":
image = context.get("image")
if not image:
raise ValueError("'image' is required for pull action")
return ["docker", "pull", image]
elif action == "ps":
cmd = ["docker", "ps"]
if context.get("all"):
cmd.append("-a")
return cmd
elif action == "images":
return ["docker", "images"]
elif action == "logs":
name = context.get("name")
if not name:
raise ValueError("'name' is required for logs action")
cmd = ["docker", "logs"]
if context.get("follow"):
cmd.append("-f")
if context.get("tail"):
cmd.extend(["--tail", str(context["tail"])])
cmd.append(name)
return cmd
elif action == "stop":
name = context.get("name")
if not name:
raise ValueError("'name' is required for stop action")
return ["docker", "stop", name]
elif action == "rm":
name = context.get("name")
if not name:
raise ValueError("'name' is required for rm action")
cmd = ["docker", "rm"]
if context.get("force"):
cmd.append("-f")
cmd.append(name)
return cmd
else:
raise ValueError(f"Unknown action: {action}")
def _build_run_command(self, context: dict[str, Any]) -> list[str]:
"""Build docker run command."""
image = context.get("image")
if not image:
raise ValueError("'image' is required for run action")
cmd = ["docker", "run"]
# Container name
if context.get("name"):
cmd.extend(["--name", context["name"]])
# Remove after exit
if context.get("remove", True):
cmd.append("--rm")
# Detach mode
if context.get("detach"):
cmd.append("-d")
# Interactive/TTY
if context.get("stdin_open"):
cmd.append("-i")
if context.get("tty"):
cmd.append("-t")
# Entrypoint override
entrypoint = context.get("entrypoint")
if entrypoint:
if isinstance(entrypoint, list):
cmd.extend(["--entrypoint", entrypoint[0]])
else:
cmd.extend(["--entrypoint", entrypoint])
# User
if context.get("user"):
cmd.extend(["--user", context["user"]])
# Hostname
if context.get("hostname"):
cmd.extend(["--hostname", context["hostname"]])
# Privileged mode - requires explicit opt-in
if context.get("privileged"):
if not context.get("allow_privileged", False):
raise ValueError(
"Privileged mode requires 'allow_privileged: true' in context. "
"This grants full host access and should only be used when absolutely necessary."
)
cmd.append("--privileged")
# Capabilities
for cap in context.get("cap_add", []):
if cap.upper() in _DANGEROUS_CAPABILITIES and not context.get("allow_dangerous_capabilities", False):
raise ValueError(
f"Capability '{cap}' is dangerous and requires 'allow_dangerous_capabilities: true' in context."
)
cmd.extend(["--cap-add", cap])
for cap in context.get("cap_drop", []):
cmd.extend(["--cap-drop", cap])
# Resource limits
if context.get("memory"):
cmd.extend(["--memory", context["memory"]])
if context.get("memory_swap"):
cmd.extend(["--memory-swap", context["memory_swap"]])
if context.get("cpus"):
cmd.extend(["--cpus", str(context["cpus"])])
# GPU access
if context.get("gpus"):
cmd.extend(["--gpus", context["gpus"]])
# Shared memory
if context.get("shm_size"):
cmd.extend(["--shm-size", context["shm_size"]])
# tmpfs mounts
for tmpfs in context.get("tmpfs", []):
cmd.extend(["--tmpfs", tmpfs])
# Read-only filesystem
if context.get("read_only"):
cmd.append("--read-only")
# Security options
for opt in context.get("security_opt", []):
cmd.extend(["--security-opt", opt])
# Ulimits
for name, value in context.get("ulimit", {}).items():
cmd.extend(["--ulimit", f"{name}={value}"])
# Labels
for key, value in context.get("labels", {}).items():
cmd.extend(["--label", f"{key}={value}"])
# DNS
for dns in context.get("dns", []):
cmd.extend(["--dns", dns])
# Extra hosts
for host in context.get("extra_hosts", []):
cmd.extend(["--add-host", host])
# Init process
if context.get("init"):
cmd.append("--init")
# Platform
if context.get("platform"):
cmd.extend(["--platform", context["platform"]])
# Pull policy
if context.get("pull"):
cmd.extend(["--pull", context["pull"]])
# Restart policy
if context.get("restart"):
cmd.extend(["--restart", context["restart"]])
for vol in context.get("volumes", []):
parts = vol.split(":")
host_path = parts[0] if parts else vol
if ".." in host_path:
raise ValueError(f"Volume mount path traversal blocked: {vol}")
if not os.path.isabs(host_path):
raise ValueError(f"Volume host path must be absolute: {vol}")
cmd.extend(["-v", vol])
# Ports
for port in context.get("ports", []):
cmd.extend(["-p", port])
# Environment variables
for key, value in context.get("environment", {}).items():
cmd.extend(["-e", f"{key}={value}"])
# Working directory
if context.get("workdir"):
cmd.extend(["-w", context["workdir"]])
# Network
if context.get("network"):
cmd.extend(["--network", context["network"]])
# Image
cmd.append(image)
# Command (must come after image)
container_cmd = context.get("command")
if container_cmd:
if isinstance(container_cmd, str):
# Use shlex to handle complex commands with quotes
# This allows commands like: sh -c 'echo $VAR'
cmd.extend(shlex.split(container_cmd))
else:
cmd.extend(container_cmd)
return cmd
def _build_exec_command(self, context: dict[str, Any]) -> list[str]:
"""Build docker exec command."""
name = context.get("name")
command = context.get("command")
if not name:
raise ValueError("'name' is required for exec action")
if not command:
raise ValueError("'command' is required for exec action")
cmd = ["docker", "exec"]
# Interactive/TTY
if context.get("interactive"):
cmd.append("-i")
if context.get("tty"):
cmd.append("-t")
# Working directory
if context.get("workdir"):
cmd.extend(["-w", context["workdir"]])
# Environment variables
for key, value in context.get("environment", {}).items():
cmd.extend(["-e", f"{key}={value}"])
cmd.append(name)
if isinstance(command, str):
cmd.extend(shlex.split(command))
else:
cmd.extend(command)
return cmd
def _build_build_command(self, context: dict[str, Any]) -> list[str]:
"""Build docker build command."""
cmd = ["docker", "build"]
# Tag
tag = context.get("tag") or context.get("image")
if tag:
cmd.extend(["-t", tag])
# Dockerfile
if context.get("dockerfile"):
cmd.extend(["-f", context["dockerfile"]])
# Build args
for key, value in context.get("build_args", {}).items():
cmd.extend(["--build-arg", f"{key}={value}"])
# No cache
if context.get("no_cache"):
cmd.append("--no-cache")
# Context path
build_context = context.get("context", ".")
cmd.append(build_context)
return cmd