Files
docker-agent-sandbox/docker_agent_sandbox/sandbox.py
T
Matte23 9dc5b9ba50
CI / test (push) Failing after 36s
CI / publish (push) Has been skipped
fix: Timeout monkey patch not working on Linux. Implement native poll
override
2026-05-04 11:07:03 +02:00

321 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""sandbox.py Docker container lifecycle and execution environment."""
from __future__ import annotations
import io
import select as _select
import socket as _socket
import tarfile
from contextlib import ExitStack
from pathlib import Path, PurePosixPath
from select import select as _original_select
from typing import TYPE_CHECKING
from unittest.mock import patch
# Keep a handle on the real ``select.poll`` factory before anything patches it.
# ``getattr`` yields ``None`` when the ``select`` module has no ``poll`` attribute
# (presumably platforms without poll support); ``DockerSandbox.exec`` checks
# for that before installing its timeout-aware replacement.
_original_poll = getattr(_select, "poll", None)
import docker
import docker.errors
from docker.utils.socket import consume_socket_output, demux_adaptor, frames_iter
from loguru import logger
if TYPE_CHECKING:
import docker.models.containers
class DockerSandbox:
    """
    Manages a single long-running Docker container.

    All container configuration (volumes, ports, environment variables,
    capabilities) is supplied by the caller.

    The class can be used as a context manager: leaving the ``with`` block
    removes the container and closes the Docker client.  Entering the block
    does *not* start the container; call :meth:`start` explicitly.
    """

    def __init__(
        self,
        container_name: str,
        image: str = "docker-agent-sandbox",
        dockerfile_dir: str | None = None,
        volumes: dict | None = None,
        ports: dict | None = None,
        environment: dict[str, str] | None = None,
        working_dir: str | None = None,
        # TODO(security): default network_mode is "bridge", giving containers unrestricted
        # outbound network access. For an untrusted-code sandbox this should default to
        # "none" so callers must explicitly opt in to network access.
        network_mode: str = "bridge",
        cap_drop: list[str] | None = None,
        cap_add: list[str] | None = None,
        security_opt: list[str] | None = None,
        cpu_limit: float = 8,
        memory_limit: str = "16g",
        command: str | None = None,
    ) -> None:
        """
        Store the container configuration and connect to the Docker daemon.

        *container_name* names the container; *image* is the image to run and
        *dockerfile_dir* the build context used by
        :meth:`build_image_if_missing`.  *volumes*, *ports*, *environment*,
        *network_mode*, *working_dir*, *cap_drop*, *cap_add*, *security_opt*
        and *command* are handed to ``containers.run`` unchanged (the optional
        ones only when they are not ``None``).  *cpu_limit* is a number of
        CPUs (converted to ``nano_cpus``) and *memory_limit* is passed as
        ``mem_limit``.

        No container is created here; the Docker client is obtained from the
        environment via ``docker.from_env()``.
        """
        self.container_name = container_name
        self._image = image
        self._dockerfile_dir = dockerfile_dir
        self._volumes = volumes or {}
        self._ports = ports or {}
        self._environment = environment or {}
        self._working_dir = working_dir
        self._network_mode = network_mode
        self._cap_drop = cap_drop
        self._cap_add = cap_add
        self._security_opt = security_opt
        # Docker expresses CPU quota in units of 1e-9 CPUs.
        self._nano_cpus = int(cpu_limit * 1e9)
        self._memory_limit = memory_limit
        self._command = command
        self._client: docker.DockerClient = docker.from_env()
        self._container: docker.models.containers.Container | None = None

    def __enter__(self) -> "DockerSandbox":
        return self

    def __exit__(self, *_: object) -> None:
        # Close the client even when removing the container fails, otherwise
        # the client's connections would be leaked.
        try:
            self.stop()
        finally:
            self._client.close()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def build_image_if_missing(self) -> None:
        """
        Build the Docker image if it is not already present locally.

        Raises ``ValueError`` when the image is missing and no
        ``dockerfile_dir`` was configured.
        """
        try:
            self._client.images.get(self._image)
            logger.info("Image {!r} already present, skipping build.", self._image)
            return
        except docker.errors.ImageNotFound:
            pass

        if self._dockerfile_dir is None:
            raise ValueError("dockerfile_dir must be provided to build the image")

        logger.info("Building image {!r} from {}", self._image, self._dockerfile_dir)
        _, logs = self._client.images.build(
            path=self._dockerfile_dir,
            tag=self._image,
            rm=True,
        )
        # Relay the build output; entries without a "stream" key are skipped.
        for entry in logs:
            line = entry.get("stream", "").rstrip()
            if line:
                logger.debug("  {}", line)
        logger.success("Image {!r} built successfully.", self._image)

    def start(self) -> None:
        """
        Start the container.

        Any existing container with the same name is removed first so that
        re-runs always start from a clean state.

        Raises ``RuntimeError`` if the image is not available locally.
        """
        try:
            old = self._client.containers.get(self.container_name)
            old.remove(force=True)
        except docker.errors.NotFound:
            pass

        run_kwargs: dict = dict(
            name=self.container_name,
            detach=True,
            volumes=self._volumes,
            environment=self._environment,
            ports=self._ports,
            network_mode=self._network_mode,
        )
        # Optional settings are only forwarded when the caller supplied them.
        if self._working_dir is not None:
            run_kwargs["working_dir"] = self._working_dir
        if self._cap_drop is not None:
            run_kwargs["cap_drop"] = self._cap_drop
        if self._cap_add is not None:
            run_kwargs["cap_add"] = self._cap_add
        if self._security_opt is not None:
            run_kwargs["security_opt"] = self._security_opt
        run_kwargs["nano_cpus"] = self._nano_cpus
        run_kwargs["mem_limit"] = self._memory_limit

        try:
            self._container = self._client.containers.run(
                self._image, self._command, **run_kwargs
            )
        except docker.errors.ImageNotFound:
            raise RuntimeError(
                f"Image {self._image!r} not found locally. "
                "Call build_image_if_missing() first, or pull/build the image manually."
            ) from None

        # Refresh the cached attributes (used e.g. by get_host_port).
        self._container.reload()
        logger.info(
            "Container {!r} started (id={}).",
            self.container_name,
            self._container.short_id,
        )

    def stop(self) -> None:
        """
        Remove the container.

        Does nothing when no container is running.  A container that has
        already disappeared is treated as stopped, mirroring how
        :meth:`start` handles a missing container.
        """
        if self._container is None:
            return
        try:
            self._container.remove(force=True)
        except docker.errors.NotFound:
            logger.debug("Container {!r} was already removed.", self.container_name)
        else:
            logger.info("Container {!r} stopped.", self.container_name)
        self._container = None

    # ------------------------------------------------------------------
    # Port inspection
    # ------------------------------------------------------------------
    def get_host_port(self, container_port: str) -> str:
        """
        Return the host port Docker mapped to *container_port*.

        *container_port* should include the protocol, e.g. ``"8080/tcp"``.
        Only the first mapping is returned.

        Raises ``RuntimeError`` if the container is not running or the port
        is not exposed.
        """
        if self._container is None:
            raise RuntimeError("Sandbox container is not running.")
        mappings = self._container.ports.get(container_port)
        if not mappings:
            raise RuntimeError(
                f"Port {container_port!r} is not mapped on container {self.container_name!r}"
            )
        return mappings[0]["HostPort"]

    # ------------------------------------------------------------------
    # File I/O
    # ------------------------------------------------------------------
    def _resolve_path(self, path: str) -> PurePosixPath:
        """
        Resolve *path* against working_dir if relative.

        Paths refer to the container's filesystem, so POSIX semantics are
        used regardless of the host platform.
        """
        p = PurePosixPath(path)
        if not p.is_absolute() and self._working_dir is not None:
            p = PurePosixPath(self._working_dir) / p
        return p

    def read_file(self, path: str) -> bytes:
        """
        Read the file at *path* from the container using ``get_archive``.

        Returns the raw file bytes.  Raises ``FileNotFoundError`` if the path
        does not exist, ``IsADirectoryError`` if it cannot be extracted as a
        file, and ``RuntimeError`` if the container is not running.
        """
        if self._container is None:
            raise RuntimeError("Sandbox container is not running.")
        p = self._resolve_path(path)
        try:
            stream, _ = self._container.get_archive(str(p))
        except docker.errors.NotFound:
            raise FileNotFoundError(f"No such file in container: {path!r}") from None

        # The archive arrives as a stream of tar chunks; tarfile needs a
        # seekable object, so collect it in memory first.
        buf = io.BytesIO(b"".join(stream))
        with tarfile.open(fileobj=buf) as tar:
            members = tar.getmembers()
            if not members:
                raise FileNotFoundError(f"No such file in container: {path!r}")
            f = tar.extractfile(members[0])
            # extractfile() returns None for members that are not regular
            # files or links, e.g. directories.
            if f is None:
                raise IsADirectoryError(f"{path!r} is a directory, not a file")
            return f.read()

    def write_file(self, path: str, content: str | bytes) -> None:
        """
        Write *content* to *path* inside the container using ``put_archive``.

        *content* may be text (encoded as UTF-8) or raw bytes.  The tar
        archive API avoids all shell-escaping concerns.  Parent directories
        are created automatically via a preceding ``mkdir -p``; a failure of
        that command is logged and the upload is still attempted.

        Raises ``RuntimeError`` if the container is not running.
        """
        if self._container is None:
            raise RuntimeError("Sandbox container is not running.")
        p = self._resolve_path(path)
        if isinstance(content, str):
            encoded = content.encode("utf-8")
        else:
            encoded = bytes(content)

        mkdir_result = self._container.exec_run(["mkdir", "-p", str(p.parent)])
        if getattr(mkdir_result, "exit_code", None):
            logger.warning(
                "mkdir -p {} exited with code {} in container {!r}.",
                p.parent,
                mkdir_result.exit_code,
                self.container_name,
            )

        # Build a single-member tar archive in memory and upload it into the
        # parent directory.
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w") as tar:
            info = tarfile.TarInfo(name=p.name)
            info.size = len(encoded)
            info.mode = 0o644
            tar.addfile(info, io.BytesIO(encoded))
        buf.seek(0)
        self._container.put_archive(str(p.parent), buf)

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    def exec(self, command: str, timeout: int = 120) -> tuple[int, str]:
        """
        Run *command* inside the container via the low-level exec API.

        The command runs as ``bash -c <command>``, in ``working_dir`` when one
        was configured.

        Returns ``(exit_code, combined_stdout_stderr)``; all of stdout is
        placed before all of stderr and the bytes are decoded as UTF-8 with
        replacement of invalid sequences.

        *timeout* (seconds) bounds each individual wait for output on the
        exec socket, not the total run time.  When a wait expires the result
        is ``(124, "Command timed out after <timeout>s")``; the command is not
        explicitly terminated.  Any other failure, including a sandbox that
        is not running, is reported as ``(1, "<message>")`` instead of being
        raised.
        """
        if self._container is None:
            return 1, "Sandbox container is not running."

        create_kwargs: dict = {}
        if self._working_dir is not None:
            create_kwargs["workdir"] = self._working_dir

        # TODO(fragile): timeout enforcement relies on private docker-py internals
        # (frames_iter, demux_adaptor, consume_socket_output from docker.utils.socket)
        # and monkey-patches select.select / select.poll for the duration of the read
        # — not thread-safe if multiple exec() calls run concurrently. Replace when
        # docker-py adds native per-call timeout support.
        # See https://github.com/docker/docker-py/issues/2651
        #
        # On Linux docker-py uses select.poll (not select.select), so both are patched.
        sock = None
        try:
            exec_id = self._client.api.exec_create(
                self._container.id,
                ["bash", "-c", command],
                stdout=True,
                stderr=True,
                **create_kwargs,
            )
            sock = self._client.api.exec_start(exec_id["Id"], socket=True)
            # Wrapper objects keep the real socket on ``_sock``; when that
            # attribute is absent the returned object is asked directly.
            getattr(sock, "_sock", sock).settimeout(timeout)
            timeout_ms = timeout * 1000  # poll() takes milliseconds

            def _select_with_timeout(rlist, wlist, xlist, *_ignored):
                # select() signals a timeout by returning three empty lists;
                # turn that into the same exception the poll path raises.
                ready = _original_select(rlist, wlist, xlist, timeout)
                if not any(ready):
                    raise _socket.timeout(f"timed out after {timeout}s")
                return ready

            class _PollWithTimeout:
                """Stand-in for ``select.poll`` whose ``poll()`` never blocks forever."""

                def __init__(self):
                    self._inner = _original_poll()

                def register(self, *args):
                    return self._inner.register(*args)

                def poll(self, *args):
                    # Ignore the caller's timeout and apply ours; an empty
                    # result means the wait expired.
                    result = self._inner.poll(timeout_ms)
                    if not result:
                        raise _socket.timeout(f"timed out after {timeout}s")
                    return result

            with ExitStack() as stack:
                stack.enter_context(
                    patch.object(_select, "select", new=_select_with_timeout)
                )
                if _original_poll is not None:
                    stack.enter_context(
                        patch.object(_select, "poll", new=_PollWithTimeout)
                    )
                gen = (demux_adaptor(*frame) for frame in frames_iter(sock, tty=False))
                stdout, stderr = consume_socket_output(gen, demux=True)

            exit_code = self._client.api.exec_inspect(exec_id["Id"])["ExitCode"]
            if exit_code is None:
                exit_code = 0
            output = (stdout or b"") + (stderr or b"")
            return exit_code, output.decode("utf-8", errors="replace")
        except _socket.timeout:
            return 124, f"Command timed out after {timeout}s"
        except Exception as exc:
            return 1, f"Error running command in container: {exc}"
        finally:
            # Always release the exec socket, including on timeout or error.
            if sock is not None:
                try:
                    sock.close()
                except Exception as close_exc:
                    logger.debug(
                        "Ignoring error while closing exec socket: {}", close_exc
                    )