Files
docker-agent-sandbox/docker_agent_sandbox/sandbox.py
Matte23 cd51258fb5
Some checks failed
CI / test (push) Failing after 51s
CI / publish (push) Has been skipped
feat: Remove MCP specific code
2026-04-02 13:24:07 +02:00

212 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""sandbox.py Docker container lifecycle and execution environment."""
from __future__ import annotations
import concurrent.futures
import io
import tarfile
from pathlib import Path
from typing import TYPE_CHECKING
import docker
import docker.errors
from loguru import logger
if TYPE_CHECKING:
import docker.models.containers
class DockerSandbox:
"""
Manages a single long-running Docker container.
All container configuration (volumes, ports, environment variables,
capabilities) is supplied by the caller.
"""
def __init__(
self,
container_name: str,
image: str = "docker-agent-sandbox",
dockerfile_dir: str | None = None,
volumes: dict | None = None,
ports: dict | None = None,
environment: dict[str, str] | None = None,
working_dir: str | None = None,
network_mode: str = "bridge",
cap_drop: list[str] | None = None,
cap_add: list[str] | None = None,
security_opt: list[str] | None = None,
) -> None:
self.container_name = container_name
self._image = image
self._dockerfile_dir = dockerfile_dir
self._volumes = volumes or {}
self._ports = ports or {}
self._environment = environment or {}
self._working_dir = working_dir
self._network_mode = network_mode
self._cap_drop = cap_drop
self._cap_add = cap_add
self._security_opt = security_opt
self._client: docker.DockerClient = docker.from_env()
self._container: docker.models.containers.Container | None = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def build_image_if_missing(self) -> None:
"""Build the Docker image if it is not already present locally."""
try:
self._client.images.get(self._image)
logger.info("Image {!r} already present, skipping build.", self._image)
return
except docker.errors.ImageNotFound:
pass
if self._dockerfile_dir is None:
raise ValueError("dockerfile_dir must be provided to build the image")
logger.info("Building image {!r} from {}", self._image, self._dockerfile_dir)
_, logs = self._client.images.build(
path=self._dockerfile_dir,
tag=self._image,
rm=True,
)
for entry in logs:
line = entry.get("stream", "").rstrip()
if line:
logger.debug(" {}", line)
logger.success("Image {!r} built successfully.", self._image)
def start(self) -> None:
"""
Start the container.
Any existing container with the same name is removed first so that
re-runs always start from a clean state.
"""
try:
old = self._client.containers.get(self.container_name)
old.remove(force=True)
except docker.errors.NotFound:
pass
run_kwargs: dict = dict(
name=self.container_name,
detach=True,
volumes=self._volumes,
environment=self._environment,
ports=self._ports,
network_mode=self._network_mode,
)
if self._working_dir is not None:
run_kwargs["working_dir"] = self._working_dir
if self._cap_drop is not None:
run_kwargs["cap_drop"] = self._cap_drop
if self._cap_add is not None:
run_kwargs["cap_add"] = self._cap_add
if self._security_opt is not None:
run_kwargs["security_opt"] = self._security_opt
self._container = self._client.containers.run(self._image, **run_kwargs)
self._container.reload()
logger.info(
"Container {!r} started (id={}).",
self.container_name,
self._container.short_id,
)
def stop(self) -> None:
"""Remove the container."""
if self._container is not None:
self._container.remove(force=True)
logger.info("Container {!r} stopped.", self.container_name)
self._container = None
# ------------------------------------------------------------------
# Port inspection
# ------------------------------------------------------------------
def get_host_port(self, container_port: str) -> str:
"""
Return the host port Docker mapped to *container_port*.
*container_port* should include the protocol, e.g. ``"8080/tcp"``.
Raises ``RuntimeError`` if the container is not running or the port
is not exposed.
"""
if self._container is None:
raise RuntimeError("Sandbox container is not running.")
mappings = self._container.ports.get(container_port)
if not mappings:
raise RuntimeError(
f"Port {container_port!r} is not mapped on container {self.container_name!r}"
)
return mappings[0]["HostPort"]
# ------------------------------------------------------------------
# File I/O
# ------------------------------------------------------------------
def write_file(self, path: str, content: str) -> None:
"""
Write *content* to *path* inside the container using ``put_archive``.
The tar archive API avoids all shell-escaping concerns. Parent
directories are created automatically via a preceding ``mkdir -p``.
"""
if self._container is None:
raise RuntimeError("Sandbox container is not running.")
p = Path(path)
encoded = content.encode("utf-8")
self._container.exec_run(["mkdir", "-p", str(p.parent)])
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
info = tarfile.TarInfo(name=p.name)
info.size = len(encoded)
info.mode = 0o644
tar.addfile(info, io.BytesIO(encoded))
buf.seek(0)
self._container.put_archive(str(p.parent), buf)
# ------------------------------------------------------------------
# Command execution
# ------------------------------------------------------------------
def exec(self, command: str, timeout: int = 120) -> tuple[int, str]:
"""
Run *command* inside the container via ``exec_run``.
Returns ``(exit_code, combined_stdout_stderr)``.
The call is wrapped in a thread so the *timeout* is enforced without
modifying the command string.
"""
if self._container is None:
return 1, "Sandbox container is not running."
def _run() -> tuple[int, bytes]:
kwargs: dict = dict(demux=False)
if self._working_dir is not None:
kwargs["workdir"] = self._working_dir
exit_code, output = self._container.exec_run(
["bash", "-c", command], **kwargs
)
return exit_code, output or b""
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(_run)
try:
exit_code, raw = future.result(timeout=timeout)
return exit_code, raw.decode("utf-8", errors="replace")
except concurrent.futures.TimeoutError:
return 124, f"Command timed out after {timeout}s"
except Exception as exc:
return 1, f"Error running command in container: {exc}"