ci: Add unit and integration tests
This commit is contained in:
@@ -0,0 +1,43 @@
|
||||
"""Shared fixtures for integration tests.
|
||||
|
||||
All integration tests share a single container (session scope) to avoid the
|
||||
overhead of starting/stopping Docker for every test function. Each test that
|
||||
needs filesystem isolation gets its own temporary working directory via the
|
||||
``workdir`` fixture, which is torn down after the test.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from docker_agent_sandbox import DockerSandbox
|
||||
|
||||
# python:3.11-slim ships bash, grep (GNU), find, and standard POSIX utilities.
|
||||
# Image that the session-scoped ``sandbox`` fixture passes to DockerSandbox.
_IMAGE = "python:3.11-slim"
|
||||
# Container name that the session-scoped ``sandbox`` fixture passes to DockerSandbox.
_CONTAINER_NAME = "docker-agent-sandbox-tests"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def sandbox():
    """Provide one running container that all integration tests reuse.

    The container is started before the first test that requests it and is
    stopped once the test session is over.
    """
    container_options = {
        "container_name": _CONTAINER_NAME,
        "image": _IMAGE,
        "command": "sleep infinity",
        "cpu_limit": 2,
        "memory_limit": "512m",
    }
    shared_sandbox = DockerSandbox(**container_options)
    shared_sandbox.start()
    yield shared_sandbox
    # Session teardown: stop the shared container.
    shared_sandbox.stop()
|
||||
|
||||
|
||||
@pytest.fixture
def workdir(sandbox: DockerSandbox):
    """Create a fresh temp directory in the container for the calling test.

    Yields the directory path (``/tmp/test-<random hex>``) and removes the
    directory again after the test has finished.
    """
    d = f"/tmp/test-{uuid.uuid4().hex}"
    code, out = sandbox.exec(f"mkdir -p {d}")
    # Fail during setup instead of letting the test fail later with an
    # unrelated-looking file error when the directory could not be created.
    assert code == 0, f"could not create {d} in the sandbox: {out}"
    yield d
    # Cleanup result is intentionally not checked.
    sandbox.exec(f"rm -rf {d}")
|
||||
@@ -0,0 +1,202 @@
|
||||
"""Integration tests for DockerSandbox core (exec, file I/O, path resolution)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from docker_agent_sandbox import DockerSandbox
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# exec()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_exec_simple_command(sandbox: DockerSandbox):
    """A trivial command exits with 0 and its stdout is returned."""
    exit_code, output = sandbox.exec("echo hello")
    assert exit_code == 0
    assert "hello" in output
|
||||
|
||||
|
||||
def test_exec_nonzero_exit_code(sandbox: DockerSandbox):
    """The exit status of the command is passed through unchanged."""
    exit_code, _output = sandbox.exec("exit 42")
    assert exit_code == 42
|
||||
|
||||
|
||||
def test_exec_stderr_captured(sandbox: DockerSandbox):
    """Text written to stderr shows up in the returned output."""
    exit_code, output = sandbox.exec("echo msg_on_stderr >&2")
    assert exit_code == 0
    assert "msg_on_stderr" in output
|
||||
|
||||
|
||||
def test_exec_combined_stdout_and_stderr(sandbox: DockerSandbox):
    """Both stdout and stderr text are present in the returned output."""
    command = "echo stdout_line; echo stderr_line >&2"
    exit_code, output = sandbox.exec(command)
    assert exit_code == 0
    assert "stdout_line" in output
    assert "stderr_line" in output
|
||||
|
||||
|
||||
def test_exec_returns_error_when_container_not_running():
    """exec() on a sandbox with no container returns code 1 and a 'not running' message."""
    # Bypass __init__ and set only the attributes this test assigns, so the
    # not-running guard is hit without starting a container.
    stopped = DockerSandbox.__new__(DockerSandbox)
    stopped._container = None
    stopped._working_dir = None
    exit_code, message = stopped.exec("echo hi")
    assert exit_code == 1
    assert "not running" in message.lower()
|
||||
|
||||
|
||||
def test_exec_timeout(sandbox: DockerSandbox):
    """A command exceeding ``timeout`` yields exit code 124 and a 'timed out' message."""
    exit_code, output = sandbox.exec("sleep 60", timeout=2)
    assert exit_code == 124
    assert "timed out" in output
|
||||
|
||||
|
||||
def test_exec_working_dir_respected():
    """With ``working_dir="/tmp"``, ``pwd`` run through exec() reports /tmp."""
    options = {
        "container_name": "test-workdir-check",
        "image": "python:3.11-slim",
        "command": "sleep infinity",
        "working_dir": "/tmp",
    }
    dedicated = DockerSandbox(**options)
    dedicated.start()
    try:
        exit_code, output = dedicated.exec("pwd")
        assert exit_code == 0
        assert "/tmp" in output
    finally:
        # Always stop the dedicated container, even if an assertion failed.
        dedicated.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# write_file() / read_file()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_write_read_roundtrip(sandbox: DockerSandbox, workdir: str):
    """Text written with write_file() is read back as the same bytes."""
    target = f"{workdir}/hello.txt"
    sandbox.write_file(target, "hello world\n")
    stored = sandbox.read_file(target)
    assert stored == b"hello world\n"
