feat: Initial library extraction from PIN LLM benchmark

DockerSandbox + LangChain file/shell tools extracted into a standalone package.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-02 11:47:44 +02:00
commit 80c2f9b159
17 changed files with 758 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
"""docker_agent_sandbox Docker sandbox + LangChain tools for LLM agents."""
from docker_agent_sandbox.sandbox import DockerSandbox
from docker_agent_sandbox.tools import make_bash_tool, make_file_ops_tools
__all__ = ["DockerSandbox", "make_bash_tool", "make_file_ops_tools"]

View File

@@ -0,0 +1,214 @@
"""sandbox.py Docker container lifecycle and execution environment."""
from __future__ import annotations
import concurrent.futures
import io
import socket
import tarfile
import time
from pathlib import Path
from typing import TYPE_CHECKING
import docker
import docker.errors
from loguru import logger
if TYPE_CHECKING:
import docker.models.containers
class DockerSandbox:
    """
    Manages a single long-running Docker container used as the bash execution
    environment for an LLM agent.

    The sandbox directory is bind-mounted at *container_workdir* inside the
    container (default ``/workspace``), giving the model a stable, short path
    regardless of where the sandbox lives on the host.
    """

    def __init__(
        self,
        sandbox_dir: str,
        container_name: str,
        container_workdir: str = "/workspace",
        pin_mcp_port: int = 8080,
        image: str = "docker-agent-sandbox",
        dockerfile_dir: str | None = None,
    ) -> None:
        # Host directory that is bind-mounted into the container.
        self.sandbox_dir = sandbox_dir
        self.container_name = container_name
        self.container_workdir = container_workdir
        # Port pin-mcp listens on *inside* the container; Docker maps it to a
        # random host port which is resolved in start().
        self.pin_mcp_port = pin_mcp_port
        # Host-side URL of the pin-mcp endpoint; empty until start() succeeds.
        self.mcp_url: str = ""
        self._image = image
        self._dockerfile_dir = dockerfile_dir
        self._client: docker.DockerClient = docker.from_env()
        self._container: docker.models.containers.Container | None = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def build_image_if_missing(self) -> None:
        """Build the Docker image if it is not already present locally.

        Raises:
            ValueError: the image is missing and no ``dockerfile_dir`` was
                given, so there is nothing to build from.
        """
        try:
            self._client.images.get(self._image)
            logger.info("Image {!r} already present, skipping build.", self._image)
            return
        except docker.errors.ImageNotFound:
            pass
        if self._dockerfile_dir is None:
            raise ValueError(
                "dockerfile_dir must be provided to build the image"
            )
        logger.info("Building image {!r} from {}", self._image, self._dockerfile_dir)
        _, logs = self._client.images.build(
            path=self._dockerfile_dir,
            tag=self._image,
            rm=True,
        )
        # Stream the build log at debug level; entries without a "stream"
        # key (status/aux records) are skipped.
        for entry in logs:
            line = entry.get("stream", "").rstrip()
            if line:
                logger.debug("  {}", line)
        logger.success("Image {!r} built successfully.", self._image)

    def start(self) -> None:
        """
        Start the sandbox container.

        Any existing container with the same name is removed first so that
        re-running the agent always starts from a clean state. On success,
        ``self.mcp_url`` points at the host-mapped pin-mcp endpoint.

        Raises:
            RuntimeError: Docker never published a host port, or pin-mcp did
                not become reachable in time.
        """
        # Remove any leftover container from a previous run.
        try:
            old = self._client.containers.get(self.container_name)
            old.remove(force=True)
        except docker.errors.NotFound:
            pass
        self._container = self._client.containers.run(
            self._image,
            name=self.container_name,
            detach=True,
            volumes={
                self.sandbox_dir: {"bind": self.container_workdir, "mode": "rw,Z"}
            },
            working_dir=self.container_workdir,
            environment={
                "CONTAINER_WORKSPACE": self.container_workdir,
                "PIN_MCP_PORT": str(self.pin_mcp_port),
            },
            # Expose pin-mcp port; Docker assigns a random host port.
            ports={f"{self.pin_mcp_port}/tcp": None},
            # No outbound network needed; all tools are pre-installed.
            network_mode="bridge",
            # Minimal capability set; SYS_PTRACE is required for ltrace/strace/gdb.
            cap_drop=["ALL"],
            cap_add=["SYS_PTRACE"],
            security_opt=["no-new-privileges"],
        )
        # Resolve the host port Docker assigned. The published-port mapping
        # can be briefly empty right after `run`, so poll instead of indexing
        # `ports` once (a single lookup races with Docker's port allocation).
        port_key = f"{self.pin_mcp_port}/tcp"
        mapping = None
        for _ in range(40):  # up to ~10 s
            self._container.reload()
            mapping = self._container.ports.get(port_key)
            if mapping:
                break
            time.sleep(0.25)
        if not mapping:
            raise RuntimeError(
                f"Docker did not publish a host port for {port_key}"
            )
        host_port = mapping[0]["HostPort"]
        self.mcp_url = f"http://localhost:{host_port}/mcp"
        self._wait_for_mcp()
        logger.info(
            "Container {!r} started (id={}), pin-mcp at {}.",
            self.container_name,
            self._container.short_id,
            self.mcp_url,
        )

    def _wait_for_mcp(self, timeout: int = 30) -> None:
        """Block until pin-mcp's TCP port accepts connections, or raise on timeout."""
        # mcp_url has the shape http://localhost:<port>/mcp, set by start().
        host_port = int(self.mcp_url.split(":")[-1].split("/")[0])
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with socket.create_connection(("localhost", host_port), timeout=1):
                    return
            except OSError:
                time.sleep(0.5)
        raise RuntimeError(
            f"pin-mcp did not become ready on port {host_port} within {timeout}s"
        )

    def stop(self) -> None:
        """Remove the sandbox container (no-op if it was never started)."""
        if self._container is not None:
            self._container.remove(force=True)
            logger.info("Container {!r} stopped.", self.container_name)
            self._container = None

    # ------------------------------------------------------------------
    # File I/O
    # ------------------------------------------------------------------
    def write_file(self, path: str, content: str) -> None:
        """
        Write *content* to *path* inside the container using ``put_archive``.

        Using the archive API avoids all shell-escaping concerns; any text
        (including content with quotes, backslashes, or null bytes) is
        transferred safely as a tar stream. Parent directories are created
        automatically via a preceding ``mkdir -p``.

        Raises:
            RuntimeError: the container is not running.
        """
        if self._container is None:
            raise RuntimeError("Sandbox container is not running.")
        p = Path(path)
        encoded = content.encode("utf-8")
        # Ensure the parent directory exists inside the container.
        self._container.exec_run(["mkdir", "-p", str(p.parent)])
        # Pack the file into an in-memory tar archive and push it in.
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w") as tar:
            info = tarfile.TarInfo(name=p.name)
            info.size = len(encoded)
            info.mode = 0o644
            tar.addfile(info, io.BytesIO(encoded))
        buf.seek(0)
        # NOTE(review): for a relative *path*, p.parent may be "." — presumably
        # resolved against the container workdir; confirm against Docker API.
        self._container.put_archive(str(p.parent), buf)

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    def exec(self, command: str, timeout: int = 120) -> tuple[int, str]:
        """
        Run *command* inside the container via ``exec_run``.

        Returns ``(exit_code, combined_stdout_stderr)``; on timeout the exit
        code is 124 (matching the ``timeout(1)`` convention).

        The call runs in a worker thread so *timeout* is enforced without
        modifying the command string. The executor is shut down with
        ``wait=False``: the previous ``with`` form called
        ``shutdown(wait=True)`` on exit, which blocked until the exec
        finished and made the timeout return anything but prompt. The
        timed-out worker thread is left to finish in the background.
        """
        if self._container is None:
            return 1, "Sandbox container is not running."

        def _run() -> tuple[int, bytes]:
            exit_code, output = self._container.exec_run(
                ["bash", "-c", command],
                workdir=self.container_workdir,
                demux=False,
            )
            return exit_code, output or b""

        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = pool.submit(_run)
            exit_code, raw = future.result(timeout=timeout)
            return exit_code, raw.decode("utf-8", errors="replace")
        except concurrent.futures.TimeoutError:
            return 124, f"Command timed out after {timeout}s"
        except Exception as exc:
            return 1, f"Error running command in container: {exc}"
        finally:
            # wait=False: never block on a still-running (timed-out) exec.
            pool.shutdown(wait=False)

