Source code for stabilize.tasks.http.task

"""
HTTPTask for making HTTP requests using Python stdlib.

This module provides a production-ready HTTPTask with:
- All HTTP methods (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS)
- JSON and form-encoded request bodies
- File upload (multipart/form-data)
- File download (streaming)
- Custom headers and authentication (Basic, Bearer)
- Retry logic with configurable backoff
- SSL/TLS configuration (verification, client certs)
- Placeholder substitution in URL, headers, and body
- Secret masking in logs

Zero external dependencies - uses only Python stdlib.
"""

from __future__ import annotations

import ipaddress
import logging
import socket
import time
from datetime import timedelta
from typing import TYPE_CHECKING
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import BaseHandler, HTTPRedirectHandler, HTTPSHandler, build_opener

from resilient_circuit import ExponentialDelay, RetryWithBackoffPolicy
from resilient_circuit.exceptions import RetryLimitReached

from stabilize.tasks.http.constants import (
    DEFAULT_RETRY_ON_STATUS,
    DEFAULT_TIMEOUT,
    SUPPORTED_METHODS,
)
from stabilize.tasks.http.request import build_request
from stabilize.tasks.http.response import process_response
from stabilize.tasks.http.ssl_context import build_ssl_context
from stabilize.tasks.http.utils import (
    format_error,
    mask_secrets,
    substitute_placeholders,
)
from stabilize.tasks.interface import Task
from stabilize.tasks.result import TaskResult

if TYPE_CHECKING:
    from http.client import HTTPMessage, HTTPResponse
    from typing import IO
    from urllib.request import Request

    from stabilize.models.stage import StageExecution

logger = logging.getLogger(__name__)

_BLOCKED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
]


def _validate_url_safety(url: str) -> None:
    """Raise ValueError if URL resolves to a private/loopback address (SSRF protection)."""
    parsed = urlparse(url)
    hostname = parsed.hostname
    if not hostname:
        raise ValueError(f"Cannot extract hostname from URL: {url}")
    try:
        addr = ipaddress.ip_address(hostname)
    except ValueError:
        # Hostname is a DNS name — resolve it
        try:
            resolved = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
            addrs = {ipaddress.ip_address(r[4][0]) for r in resolved}
        except socket.gaierror:
            return  # DNS failure will be caught by urllib later
        for addr in addrs:
            for net in _BLOCKED_NETWORKS:
                if addr in net:
                    raise ValueError(f"SSRF blocked: URL '{url}' resolves to private address {addr}")
        return

    for net in _BLOCKED_NETWORKS:
        if addr in net:
            raise ValueError(f"SSRF blocked: URL '{url}' targets private address {addr}")