|
||||
|
||||
|
||||
def test_write_read_unicode(sandbox: DockerSandbox, workdir: str):
    """Non-ASCII text survives a write/read cycle when decoded as UTF-8."""
    target = f"{workdir}/unicode.txt"
    text = "héllo wörld 你好\n"
    sandbox.write_file(target, text)
    raw = sandbox.read_file(target)
    assert raw.decode("utf-8") == text
|
||||
|
||||
|
||||
def test_write_creates_parent_directories(sandbox: DockerSandbox, workdir: str):
    """Writing to a path whose parent directories do not exist yet succeeds."""
    nested_target = f"{workdir}/deep/nested/dir/file.txt"
    sandbox.write_file(nested_target, "content")
    stored = sandbox.read_file(nested_target)
    assert stored == b"content"
|
||||
|
||||
|
||||
def test_write_overwrites_existing_file(sandbox: DockerSandbox, workdir: str):
    """A second write to the same path replaces the earlier content."""
    target = f"{workdir}/overwrite.txt"
    for text in ("first", "second"):
        sandbox.write_file(target, text)
    assert sandbox.read_file(target) == b"second"
|
||||
|
||||
|
||||
def test_read_file_not_found_raises(sandbox: DockerSandbox, workdir: str):
    """Reading a path that does not exist raises FileNotFoundError."""
    missing = f"{workdir}/no_such_file.txt"
    with pytest.raises(FileNotFoundError):
        sandbox.read_file(missing)
|
||||
|
||||
|
||||
def test_read_directory_raises(sandbox: DockerSandbox, workdir: str):
    """Reading a directory path raises IsADirectoryError."""
    subdir = f"{workdir}/subdir"
    sandbox.exec(f"mkdir -p {subdir}")
    with pytest.raises(IsADirectoryError):
        sandbox.read_file(subdir)
|
||||
|
||||
|
||||
def test_read_file_when_container_not_running_raises():
    """read_file() without a container raises RuntimeError matching 'not running'."""
    # Bare instance: __init__ is skipped and only these two attributes are set.
    stopped = DockerSandbox.__new__(DockerSandbox)
    stopped._container = None
    stopped._working_dir = None
    with pytest.raises(RuntimeError, match="not running"):
        stopped.read_file("/tmp/anything.txt")
|
||||
|
||||
|
||||
def test_write_file_when_container_not_running_raises():
    """write_file() without a container raises RuntimeError matching 'not running'."""
    # Bare instance: __init__ is skipped and only these two attributes are set.
    stopped = DockerSandbox.__new__(DockerSandbox)
    stopped._container = None
    stopped._working_dir = None
    with pytest.raises(RuntimeError, match="not running"):
        stopped.write_file("/tmp/anything.txt", "data")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_host_port()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_host_port_raises_when_not_running():
    """get_host_port() without a container raises RuntimeError matching 'not running'."""
    # Bare instance: __init__ is skipped and only ``_container`` is set.
    stopped = DockerSandbox.__new__(DockerSandbox)
    stopped._container = None
    with pytest.raises(RuntimeError, match="not running"):
        stopped.get_host_port("8080/tcp")
|
||||
|
||||
|
||||
def test_get_host_port_raises_for_unmapped_port(sandbox: DockerSandbox):
    """Asking for a port the shared container does not publish raises RuntimeError."""
    unmapped_port = "9999/tcp"
    with pytest.raises(RuntimeError, match="not mapped"):
        sandbox.get_host_port(unmapped_port)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _resolve_path()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path, working_dir, expected",
    [
        ("/absolute/path", "/work", "/absolute/path"),
        ("relative/file", "/work", "/work/relative/file"),
        ("relative/file", None, "relative/file"),
        ("/absolute/path", None, "/absolute/path"),
    ],
)
def test_resolve_path(path, working_dir, expected):
    """_resolve_path() returns ``expected`` for each (path, working_dir) case above."""
    # Bare instance: only ``_working_dir`` is set before calling the method.
    unstarted = DockerSandbox.__new__(DockerSandbox)
    unstarted._working_dir = working_dir
    resolved = unstarted._resolve_path(path)
    assert str(resolved) == expected
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# context manager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_context_manager_stops_container():
    """Leaving the ``with`` block resets ``_container`` to None."""
    managed = DockerSandbox(
        container_name="test-ctx-manager",
        image="python:3.11-slim",
        command="sleep infinity",
    )
    managed.start()
    with managed:
        exit_code, _output = managed.exec("echo alive")
        assert exit_code == 0
    # After the context manager exits the sandbox holds no container.
    assert managed._container is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_image_if_missing()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_build_image_if_missing_skips_when_present(sandbox: DockerSandbox):
    """build_image_if_missing() completes without raising on the shared sandbox.

    The shared fixture is already running on python:3.11-slim and is created
    without a dockerfile_dir, so the call is expected to return normally.
    """
    sandbox.build_image_if_missing()
