Files
docker-agent-sandbox/docker_agent_sandbox/sandbox.py
Matte23 80c2f9b159 feat: Initial library extraction from PIN LLM benchmark
DockerSandbox + LangChain file/shell tools extracted into a standalone package.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 11:47:44 +02:00

215 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""sandbox.py Docker container lifecycle and execution environment."""
from __future__ import annotations

import concurrent.futures
import io
import socket
import tarfile
import time
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING

import docker
import docker.errors
from loguru import logger

if TYPE_CHECKING:
    import docker.models.containers
class DockerSandbox:
    """
    Manages a single long-running Docker container used as the bash execution
    environment for an LLM agent.

    The sandbox directory is bind-mounted at *container_workdir* inside the
    container (default ``/workspace``), giving the model a stable, short path
    regardless of where the sandbox lives on the host.
    """

    def __init__(
        self,
        sandbox_dir: str,
        container_name: str,
        container_workdir: str = "/workspace",
        pin_mcp_port: int = 8080,
        image: str = "docker-agent-sandbox",
        dockerfile_dir: str | None = None,
    ) -> None:
        """
        Store the configuration and create a Docker client from the environment.

        No container is created here; call :meth:`start` for that.

        Args:
            sandbox_dir: Host directory bind-mounted into the container.
            container_name: Name given to the container; an existing container
                with this name is removed by :meth:`start`.
            container_workdir: Mount point and working directory inside the
                container.
            pin_mcp_port: Container-side TCP port that is published to a
                random host port and exported as ``PIN_MCP_PORT``.
            image: Image tag to run (and to build, if missing).
            dockerfile_dir: Build context used by
                :meth:`build_image_if_missing`; may be ``None`` when the image
                already exists.
        """
        self.sandbox_dir = sandbox_dir
        self.container_name = container_name
        self.container_workdir = container_workdir
        self.pin_mcp_port = pin_mcp_port
        # Filled in by start() once Docker has assigned the host port.
        self.mcp_url: str = ""
        self._image = image
        self._dockerfile_dir = dockerfile_dir
        self._client: docker.DockerClient = docker.from_env()
        self._container: docker.models.containers.Container | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def build_image_if_missing(self) -> None:
        """
        Build the Docker image if it is not already present locally.

        Raises:
            ValueError: If the image is missing and no ``dockerfile_dir`` was
                supplied to the constructor.
        """
        try:
            self._client.images.get(self._image)
            logger.info("Image {!r} already present, skipping build.", self._image)
            return
        except docker.errors.ImageNotFound:
            pass

        if self._dockerfile_dir is None:
            raise ValueError(
                "dockerfile_dir must be provided to build the image"
            )

        logger.info("Building image {!r} from {}", self._image, self._dockerfile_dir)
        _, logs = self._client.images.build(
            path=self._dockerfile_dir,
            tag=self._image,
            rm=True,
        )
        # Only "stream" entries carry human-readable build output.
        for entry in logs:
            line = entry.get("stream", "").rstrip()
            if line:
                logger.debug("  {}", line)
        logger.success("Image {!r} built successfully.", self._image)

    def start(self) -> None:
        """
        Start the sandbox container.

        Any existing container with the same name is removed first so that
        re-running the agent always starts from a clean state.  After the
        container is up, ``mcp_url`` is set and the call blocks until the
        published pin-mcp port accepts TCP connections.

        Raises:
            RuntimeError: If Docker reports no host port for the pin-mcp port,
                or if the port does not become reachable in time.  The
                container handle is kept so :meth:`stop` can still clean up.
        """
        # Remove any leftover container from a previous run.
        try:
            old = self._client.containers.get(self.container_name)
            old.remove(force=True)
        except docker.errors.NotFound:
            pass

        port_key = f"{self.pin_mcp_port}/tcp"
        self._container = self._client.containers.run(
            self._image,
            name=self.container_name,
            detach=True,
            volumes={
                # NOTE(review): "Z" requests a private SELinux relabel of the
                # bind mount; confirm this is intended on all target hosts.
                self.sandbox_dir: {"bind": self.container_workdir, "mode": "rw,Z"}
            },
            working_dir=self.container_workdir,
            environment={
                "CONTAINER_WORKSPACE": self.container_workdir,
                "PIN_MCP_PORT": str(self.pin_mcp_port),
            },
            # Expose pin-mcp port; Docker assigns a random host port.
            ports={port_key: None},
            # Bridge networking is needed so the published port is reachable
            # from the host.  NOTE(review): bridge mode does not by itself
            # block outbound connections from the container.
            network_mode="bridge",
            # Minimal capability set; SYS_PTRACE is required for ltrace/strace/gdb.
            cap_drop=["ALL"],
            cap_add=["SYS_PTRACE"],
            security_opt=["no-new-privileges"],
        )

        # Resolve the host port Docker assigned and wait for pin-mcp to be ready.
        self._container.reload()
        bindings = (self._container.ports or {}).get(port_key)
        if not bindings:
            # Without this check a container that died immediately would
            # surface as an opaque KeyError/TypeError.
            raise RuntimeError(
                f"Container {self.container_name!r} has no host port published "
                f"for {port_key}; it may have exited during startup."
            )
        host_port = bindings[0]["HostPort"]
        self.mcp_url = f"http://localhost:{host_port}/mcp"
        self._wait_for_mcp()
        logger.info(
            "Container {!r} started (id={}), pin-mcp at {}.",
            self.container_name,
            self._container.short_id,
            self.mcp_url,
        )

    def _wait_for_mcp(self, timeout: int = 30) -> None:
        """
        Block until pin-mcp's TCP port accepts connections, or raise on timeout.

        The host port is parsed out of ``self.mcp_url``.  A successful TCP
        connect is the only readiness signal that is checked.

        Raises:
            RuntimeError: If no connection succeeds within *timeout* seconds.
        """
        host_port = int(self.mcp_url.split(":")[-1].split("/")[0])
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.create_connection(("localhost", host_port), timeout=1):
                    return
            except OSError:
                time.sleep(0.5)
        raise RuntimeError(
            f"pin-mcp did not become ready on port {host_port} within {timeout}s"
        )

    def stop(self) -> None:
        """
        Remove the sandbox container.

        Does nothing when no container is tracked.  A container that has
        already been removed externally is treated as stopped.
        """
        if self._container is None:
            return
        try:
            self._container.remove(force=True)
        except docker.errors.NotFound:
            logger.warning(
                "Container {!r} was already removed.", self.container_name
            )
        else:
            logger.info("Container {!r} stopped.", self.container_name)
        self._container = None

    # ------------------------------------------------------------------
    # File I/O
    # ------------------------------------------------------------------

    def write_file(self, path: str, content: str) -> None:
        """
        Write *content* to *path* inside the container using ``put_archive``.

        Using the archive API avoids all shell-escaping concerns; any text
        (including content with quotes, backslashes, or null bytes) is
        transferred safely as a tar stream.  Parent directories are created
        automatically via a preceding ``mkdir -p``.

        Relative paths are resolved against ``container_workdir`` so that the
        ``mkdir`` step and the archive upload target the same directory.

        Raises:
            RuntimeError: If the container is not running, the parent
                directory cannot be created, or the upload is rejected.
        """
        if self._container is None:
            raise RuntimeError("Sandbox container is not running.")

        # Container paths are always POSIX, whatever the host OS is.
        target = PurePosixPath(self.container_workdir) / path
        parent = str(target.parent)
        encoded = content.encode("utf-8")

        # Ensure the parent directory exists inside the container.
        exit_code, output = self._container.exec_run(["mkdir", "-p", parent])
        if exit_code:
            detail = (output or b"").decode("utf-8", errors="replace").strip()
            raise RuntimeError(
                f"Could not create directory {parent!r} in container: {detail}"
            )

        # Pack the file into an in-memory tar archive and push it in.
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w") as tar:
            info = tarfile.TarInfo(name=target.name)
            info.size = len(encoded)
            info.mode = 0o644
            # TarInfo defaults mtime to 0 (1970); stamp the file with "now".
            info.mtime = int(time.time())
            tar.addfile(info, io.BytesIO(encoded))
        buf.seek(0)

        if self._container.put_archive(parent, buf) is False:
            raise RuntimeError(f"Failed to write {str(target)!r} into container.")

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------

    def exec(self, command: str, timeout: int = 120) -> tuple[int, str]:
        """
        Run *command* inside the container via ``exec_run``.

        Returns ``(exit_code, combined_stdout_stderr)``.  Failures are
        reported through the return value rather than raised: ``1`` when the
        container is not running or the call errors, ``124`` on timeout.

        The call is wrapped in a thread so the *timeout* is enforced without
        modifying the command string.  On timeout this method returns
        immediately, but the command itself is not killed; the worker thread
        stays blocked until the command finishes.
        """
        # Capture the handle so a concurrent stop() cannot turn it into None
        # underneath the worker thread.
        container = self._container
        if container is None:
            return 1, "Sandbox container is not running."

        def _run() -> tuple[int, bytes]:
            exit_code, output = container.exec_run(
                ["bash", "-c", command],
                workdir=self.container_workdir,
                demux=False,
            )
            return exit_code, output or b""

        # The pool is deliberately not used as a context manager: leaving a
        # ``with`` block calls shutdown(wait=True), which would block until
        # the command finishes and defeat the timeout.
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(_run)
        try:
            exit_code, raw = future.result(timeout=timeout)
            return exit_code, raw.decode("utf-8", errors="replace")
        except concurrent.futures.TimeoutError:
            return 124, f"Command timed out after {timeout}s"
        except Exception as exc:
            return 1, f"Error running command in container: {exc}"
        finally:
            pool.shutdown(wait=False)