class _SafeRedirectHandler(HTTPRedirectHandler):
    """Validates each redirect target against SSRF rules."""

    def __init__(self, allow_private: bool = False) -> None:
        super().__init__()
        self._allow_private = allow_private

    def redirect_request(
        self, req: Request, fp: IO[bytes], code: int, msg: str, headers: HTTPMessage, newurl: str
    ) -> Request | None:
        if not self._allow_private:
            _validate_url_safety(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


[docs] class HTTPTask(Task): """ Make HTTP requests using Python's stdlib urllib. Supports all standard HTTP methods, file upload/download, authentication, retries, and SSL/TLS configuration with zero external dependencies. Context Parameters: url (str): Request URL (required) method (str): HTTP method - GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS (default: GET) Request Body (mutually exclusive): body (str|bytes): Raw request body json (dict): JSON body (auto-serialized, sets Content-Type) form (dict): Form-encoded body (application/x-www-form-urlencoded) Headers & Auth: headers (dict): Custom request headers auth (tuple|list): Basic auth as [username, password] bearer_token (str): Bearer token for Authorization header File Upload (multipart/form-data): upload_file (str): Path to file to upload upload_field (str): Form field name (default: "file") upload_filename (str): Override filename in upload upload_form (dict): Additional form fields with upload File Download: download_to (str): Path to save response body (streams large files) Timeouts & Retries: timeout (int|float): Request timeout in seconds (default: 30) retries (int): Number of retries on transient failure (default: 0) retry_delay (float): Delay between retries in seconds (default: 1.0) retry_on_status (list[int]): Status codes to retry (default: [502, 503, 504]) SSL/TLS: verify_ssl (bool): Verify SSL certificates (default: True) ca_cert (str): Path to CA certificate bundle client_cert (str): Path to client certificate for mTLS client_key (str): Path to client private key for mTLS Response Handling: expected_status (int|list[int]): Expected status code(s), fail if mismatch max_response_size (int): Max response body in bytes (default: 10MB) parse_json (bool): Auto-parse JSON response body (default: False) Error Handling: continue_on_failure (bool): Return failed_continue instead of terminal secrets (list[str]): Context keys to mask in logs Outputs: status_code (int): HTTP response status headers (dict): Response headers body (str): Response body (or file path if download_to) body_json (dict|list|None): Parsed JSON (if parse_json=True and valid) elapsed_ms (int): Request duration in milliseconds url (str): Final URL after redirects content_type (str): Response Content-Type header content_length (int): Response body size in bytes Examples: # Simple GET context = {"url": "https://api.example.com/users"} # POST with JSON context = { "url": "https://api.example.com/users", "method": "POST", "json": {"name": "John", "email": "john@example.com"}, "parse_json": True, } # With authentication context = { "url": "https://api.example.com/private", "bearer_token": "my-api-token", } # File upload context = { "url": "https://api.example.com/upload", "method": "POST", "upload_file": "/path/to/file.pdf", "upload_field": "document", } # File download context = { "url": "https://example.com/large-file.zip", "download_to": "/tmp/downloaded.zip", } # With retries context = { "url": "https://api.example.com/flaky", "retries": 3, "retry_delay": 2.0, } """
[docs] def execute(self, stage: StageExecution) -> TaskResult: """Execute HTTP request.""" context = stage.context continue_on_failure = context.get("continue_on_failure", False) secrets = context.get("secrets", []) # Extract and validate URL url = context.get("url") if not url: return TaskResult.terminal(error="No 'url' specified in context") # Substitute placeholders in URL url = substitute_placeholders(url, context, secrets) # SSRF protection: block requests to private/loopback addresses if not context.get("allow_private_urls", False): try: _validate_url_safety(url) except ValueError as e: return TaskResult.terminal(error=str(e)) # Validate method method = context.get("method", "GET").upper() if method not in SUPPORTED_METHODS: return TaskResult.terminal(error=f"Unsupported method '{method}'. Supported: {sorted(SUPPORTED_METHODS)}") # Build request try: request, body_bytes = build_request(method, url, context, secrets) except ValueError as e: return TaskResult.terminal(error=str(e)) except FileNotFoundError as e: return TaskResult.terminal(error=f"File not found: {e}") # Build SSL context ssl_context = build_ssl_context(context) # Build opener with SSRF-safe redirect handler allow_private = context.get("allow_private_urls", False) opener_handlers: list[BaseHandler] = [_SafeRedirectHandler(allow_private=allow_private)] if ssl_context: opener_handlers.append(HTTPSHandler(context=ssl_context)) _opener = build_opener(*opener_handlers) # Retry configuration retries = context.get("retries", 0) retry_delay = context.get("retry_delay", 1.0) retry_on_status = context.get("retry_on_status", DEFAULT_RETRY_ON_STATUS) timeout = context.get("timeout", DEFAULT_TIMEOUT) # Logging (mask secrets) log_url = mask_secrets(url, context, secrets) logger.debug("HTTPTask %s %s", method, log_url) # Execute with retries using resilient-circuit start_time = time.time() last_error: Exception | None = None last_response: HTTPResponse | HTTPError | None = None # Create retry configuration with exponential backoff http_backoff = ExponentialDelay( min_delay=timedelta(seconds=retry_delay), max_delay=timedelta(seconds=retry_delay * 10), factor=2, jitter=0.1, ) # Custom exception for retryable status codes class RetryableStatusError(Exception): """Raised when response has a retryable status code.""" def __init__(self, response: HTTPResponse) -> None: self.response = response super().__init__(f"Retryable status: {response.status}") def should_retry(e: Exception) -> bool: """Check if error should trigger retry.""" if isinstance(e, RetryableStatusError): return True if isinstance(e, HTTPError) and e.code in retry_on_status: return True if isinstance(e, (URLError, TimeoutError, OSError)): return True return False http_retry_policy = RetryWithBackoffPolicy( max_retries=retries, backoff=http_backoff, should_handle=should_retry, ) @http_retry_policy def make_request() -> HTTPResponse | HTTPError: nonlocal last_error try: if not allow_private: _validate_url_safety(url) resp: HTTPResponse = _opener.open( request, data=body_bytes, timeout=timeout, ) # Check if we should retry based on status if resp.status in retry_on_status: logger.debug("HTTPTask got %s, will retry", resp.status) raise RetryableStatusError(resp) return resp except HTTPError as e: last_error = e if e.code in retry_on_status: logger.debug("HTTPTask got HTTP %s, will retry", e.code) raise # Let retry policy handle it # Non-retryable HTTP error - return it for processing return e except (URLError, TimeoutError, OSError) as e: last_error = e logger.debug("HTTPTask error: %s, will retry", e) raise # Execute with retry if retries > 0: try: last_response = make_request() except RetryLimitReached as e: # Max retries exceeded - use last error or response cause = e.__cause__ if cause is not None: if isinstance(cause, RetryableStatusError): last_response = cause.response elif isinstance(cause, HTTPError): last_response = cause elif isinstance(cause, Exception): last_error = cause except RetryableStatusError as e: # Shouldn't happen, but handle gracefully last_response = e.response else: # No retries configured - single attempt try: if not allow_private: _validate_url_safety(url) response = _opener.open( request, data=body_bytes, timeout=timeout, ) last_response = response except HTTPError as e: last_response = e except (URLError, TimeoutError, OSError) as e: last_error = e elapsed_ms = int((time.time() - start_time) * 1000) # Handle connection/timeout errors if last_response is None and last_error is not None: error_msg = format_error(last_error) if continue_on_failure: return TaskResult.failed_continue( error=error_msg, outputs={"elapsed_ms": elapsed_ms, "url": url}, ) return TaskResult.terminal( error=error_msg, context={"elapsed_ms": elapsed_ms, "url": url}, ) # Process response return process_response( response=last_response, context=context, url=url, elapsed_ms=elapsed_ms, continue_on_failure=continue_on_failure, )