|
||||
|
||||
|
||||
def test_build_image_if_missing_raises_without_dockerfile_dir():
    """An unknown image with no dockerfile_dir makes build_image_if_missing() raise."""
    unbuildable = DockerSandbox(
        container_name="irrelevant",
        image="image-that-does-not-exist-xyzzy123",
    )
    accepted_errors = (RuntimeError, ValueError)
    with pytest.raises(accepted_errors):
        unbuildable.build_image_if_missing()
|
||||
@@ -0,0 +1,490 @@
|
||||
"""Integration tests for all LangChain tools.
|
||||
|
||||
Each tool is invoked through its public LangChain interface (``tool.invoke``)
|
||||
so that argument validation, logging, and output formatting are all exercised
|
||||
exactly as they would be when called by an LLM agent.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
from docker_agent_sandbox import (
|
||||
DockerSandbox,
|
||||
make_bash_tool,
|
||||
make_copy_file_tool,
|
||||
make_delete_file_tool,
|
||||
make_edit_file_tool,
|
||||
make_file_ops_tools,
|
||||
make_grep_tool,
|
||||
make_list_dir_tool,
|
||||
make_make_dir_tool,
|
||||
make_move_file_tool,
|
||||
make_read_file_tool,
|
||||
make_search_files_tool,
|
||||
make_write_file_tool,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# bash
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_bash_success(sandbox: DockerSandbox):
    """A successful command is reported with an ``EXIT:0`` prefix plus its output."""
    bash = make_bash_tool(sandbox)
    reply = bash.invoke({"command": "echo hello"})
    assert reply.startswith("EXIT:0")
    assert "hello" in reply
|
||||
|
||||
|
||||
def test_bash_nonzero_exit(sandbox: DockerSandbox):
    """The tool output starts with the command's non-zero exit code."""
    bash = make_bash_tool(sandbox)
    reply = bash.invoke({"command": "exit 3"})
    assert reply.startswith("EXIT:3")
|
||||
|
||||
|
||||
def test_bash_stderr_included(sandbox: DockerSandbox):
    """Text written to stderr appears in the tool output."""
    bash = make_bash_tool(sandbox)
    reply = bash.invoke({"command": "echo err >&2"})
    assert "EXIT:0" in reply
    assert "err" in reply
|
||||
|
||||
|
||||
def test_bash_large_output_truncated(sandbox: DockerSandbox):
    """Output of 300 lines is returned with the truncation marker."""
    bash = make_bash_tool(sandbox)
    # Prints 300 lines, which is expected to exceed the tool's line cap.
    command = "python3 -c \"print('\\n'.join(['x'] * 300))\""
    reply = bash.invoke({"command": command})
    assert "[output truncated]" in reply
|
||||
|
||||
|
||||
def test_bash_timeout(sandbox: DockerSandbox):
    """A command outliving ``timeout`` is reported as EXIT:124 with 'timed out'."""
    bash = make_bash_tool(sandbox)
    arguments = {"command": "sleep 60", "timeout": 2}
    reply = bash.invoke(arguments)
    assert "EXIT:124" in reply
    assert "timed out" in reply
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# write_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_write_file_ok(sandbox: DockerSandbox, workdir: str):
    """A successful write is acknowledged with ``[OK]`` and mentions bytes."""
    writer = make_write_file_tool(sandbox)
    arguments = {"path": f"{workdir}/new.txt", "content": "data"}
    reply = writer.invoke(arguments)
    assert reply.startswith("[OK]")
    assert "bytes" in reply
|
||||
|
||||
|
||||
def test_write_file_reports_byte_count(sandbox: DockerSandbox, workdir: str):
    """The reply contains the encoded length of the written content."""
    writer = make_write_file_tool(sandbox)
    payload = "hello"
    reply = writer.invoke({"path": f"{workdir}/bytes.txt", "content": payload})
    expected_size = len(payload.encode())
    assert str(expected_size) in reply
|
||||
|
||||
|
||||
def test_write_file_creates_parent_dirs(sandbox: DockerSandbox, workdir: str):
    """Writing below not-yet-existing directories succeeds and creates the file."""
    writer = make_write_file_tool(sandbox)
    nested_target = f"{workdir}/a/b/c/file.txt"
    reply = writer.invoke({"path": nested_target, "content": "nested"})
    assert reply.startswith("[OK]")
    # Confirm inside the container that a regular file now exists there.
    exit_code, _output = sandbox.exec(f"test -f {nested_target}")
    assert exit_code == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# read_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_read_file_full(sandbox: DockerSandbox, workdir: str):
    """Reading without a range returns every line of the file."""
    target = f"{workdir}/r.txt"
    sandbox.write_file(target, "line1\nline2\nline3\n")
    reader = make_read_file_tool(sandbox)
    reply = reader.invoke({"path": target})
    assert "line1" in reply
    assert "line2" in reply
    assert "line3" in reply