View File

@@ -0,0 +1,6 @@
"""tools LangChain tools that operate inside a DockerSandbox."""
from docker_agent_sandbox.tools.bash import make_bash_tool
from docker_agent_sandbox.tools.file_ops import make_file_ops_tools
__all__ = ["make_bash_tool", "make_file_ops_tools"]

View File

@@ -0,0 +1,25 @@
"""_utils.py shared helpers for file-ops tools."""
from __future__ import annotations
import posixpath
_MAX_OUTPUT_LINES = 200
_MAX_OUTPUT_CHARS = 20_000
_TRUNCATION_NOTICE = "\n... [output truncated] ..."


def truncate_output(output: str) -> str:
    """Clamp *output* to at most 200 lines and 20k characters.

    A truncation notice is appended whenever a limit trims the text, keeping
    tool results small enough to avoid blowing the model's token budget.
    """
    result = output
    kept = result.splitlines(keepends=True)
    if len(kept) > _MAX_OUTPUT_LINES:
        result = "".join(kept[:_MAX_OUTPUT_LINES]) + _TRUNCATION_NOTICE
    if len(result) > _MAX_OUTPUT_CHARS:
        result = result[:_MAX_OUTPUT_CHARS] + _TRUNCATION_NOTICE
    return result
def _parent(path: str) -> str:
"""Return the parent directory of *path* (best-effort, no I/O)."""
parent = posixpath.dirname(path.rstrip("/"))
return parent or "."

