Files
docker-agent-sandbox/docker_agent_sandbox/sandbox.py
2026-04-02 14:41:18 +02:00

251 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""sandbox.py Docker container lifecycle and execution environment."""
from __future__ import annotations
import io
import select as _select
import socket as _socket
import tarfile
from select import select as _original_select
from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import patch
import docker
import docker.errors
from docker.utils.socket import consume_socket_output, demux_adaptor, frames_iter
from loguru import logger
if TYPE_CHECKING:
import docker.models.containers
class DockerSandbox:
"""
Manages a single long-running Docker container.
All container configuration (volumes, ports, environment variables,
capabilities) is supplied by the caller.
"""
def __init__(
self,
container_name: str,
image: str = "docker-agent-sandbox",
dockerfile_dir: str | None = None,
volumes: dict | None = None,
ports: dict | None = None,
environment: dict[str, str] | None = None,
working_dir: str | None = None,
# TODO(security): default network_mode is "bridge", giving containers unrestricted
# outbound network access. For an untrusted-code sandbox this should default to
# "none" so callers must explicitly opt in to network access.
network_mode: str = "bridge",
cap_drop: list[str] | None = None,
cap_add: list[str] | None = None,
security_opt: list[str] | None = None,
) -> None:
self.container_name = container_name
self._image = image
self._dockerfile_dir = dockerfile_dir
self._volumes = volumes or {}
self._ports = ports or {}
self._environment = environment or {}
self._working_dir = working_dir
self._network_mode = network_mode
self._cap_drop = cap_drop
self._cap_add = cap_add
self._security_opt = security_opt
self._client: docker.DockerClient = docker.from_env()
self._container: docker.models.containers.Container | None = None
def __enter__(self) -> "DockerSandbox":
return self
def __exit__(self, *_: object) -> None:
self.stop()
self._client.close()
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def build_image_if_missing(self) -> None:
"""Build the Docker image if it is not already present locally."""
try:
self._client.images.get(self._image)
logger.info("Image {!r} already present, skipping build.", self._image)
return
except docker.errors.ImageNotFound:
pass
if self._dockerfile_dir is None:
raise ValueError("dockerfile_dir must be provided to build the image")
logger.info("Building image {!r} from {}", self._image, self._dockerfile_dir)
_, logs = self._client.images.build(
path=self._dockerfile_dir,
tag=self._image,
rm=True,
)
for entry in logs:
line = entry.get("stream", "").rstrip()
if line:
logger.debug(" {}", line)
logger.success("Image {!r} built successfully.", self._image)
def start(self) -> None:
"""
Start the container.
Any existing container with the same name is removed first so that
re-runs always start from a clean state.
"""
try:
old = self._client.containers.get(self.container_name)
old.remove(force=True)
except docker.errors.NotFound:
pass
run_kwargs: dict = dict(
name=self.container_name,
detach=True,
volumes=self._volumes,
environment=self._environment,
ports=self._ports,
network_mode=self._network_mode,
)
if self._working_dir is not None:
run_kwargs["working_dir"] = self._working_dir
if self._cap_drop is not None:
run_kwargs["cap_drop"] = self._cap_drop
if self._cap_add is not None:
run_kwargs["cap_add"] = self._cap_add
if self._security_opt is not None:
run_kwargs["security_opt"] = self._security_opt
try:
self._container = self._client.containers.run(self._image, **run_kwargs)
except docker.errors.ImageNotFound:
raise RuntimeError(
f"Image {self._image!r} not found locally. "
"Call build_image_if_missing() first, or pull/build the image manually."
) from None
self._container.reload()
logger.info(
"Container {!r} started (id={}).",
self.container_name,
self._container.short_id,
)
def stop(self) -> None:
"""Remove the container."""
if self._container is not None:
self._container.remove(force=True)
logger.info("Container {!r} stopped.", self.container_name)
self._container = None
# ------------------------------------------------------------------
# Port inspection
# ------------------------------------------------------------------
def get_host_port(self, container_port: str) -> str:
"""
Return the host port Docker mapped to *container_port*.
*container_port* should include the protocol, e.g. ``"8080/tcp"``.
Raises ``RuntimeError`` if the container is not running or the port
is not exposed.
"""
if self._container is None:
raise RuntimeError("Sandbox container is not running.")
mappings = self._container.ports.get(container_port)
if not mappings:
raise RuntimeError(
f"Port {container_port!r} is not mapped on container {self.container_name!r}"
)
return mappings[0]["HostPort"]
# ------------------------------------------------------------------
# File I/O
# ------------------------------------------------------------------
def write_file(self, path: str, content: str) -> None:
"""
Write *content* to *path* inside the container using ``put_archive``.
The tar archive API avoids all shell-escaping concerns. Parent
directories are created automatically via a preceding ``mkdir -p``.
"""
if self._container is None:
raise RuntimeError("Sandbox container is not running.")
p = Path(path)
if not p.is_absolute() and self._working_dir is not None:
p = Path(self._working_dir) / p
encoded = content.encode("utf-8")
self._container.exec_run(["mkdir", "-p", str(p.parent)])
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
info = tarfile.TarInfo(name=p.name)
info.size = len(encoded)
info.mode = 0o644
tar.addfile(info, io.BytesIO(encoded))
buf.seek(0)
self._container.put_archive(str(p.parent), buf)
# ------------------------------------------------------------------
# Command execution
# ------------------------------------------------------------------
def exec(self, command: str, timeout: int = 120) -> tuple[int, str]:
"""
Run *command* inside the container via the low-level exec API.
Returns ``(exit_code, combined_stdout_stderr)``.
Timeout is enforced via a socket-level timeout on the exec socket.
"""
if self._container is None:
return 1, "Sandbox container is not running."
create_kwargs: dict = {}
if self._working_dir is not None:
create_kwargs["workdir"] = self._working_dir
# TODO(fragile): timeout enforcement relies on private docker-py internals
# (frames_iter, demux_adaptor, consume_socket_output from docker.utils.socket)
# and monkey-patches select.select for the duration of the read — not thread-safe
# if multiple exec() calls run concurrently. Replace when docker-py adds native
# per-call timeout support. See https://github.com/docker/docker-py/issues/2651
try:
exec_id = self._client.api.exec_create(
self._container.id,
["bash", "-c", command],
stdout=True,
stderr=True,
**create_kwargs,
)
sock = self._client.api.exec_start(exec_id["Id"], socket=True)
sock._sock.settimeout(timeout)
with patch.object(
_select,
"select",
new=lambda rlist, wlist, xlist: _original_select(
rlist, wlist, xlist, timeout
),
):
gen = (demux_adaptor(*frame) for frame in frames_iter(sock, tty=False))
stdout, stderr = consume_socket_output(gen, demux=True)
exit_code = self._client.api.exec_inspect(exec_id["Id"])["ExitCode"] or 0
output = (stdout or b"") + (stderr or b"")
return exit_code, output.decode("utf-8", errors="replace")
except _socket.timeout:
return 124, f"Command timed out after {timeout}s"
except Exception as exc:
return 1, f"Error running command in container: {exc}"