|
||||
|
||||
|
||||
def test_read_file_pagination(sandbox: DockerSandbox, workdir: str):
    """Requesting lines 3-5 of a ten-line file excludes lines outside that range."""
    target = f"{workdir}/paged.txt"
    body = "".join(f"line{number}\n" for number in range(1, 11))
    sandbox.write_file(target, body)
    reader = make_read_file_tool(sandbox)
    arguments = {"path": target, "start_line": 3, "end_line": 5}
    reply = reader.invoke(arguments)
    assert "line3" in reply
    assert "line5" in reply
    assert "line1" not in reply
    assert "line6" not in reply
|
||||
|
||||
|
||||
def test_read_file_shows_total_line_count(sandbox: DockerSandbox, workdir: str):
    """A partial read of a 20-line file still mentions "20" in the reply."""
    target = f"{workdir}/info.txt"
    body = "".join(f"line{number}\n" for number in range(1, 21))
    sandbox.write_file(target, body)
    reader = make_read_file_tool(sandbox)
    arguments = {"path": target, "start_line": 1, "end_line": 5}
    reply = reader.invoke(arguments)
    # Only lines 1-5 were requested; the total is expected to be reported too.
    assert "20" in reply
|
||||
|
||||
|
||||
def test_read_file_missing_returns_error(sandbox: DockerSandbox, workdir: str):
    """Reading a nonexistent file yields an ``[ERROR`` reply instead of raising."""
    reader = make_read_file_tool(sandbox)
    missing = f"{workdir}/does_not_exist.txt"
    reply = reader.invoke({"path": missing})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
def test_read_file_directory_returns_error(sandbox: DockerSandbox, workdir: str):
    """Pointing the read tool at a directory yields an ``[ERROR`` reply."""
    directory = f"{workdir}/adir"
    sandbox.exec(f"mkdir -p {directory}")
    reader = make_read_file_tool(sandbox)
    reply = reader.invoke({"path": directory})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# edit_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_edit_file_basic_replace(sandbox: DockerSandbox, workdir: str):
    """A unique ``old_str`` is replaced by ``new_str`` in the file."""
    target = f"{workdir}/edit.txt"
    sandbox.write_file(target, "foo bar baz\n")
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": target, "old_str": "bar", "new_str": "qux"}
    reply = editor.invoke(arguments)
    assert reply.startswith("[OK]")
    assert sandbox.read_file(target) == b"foo qux baz\n"
|
||||
|
||||
|
||||
def test_edit_file_old_str_not_found(sandbox: DockerSandbox, workdir: str):
    """An ``old_str`` absent from the file yields an ``[ERROR]`` 'not found' reply."""
    target = f"{workdir}/nf.txt"
    sandbox.write_file(target, "hello\n")
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": target, "old_str": "missing", "new_str": "x"}
    reply = editor.invoke(arguments)
    assert reply.startswith("[ERROR]")
    assert "not found" in reply
|
||||
|
||||
|
||||
def test_edit_file_ambiguous_old_str(sandbox: DockerSandbox, workdir: str):
    """An ``old_str`` occurring twice is rejected and the count is reported."""
    target = f"{workdir}/amb.txt"
    sandbox.write_file(target, "foo\nfoo\n")
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": target, "old_str": "foo", "new_str": "bar"}
    reply = editor.invoke(arguments)
    assert reply.startswith("[ERROR]")
    assert "2 times" in reply
|
||||
|
||||
|
||||
def test_edit_file_delete_block(sandbox: DockerSandbox, workdir: str):
    """Replacing a line with the empty string removes it and keeps the rest."""
    target = f"{workdir}/del.txt"
    sandbox.write_file(target, "keep\nremove me\nalso keep\n")
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": target, "old_str": "remove me\n", "new_str": ""}
    reply = editor.invoke(arguments)
    assert reply.startswith("[OK]")
    remaining = sandbox.read_file(target).decode()
    assert "remove me" not in remaining
    assert "keep" in remaining
|
||||
|
||||
|
||||
def test_edit_file_missing_file_returns_error(sandbox: DockerSandbox, workdir: str):
    """Editing a nonexistent file yields an ``[ERROR`` reply."""
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": f"{workdir}/ghost.txt", "old_str": "x", "new_str": "y"}
    reply = editor.invoke(arguments)
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
def test_edit_file_multiline_replace(sandbox: DockerSandbox, workdir: str):
    """An ``old_str`` spanning two lines is replaced as a single unit."""
    target = f"{workdir}/multi.txt"
    sandbox.write_file(target, "line1\nline2\nline3\n")
    editor = make_edit_file_tool(sandbox)
    arguments = {
        "path": target,
        "old_str": "line1\nline2\n",
        "new_str": "replaced\n",
    }
    reply = editor.invoke(arguments)
    assert reply.startswith("[OK]")
    assert sandbox.read_file(target) == b"replaced\nline3\n"