View File

@@ -0,0 +1,38 @@
"""bash.py tool for executing shell commands inside the sandbox."""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
from docker_agent_sandbox.tools._utils import truncate_output
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_bash_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build the shell-execution tool for *sandbox*.

    Commands run in the container's working directory, which is bind-mounted
    from the host sandbox directory, so every path the model uses is valid on
    both sides of the mount.
    """

    @tool
    def bash(command: str, timeout: int = 120) -> str:
        """
        Execute a shell command in the sandbox container.
        Returns EXIT:<code> followed by combined stdout+stderr.
        Large outputs are truncated to stay within token limits.
        Use for: running the target binary, processing PIN output,
        compiling plugins, or any other shell operation during analysis.
        """
        logger.debug("Running inside sandbox: {}", command)
        code, combined = sandbox.exec(command, timeout=timeout)
        body = truncate_output(combined)
        return f"EXIT:{code}\n{body}"

    return bash

View File

@@ -0,0 +1,37 @@
"""copy_file.py tool for copying files inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
from docker_agent_sandbox.tools._utils import _parent
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_copy_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a copy_file tool that shells out to ``cp`` inside *sandbox*."""

    @tool
    def copy_file(src: str, dst: str) -> str:
        """
        Copy a file from *src* to *dst*.
        Parent directories of *dst* are created automatically.
        Returns a confirmation message or an error.
        """
        logger.debug("Copying file inside sandbox: {!r} -> {!r}", src, dst)
        # mkdir first so `cp` never fails on a missing destination directory.
        command = (
            f"mkdir -p -- {quote(_parent(dst))} && cp -- {quote(src)} {quote(dst)}"
        )
        status, out = sandbox.exec(command)
        if status == 0:
            return f"[OK] Copied {src} -> {dst}"
        return f"[ERROR copying {src!r} to {dst!r}] {out.strip()}"

    return copy_file

View File

@@ -0,0 +1,33 @@
"""delete_file.py tool for deleting files inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_delete_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a delete_file tool that runs ``rm -d`` inside *sandbox*."""

    @tool
    def delete_file(path: str) -> str:
        """
        Delete a file or empty directory at *path*.
        Use ``delete_file`` only for files or empty directories. To remove a
        directory tree use ``move_file`` to archive it first, or call this tool
        repeatedly. Returns a confirmation message or an error.
        """
        logger.debug("Deleting file inside sandbox: {}", path)
        # -d allows removing empty directories too; non-empty trees still fail.
        status, out = sandbox.exec(f"rm -d -- {quote(path)}")
        if status == 0:
            return f"[OK] Deleted {path}"
        return f"[ERROR deleting {path!r}] {out.strip()}"

    return delete_file

View File

@@ -0,0 +1,65 @@
"""edit_file.py tool for str_replace editing of files inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_edit_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build the ``str_replace`` editing tool bound to *sandbox*."""

    @tool
    def edit_file(path: str, old_str: str, new_str: str) -> str:
        """
        Replace the first exact occurrence of *old_str* with *new_str* in *path*.
        This is the standard ``str_replace`` editing primitive: read the file,
        find the unique snippet you want to change, and supply the replacement.
        Rules:
        - *old_str* must match **exactly** (including whitespace and indentation).
        - *old_str* must appear **at least once**; the tool returns an error if it
          is not found.
        - If *old_str* appears more than once the tool refuses and asks you to
          provide more surrounding context to make it unique.
        - To insert text without removing anything, set *old_str* to a line that
          will remain and include it verbatim in *new_str* (i.e. keep the anchor
          line and add your new lines around it).
        - To delete a block, set *new_str* to an empty string ``""``.
        Returns a confirmation with the number of lines affected, or an error.
        """
        logger.debug("Editing file inside sandbox: {!r}", path)
        # Pull the current contents out via `cat`, edit in Python, write back.
        status, current = sandbox.exec(f"cat -- {quote(path)}")
        if status != 0:
            return f"[ERROR reading {path!r} for edit] {current.strip()}"
        occurrences = current.count(old_str)
        if occurrences == 0:
            return (
                f"[ERROR] old_str not found in {path!r}. "
                "Check that whitespace and indentation match exactly."
            )
        if occurrences > 1:
            return (
                f"[ERROR] old_str appears {occurrences} times in {path!r}. "
                "Provide more surrounding context to make it unique."
            )
        updated = current.replace(old_str, new_str, 1)
        removed = old_str.count("\n") + 1
        added = (new_str.count("\n") + 1) if new_str else 0
        try:
            sandbox.write_file(path, updated)
        except Exception as exc:
            return f"[ERROR writing {path!r} after edit] {exc}"
        return f"[OK] Replaced {removed} line(s) with {added} line(s) in {path}"

    return edit_file

View File

@@ -0,0 +1,43 @@
"""file_ops.py assembles all file-operation tools into a single list."""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool
from docker_agent_sandbox.tools.copy_file import make_copy_file_tool
from docker_agent_sandbox.tools.delete_file import make_delete_file_tool
from docker_agent_sandbox.tools.edit_file import make_edit_file_tool
from docker_agent_sandbox.tools.grep import make_grep_tool
from docker_agent_sandbox.tools.list_dir import make_list_dir_tool
from docker_agent_sandbox.tools.make_dir import make_make_dir_tool
from docker_agent_sandbox.tools.move_file import make_move_file_tool
from docker_agent_sandbox.tools.read_file import make_read_file_tool
from docker_agent_sandbox.tools.search_files import make_search_files_tool
from docker_agent_sandbox.tools.write_file import make_write_file_tool
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_file_ops_tools(sandbox: "DockerSandbox") -> list[BaseTool]:
    """
    Return file-operation tools bound to *sandbox*.
    All paths are interpreted by the filesystem the model is working in — it
    can use any absolute path (e.g. ``/tmp/re-agent/output.csv``) or a relative
    one (resolved against the working directory).
    """
    # Order is preserved exactly; some callers may rely on tool ordering.
    factories = (
        make_read_file_tool,
        make_write_file_tool,
        make_edit_file_tool,
        make_list_dir_tool,
        make_delete_file_tool,
        make_move_file_tool,
        make_copy_file_tool,
        make_make_dir_tool,
        make_search_files_tool,
        make_grep_tool,
    )
    return [factory(sandbox) for factory in factories]

View File

@@ -0,0 +1,45 @@
"""grep.py tool for searching file contents inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_grep_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a content-search tool wrapping ``grep -E`` inside *sandbox*."""

    @tool
    def grep(pattern: str, path: str, recursive: bool = False) -> str:
        """
        Search for *pattern* (extended regex) in *path*.
        *path* can be a file or a directory; when *path* is a directory,
        *recursive* must be ``True``. Returns matching lines with file names
        and line numbers, or an error message.
        Useful for locating strings, symbol names, or byte sequences in
        binaries and text files.
        """
        logger.debug(
            "Grepping inside sandbox: pattern={!r} path={!r} recursive={}",
            pattern,
            path,
            recursive,
        )
        mode = "-rn" if recursive else "-n"
        status, out = sandbox.exec(
            f"grep -E {mode} -- {quote(pattern)} {quote(path)} 2>&1"
        )
        # Exit status 1 just means "no matches", which is a valid result.
        if status in (0, 1):
            return out.strip() or "[no matches found]"
        return f"[ERROR grepping {path!r}] {out.strip()}"

    return grep

View File

@@ -0,0 +1,32 @@
"""list_dir.py tool for listing directory contents inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_list_dir_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a list_dir tool wrapping ``ls -lA`` inside *sandbox*."""

    @tool
    def list_dir(path: str = ".") -> str:
        """
        List the contents of a directory at *path*.
        Returns ``ls -lA`` output,
        or an error message if the path does not exist or is not a directory.
        """
        logger.debug("Listing files inside sandbox: {}", path)
        status, listing = sandbox.exec(f"ls -lA -- {quote(path)}")
        if status == 0:
            return listing
        return f"[ERROR listing {path!r}] {listing.strip()}"

    return list_dir

View File

@@ -0,0 +1,32 @@
"""make_dir.py tool for creating directories inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_make_dir_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a make_dir tool wrapping ``mkdir -p`` inside *sandbox*."""

    @tool
    def make_dir(path: str) -> str:
        """
        Create directory *path* (and all missing parents).
        Succeeds silently if the directory already exists.
        Returns a confirmation message or an error.
        """
        logger.debug("Creating directory inside sandbox: {}", path)
        status, out = sandbox.exec(f"mkdir -p -- {quote(path)}")
        if status == 0:
            return f"[OK] Directory exists: {path}"
        return f"[ERROR creating directory {path!r}] {out.strip()}"

    return make_dir

View File

@@ -0,0 +1,37 @@
"""move_file.py tool for moving/renaming files inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
from docker_agent_sandbox.tools._utils import _parent
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_move_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a move_file tool that shells out to ``mv`` inside *sandbox*."""

    @tool
    def move_file(src: str, dst: str) -> str:
        """
        Move or rename a file from *src* to *dst*.
        Parent directories of *dst* are created automatically.
        Returns a confirmation message or an error.
        """
        logger.debug("Moving file inside sandbox: {!r} -> {!r}", src, dst)
        # mkdir first so `mv` never fails on a missing destination directory.
        command = (
            f"mkdir -p -- {quote(_parent(dst))} && mv -- {quote(src)} {quote(dst)}"
        )
        status, out = sandbox.exec(command)
        if status == 0:
            return f"[OK] Moved {src} -> {dst}"
        return f"[ERROR moving {src!r} to {dst!r}] {out.strip()}"

    return move_file

View File

@@ -0,0 +1,60 @@
"""read_file.py tool for reading files inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_read_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a byte-range file reader (``wc -c`` + ``dd``) bound to *sandbox*."""

    @tool
    def read_file(path: str, offset: int = 0, length: int = 5000) -> str:
        """
        Read a file at *path*.
        *path* can be absolute (``/tmp/re-agent/result.csv``) or relative to the
        working directory.
        *offset* is the number of bytes to skip from the start of the file.
        *length* is the maximum number of bytes to return. If the file is
        longer than ``offset + length``, the
        output is trimmed and a summary line is appended showing how many
        bytes were omitted.
        Returns the (possibly trimmed) file contents as text, or an error message.
        """
        logger.debug(
            "Reading file inside sandbox: {} offset={} length={}", path, offset, length
        )
        # Size first, so the trailer can report how much was left unread.
        status, size_out = sandbox.exec(f"wc -c -- {quote(path)}")
        if status != 0:
            return f"[ERROR reading {path!r}] {size_out.strip()}"
        try:
            total = int(size_out.split()[0])
        except (ValueError, IndexError):
            return f"[ERROR parsing file size for {path!r}] {size_out.strip()}"
        # dd's *_bytes iflags give exact byte-granular skip/count (GNU dd).
        status, chunk = sandbox.exec(
            f"dd if={quote(path)} iflag=skip_bytes,count_bytes"
            f" skip={offset} count={length} 2>/dev/null"
        )
        if status != 0:
            return f"[ERROR reading {path!r}] {chunk.strip()}"
        end = offset + length
        if end < total:
            remaining = total - end
            suffix = f"\n[... {remaining} more bytes not shown (total {total} bytes). Use offset/length to read further.]"
        elif offset > 0 or total > length:
            suffix = f"\n[File total: {total} bytes, showing {len(chunk)} chars from offset {offset}.]"
        else:
            suffix = ""
        return chunk + suffix

    return read_file

View File

@@ -0,0 +1,40 @@
"""search_files.py tool for finding files by name pattern inside the sandbox."""
from __future__ import annotations
from shlex import quote
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_search_files_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a filename-search tool wrapping ``find -name`` inside *sandbox*."""

    @tool
    def search_files(pattern: str, directory: str = ".") -> str:
        """
        Find files whose names match *pattern* (shell glob) under *directory*.
        Examples::
            search_files("*.so", "/tmp/re-agent")
            search_files("main", "/usr/bin")
        Returns a newline-separated list of matching paths, or an error message.
        """
        logger.debug(
            "Searching files inside sandbox: pattern={!r} dir={!r}", pattern, directory
        )
        # stderr is discarded: permission-denied noise is expected under /.
        status, out = sandbox.exec(
            f"find {quote(directory)} -name {quote(pattern)} -print 2>/dev/null"
        )
        if status != 0:
            return f"[ERROR searching {directory!r}] {out.strip()}"
        return out.strip() or "[no matches found]"

    return search_files

View File

@@ -0,0 +1,32 @@
"""write_file.py tool for writing files inside the sandbox."""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain_core.tools import BaseTool, tool
from loguru import logger
if TYPE_CHECKING:
from docker_agent_sandbox.sandbox import DockerSandbox
def make_write_file_tool(sandbox: "DockerSandbox") -> BaseTool:
    """Build a write_file tool that delegates to ``DockerSandbox.write_file``."""

    @tool
    def write_file(path: str, content: str) -> str:
        """
        Write *content* to *path*.
        *path* can be absolute or relative. Parent directories are created
        automatically. Returns a confirmation message or an error.
        """
        try:
            logger.debug("Writing file inside sandbox: {}", path)
            sandbox.write_file(path, content)
        except Exception as exc:
            return f"[ERROR writing {path!r}] {exc}"
        written = len(content.encode())
        return f"[OK] Written {written} bytes to {path}"

    return write_file

13
pyproject.toml Normal file
View File

@@ -0,0 +1,13 @@
[project]
name = "docker-agent-sandbox"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"docker",
"langchain-core",
"loguru",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"