|
||||
|
||||
|
||||
def test_edit_file_over_size_limit_returns_error(sandbox: DockerSandbox, workdir: str):
    """Editing a file of 1,000,001 bytes is refused with an ``[ERROR]`` reply."""
    target = f"{workdir}/big.txt"
    # One byte more than one million, expected to exceed the edit tool's limit.
    oversized = "x" * 1_000_001
    sandbox.write_file(target, oversized)
    editor = make_edit_file_tool(sandbox)
    arguments = {"path": target, "old_str": "x", "new_str": "y"}
    reply = editor.invoke(arguments)
    assert reply.startswith("[ERROR]")
    assert "bytes" in reply
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# list_dir
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_list_dir_shows_files(sandbox: DockerSandbox, workdir: str):
    """Both files created in the directory appear in the listing."""
    sandbox.write_file(f"{workdir}/a.txt", "a")
    sandbox.write_file(f"{workdir}/b.txt", "b")
    lister = make_list_dir_tool(sandbox)
    listing = lister.invoke({"path": workdir})
    assert "a.txt" in listing
    assert "b.txt" in listing
|
||||
|
||||
|
||||
def test_list_dir_missing_path_returns_error(sandbox: DockerSandbox, workdir: str):
    """Listing a nonexistent directory yields an ``[ERROR`` reply."""
    lister = make_list_dir_tool(sandbox)
    missing = f"{workdir}/nonexistent"
    reply = lister.invoke({"path": missing})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
def test_list_dir_default_path(sandbox: DockerSandbox):
    """Invoking the tool with no arguments does not produce an error reply."""
    lister = make_list_dir_tool(sandbox)
    # No "path" key: the tool falls back to its default path.
    reply = lister.invoke({})
    assert "[ERROR" not in reply
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# delete_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_delete_file_removes_file(sandbox: DockerSandbox, workdir: str):
    """After a successful delete the file no longer exists in the container."""
    target = f"{workdir}/to_delete.txt"
    sandbox.write_file(target, "bye")
    remover = make_delete_file_tool(sandbox)
    reply = remover.invoke({"path": target})
    assert reply.startswith("[OK]")
    # ``test -f`` exits non-zero once the file is gone.
    exit_code, _output = sandbox.exec(f"test -f {target}")
    assert exit_code != 0
|
||||
|
||||
|
||||
def test_delete_file_missing_returns_error(sandbox: DockerSandbox, workdir: str):
    """Deleting a nonexistent path yields an ``[ERROR`` reply."""
    remover = make_delete_file_tool(sandbox)
    missing = f"{workdir}/ghost.txt"
    reply = remover.invoke({"path": missing})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
def test_delete_file_nonempty_dir_returns_error(sandbox: DockerSandbox, workdir: str):
    """Deleting a directory that still contains a file yields an ``[ERROR`` reply."""
    directory = f"{workdir}/nonempty"
    sandbox.exec(f"mkdir -p {directory}")
    sandbox.write_file(f"{directory}/file.txt", "x")
    remover = make_delete_file_tool(sandbox)
    reply = remover.invoke({"path": directory})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
def test_delete_empty_directory(sandbox: DockerSandbox, workdir: str):
    """Deleting an empty directory is acknowledged with ``[OK]``."""
    directory = f"{workdir}/emptydir"
    sandbox.exec(f"mkdir -p {directory}")
    remover = make_delete_file_tool(sandbox)
    reply = remover.invoke({"path": directory})
    assert reply.startswith("[OK]")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# move_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_move_file_renames_file(sandbox: DockerSandbox, workdir: str):
    """Moving puts the content at the destination and removes the source."""
    source = f"{workdir}/src.txt"
    destination = f"{workdir}/dst.txt"
    sandbox.write_file(source, "move me")
    mover = make_move_file_tool(sandbox)
    reply = mover.invoke({"src": source, "dst": destination})
    assert reply.startswith("[OK]")
    assert sandbox.read_file(destination) == b"move me"
    # The source must be gone: ``test -f`` exits non-zero.
    exit_code, _output = sandbox.exec(f"test -f {source}")
    assert exit_code != 0
|
||||
|
||||
|
||||
def test_move_file_creates_parent_dirs(sandbox: DockerSandbox, workdir: str):
    """Moving into directories that do not exist yet succeeds."""
    source = f"{workdir}/mv_src.txt"
    destination = f"{workdir}/new/nested/dst.txt"
    sandbox.write_file(source, "data")
    mover = make_move_file_tool(sandbox)
    reply = mover.invoke({"src": source, "dst": destination})
    assert reply.startswith("[OK]")
    assert sandbox.read_file(destination) == b"data"
|
||||
|
||||
|
||||
def test_move_file_missing_src_returns_error(sandbox: DockerSandbox, workdir: str):
    """Moving a nonexistent source yields an ``[ERROR`` reply."""
    mover = make_move_file_tool(sandbox)
    arguments = {"src": f"{workdir}/nope.txt", "dst": f"{workdir}/out.txt"}
    reply = mover.invoke(arguments)
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# copy_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_copy_file_duplicates_file(sandbox: DockerSandbox, workdir: str):
    """Copying creates the destination and leaves the source untouched."""
    source = f"{workdir}/orig.txt"
    destination = f"{workdir}/copy.txt"
    sandbox.write_file(source, "original")
    copier = make_copy_file_tool(sandbox)
    reply = copier.invoke({"src": source, "dst": destination})
    assert reply.startswith("[OK]")
    assert sandbox.read_file(destination) == b"original"
    # Unlike a move, the source file is still readable afterwards.
    assert sandbox.read_file(source) == b"original"
|
||||
|
||||
|
||||
def test_copy_file_creates_parent_dirs(sandbox: DockerSandbox, workdir: str):
    """Copying into directories that do not exist yet succeeds."""
    source = f"{workdir}/cp_src.txt"
    destination = f"{workdir}/deep/copy/file.txt"
    sandbox.write_file(source, "copied")
    copier = make_copy_file_tool(sandbox)
    reply = copier.invoke({"src": source, "dst": destination})
    assert reply.startswith("[OK]")
    assert sandbox.read_file(destination) == b"copied"
|
||||
|
||||
|
||||
def test_copy_file_missing_src_returns_error(sandbox: DockerSandbox, workdir: str):
    """Copying a nonexistent source yields an ``[ERROR`` reply."""
    copier = make_copy_file_tool(sandbox)
    arguments = {"src": f"{workdir}/ghost.txt", "dst": f"{workdir}/out.txt"}
    reply = copier.invoke(arguments)
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# make_dir
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_make_dir_creates_directory(sandbox: DockerSandbox, workdir: str):
    """The tool reports ``[OK]`` and the directory exists afterwards."""
    directory = f"{workdir}/brand_new"
    maker = make_make_dir_tool(sandbox)
    reply = maker.invoke({"path": directory})
    assert reply.startswith("[OK]")
    # ``test -d`` exits 0 only when the path is a directory.
    exit_code, _output = sandbox.exec(f"test -d {directory}")
    assert exit_code == 0
|
||||
|
||||
|
||||
def test_make_dir_idempotent(sandbox: DockerSandbox, workdir: str):
    """Creating a directory that already exists still reports ``[OK]``."""
    directory = f"{workdir}/existing_dir"
    sandbox.exec(f"mkdir -p {directory}")
    maker = make_make_dir_tool(sandbox)
    reply = maker.invoke({"path": directory})
    assert reply.startswith("[OK]")
|
||||
|
||||
|
||||
def test_make_dir_nested(sandbox: DockerSandbox, workdir: str):
    """A multi-level path is created in a single call."""
    directory = f"{workdir}/a/b/c/d"
    maker = make_make_dir_tool(sandbox)
    reply = maker.invoke({"path": directory})
    assert reply.startswith("[OK]")
    exit_code, _output = sandbox.exec(f"test -d {directory}")
    assert exit_code == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# search_files
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_search_files_finds_match(sandbox: DockerSandbox, workdir: str):
    """A ``*.py`` pattern lists the .py file and omits the .txt file."""
    sandbox.write_file(f"{workdir}/target.py", "# python file")
    sandbox.write_file(f"{workdir}/other.txt", "text file")
    finder = make_search_files_tool(sandbox)
    arguments = {"pattern": "*.py", "directory": workdir}
    reply = finder.invoke(arguments)
    assert "target.py" in reply
    assert "other.txt" not in reply
|
||||
|
||||
|
||||
def test_search_files_no_matches(sandbox: DockerSandbox, workdir: str):
    """A pattern matching nothing returns exactly the no-matches marker."""
    finder = make_search_files_tool(sandbox)
    arguments = {"pattern": "*.nonexistent", "directory": workdir}
    reply = finder.invoke(arguments)
    assert reply == "[no matches found]"
|
||||
|
||||
|
||||
def test_search_files_nested(sandbox: DockerSandbox, workdir: str):
    """Files inside subdirectories of the search directory are found."""
    sandbox.write_file(f"{workdir}/sub/deep.txt", "content")
    finder = make_search_files_tool(sandbox)
    arguments = {"pattern": "*.txt", "directory": workdir}
    reply = finder.invoke(arguments)
    assert "deep.txt" in reply
|
||||
|
||||
|
||||
def test_search_files_by_exact_name(sandbox: DockerSandbox, workdir: str):
    """A pattern without wildcards matches the file with exactly that name."""
    sandbox.write_file(f"{workdir}/exact_name.txt", "x")
    finder = make_search_files_tool(sandbox)
    arguments = {"pattern": "exact_name.txt", "directory": workdir}
    reply = finder.invoke(arguments)
    assert "exact_name.txt" in reply
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# grep
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_grep_finds_pattern(sandbox: DockerSandbox, workdir: str):
    """The line containing the pattern is part of the grep output."""
    target = f"{workdir}/grep_me.txt"
    sandbox.write_file(target, "line one\nline two\nline three\n")
    grep = make_grep_tool(sandbox)
    reply = grep.invoke({"pattern": "two", "path": target})
    assert "line two" in reply
|
||||
|
||||
|
||||
def test_grep_no_matches(sandbox: DockerSandbox, workdir: str):
    """A pattern absent from the file returns exactly the no-matches marker."""
    target = f"{workdir}/grep_empty.txt"
    sandbox.write_file(target, "no match here\n")
    grep = make_grep_tool(sandbox)
    reply = grep.invoke({"pattern": "zzznomatch", "path": target})
    assert reply == "[no matches found]"
|
||||
|
||||
|
||||
def test_grep_includes_line_numbers(sandbox: DockerSandbox, workdir: str):
    """The grep output carries the line number (2) on the line showing the match.

    A bare ``"2" in result`` check passes on any stray "2" in the output, for
    example inside the randomly named ``workdir`` path if the tool echoes it.
    This test instead requires a standalone ``2`` (not part of a longer
    alphanumeric token) on the same output line as the matched text.
    """
    import re  # local import: only this test needs it

    path = f"{workdir}/ln.txt"
    sandbox.write_file(path, "alpha\nbeta\ngamma\n")
    tool = make_grep_tool(sandbox)
    result = tool.invoke({"pattern": "beta", "path": path})
    matching_lines = [line for line in result.splitlines() if "beta" in line]
    # The match itself must be reported before its line number can be checked.
    assert matching_lines, result
    standalone_two = re.compile(r"(?<![0-9A-Za-z])2(?![0-9A-Za-z])")
    assert any(standalone_two.search(line) for line in matching_lines), result
|
||||
|
||||
|
||||
def test_grep_recursive(sandbox: DockerSandbox, workdir: str):
    """A recursive grep over a directory reports the match and its file name."""
    search_root = f"{workdir}/d"
    sandbox.write_file(f"{search_root}/a.txt", "find_me\n")
    sandbox.write_file(f"{search_root}/b.txt", "not here\n")
    grep = make_grep_tool(sandbox)
    arguments = {"pattern": "find_me", "path": search_root, "recursive": True}
    reply = grep.invoke(arguments)
    assert "find_me" in reply
    assert "a.txt" in reply
|
||||
|
||||
|
||||
def test_grep_extended_regex(sandbox: DockerSandbox, workdir: str):
    """The ``+`` quantifier works: only the line matching ``foo[0-9]+`` is returned."""
    target = f"{workdir}/regex.txt"
    sandbox.write_file(target, "foo123\nbar456\nbaz\n")
    grep = make_grep_tool(sandbox)
    reply = grep.invoke({"pattern": "foo[0-9]+", "path": target})
    assert "foo123" in reply
    assert "bar456" not in reply
|
||||
|
||||
|
||||
def test_grep_missing_file_returns_error(sandbox: DockerSandbox, workdir: str):
    """Grepping a nonexistent file yields an ``[ERROR`` reply."""
    grep = make_grep_tool(sandbox)
    missing = f"{workdir}/no_file.txt"
    reply = grep.invoke({"pattern": "x", "path": missing})
    assert reply.startswith("[ERROR")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# make_file_ops_tools assembly
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_make_file_ops_tools_returns_ten_tools(sandbox: DockerSandbox):
    """The assembled tool collection contains exactly ten entries."""
    toolset = make_file_ops_tools(sandbox)
    tool_count = len(toolset)
    assert tool_count == 10
|
||||
|
||||
|
||||
def test_make_file_ops_tools_all_are_base_tools(sandbox: DockerSandbox):
    """Every assembled tool is a LangChain ``BaseTool`` instance."""
    toolset = make_file_ops_tools(sandbox)
    assert all(isinstance(candidate, BaseTool) for candidate in toolset)
|
||||
|
||||
|
||||
def test_make_file_ops_tools_expected_names(sandbox: DockerSandbox):
    """The set of tool names matches the ten expected names exactly."""
    expected_names = {
        "read_file",
        "write_file",
        "edit_file",
        "list_dir",
        "delete_file",
        "move_file",
        "copy_file",
        "make_dir",
        "search_files",
        "grep",
    }
    actual_names = set()
    for candidate in make_file_ops_tools(sandbox):
        actual_names.add(candidate.name)
    assert actual_names == expected_names
|
||||
@@ -0,0 +1,99 @@
|
||||
"""Unit tests for docker_agent_sandbox.tools._utils — no Docker required."""
|
||||
|
||||
import pytest
|
||||
|
||||
from docker_agent_sandbox.tools._utils import (
|
||||
_MAX_OUTPUT_CHARS,
|
||||
_MAX_OUTPUT_LINES,
|
||||
_TRUNCATION_NOTICE,
|
||||
_parent,
|
||||
truncate_output,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# truncate_output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_truncate_output_short_string_unchanged():
    """A short string is returned unmodified."""
    short_text = "hello world"
    assert truncate_output(short_text) == "hello world"
|
||||
|
||||
|
||||
def test_truncate_output_empty_string():
    """The empty string is returned as the empty string."""
    returned = truncate_output("")
    assert returned == ""
|
||||
|
||||
|
||||
def test_truncate_output_exactly_at_line_limit():
    """Input with exactly ``_MAX_OUTPUT_LINES`` lines passes through untouched."""
    at_limit = "".join(["line\n"] * _MAX_OUTPUT_LINES)
    returned = truncate_output(at_limit)
    assert returned == at_limit
|
||||
|
||||
|
||||
def test_truncate_output_one_over_line_limit():
    """One line beyond the limit triggers truncation to ``_MAX_OUTPUT_LINES`` lines."""
    line_total = _MAX_OUTPUT_LINES + 1
    over_limit = "line\n" * line_total
    truncated = truncate_output(over_limit)
    assert _TRUNCATION_NOTICE in truncated
    # Only the first _MAX_OUTPUT_LINES copies of "line\n" may survive.
    kept_lines = truncated.count("line\n")
    assert kept_lines == _MAX_OUTPUT_LINES
|
||||
|
||||
|
||||
def test_truncate_output_line_limit_keeps_first_200():
    """For 250 input lines the result starts with the first ``_MAX_OUTPUT_LINES`` lines."""
    over_limit = "line\n" * 250
    truncated = truncate_output(over_limit)
    kept_prefix = "line\n" * _MAX_OUTPUT_LINES
    assert truncated.startswith(kept_prefix)
    assert _TRUNCATION_NOTICE in truncated
|
||||
|
||||
|
||||
def test_truncate_output_exactly_at_char_limit():
    """A single line of exactly ``_MAX_OUTPUT_CHARS`` characters passes through untouched."""
    at_limit = "x" * _MAX_OUTPUT_CHARS
    returned = truncate_output(at_limit)
    assert returned == at_limit
|
||||
|
||||
|
||||
def test_truncate_output_one_over_char_limit():
    """One character beyond the limit is cut off and the notice is appended."""
    over_limit = "x" * (_MAX_OUTPUT_CHARS + 1)
    truncated = truncate_output(over_limit)
    assert _TRUNCATION_NOTICE in truncated
    # Exactly _MAX_OUTPUT_CHARS x's are kept; the next character is not an x.
    kept_prefix = "x" * _MAX_OUTPUT_CHARS
    assert truncated.startswith(kept_prefix)
    assert truncated[_MAX_OUTPUT_CHARS] != "x"
|
||||
|
||||
|
||||
def test_truncate_output_char_limit_takes_first_20000():
    """For 25,000 characters the result starts with the first ``_MAX_OUTPUT_CHARS``."""
    over_limit = "x" * 25_000
    truncated = truncate_output(over_limit)
    kept_prefix = "x" * _MAX_OUTPUT_CHARS
    assert truncated.startswith(kept_prefix)
    assert _TRUNCATION_NOTICE in truncated
|
||||
|
||||
|
||||
def test_truncate_output_line_limit_checked_before_char_limit():
    """Input built to exceed both limits is truncated and comes back shorter.

    The input is 201 lines of 201 characters each (200 x's plus a newline),
    40,401 characters in total. This test does not pin down which limit is
    applied, or whether both are; it only requires the truncation notice to
    be present and the result to be shorter than the input.
    """
    single_line = "x" * 200 + "\n"
    over_both_limits = single_line * 201
    truncated = truncate_output(over_both_limits)
    assert _TRUNCATION_NOTICE in truncated
    assert len(truncated) < len(over_both_limits)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _parent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "path, expected",
    [
        ("/foo/bar/baz.txt", "/foo/bar"),
        ("/foo/bar/baz/", "/foo/bar"),  # a trailing slash does not change the parent
        ("/foo/bar", "/foo"),
        ("/foo", "/"),
        ("foo/bar/baz", "foo/bar"),
        ("foo/bar", "foo"),
        ("foo", "."),
        ("", "."),
    ],
)
def test_parent(path, expected):
    """_parent() returns ``expected`` for each path in the table above."""
    computed = _parent(path)
    assert computed == expected
|
||||
Reference in New Issue
Block a user