142 lines
5.3 KiB
Python
142 lines
5.3 KiB
Python
"""MCPToolbox: main entry point for the library."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .config import ServerConfig, from_dict, load_claude_json
|
|
from .session import ServerSession
|
|
from .tool import MCPTool
|
|
|
|
|
|
class _ServerNamespace:
|
|
"""Attribute-style access to tools belonging to one server.
|
|
|
|
Usage: ``toolbox.pin.list_plugins(output="/tmp/out.txt")``
|
|
"""
|
|
|
|
def __init__(self, session: ServerSession, all_tools: dict[str, MCPTool]) -> None:
|
|
self._session = session
|
|
self._tools = {
|
|
name: tool for name, tool in all_tools.items() if tool._session is session
|
|
}
|
|
|
|
def __getattr__(self, name: str) -> MCPTool:
|
|
if name in self._tools:
|
|
return self._tools[name]
|
|
raise AttributeError(name)
|
|
|
|
def __dir__(self) -> list[str]:
|
|
return list(self._tools)
|
|
|
|
def __repr__(self) -> str:
|
|
return f"_ServerNamespace(tools={list(self._tools)})"
|
|
|
|
|
|
class MCPToolbox:
|
|
"""Connects to one or more MCP servers and exposes their tools."""
|
|
|
|
def __init__(self, configs: dict[str, ServerConfig]) -> None:
|
|
self._configs = configs
|
|
self._sessions: dict[str, ServerSession] = {}
|
|
self._tools: dict[str, MCPTool] = {}
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Constructors #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
@classmethod
|
|
def from_claude_json(cls, path: Path | None = None) -> MCPToolbox:
|
|
"""Load server config from ~/.claude.json (or a custom path)."""
|
|
return cls(load_claude_json(path))
|
|
|
|
@classmethod
|
|
def from_config(cls, servers: dict) -> MCPToolbox:
|
|
"""Load server config from a plain dict (claude.json mcpServers format)."""
|
|
return cls(from_dict(servers))
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Async context manager #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def __aenter__(self) -> MCPToolbox:
|
|
await self.discover()
|
|
return self
|
|
|
|
async def __aexit__(self, *_: Any) -> None:
|
|
await asyncio.gather(
|
|
*(s.disconnect() for s in self._sessions.values()),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Discovery #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def discover(self) -> None:
|
|
"""Connect to all servers in parallel and discover their tools."""
|
|
await asyncio.gather(*(self._connect_server(cfg) for cfg in self._configs.values()))
|
|
|
|
async def _connect_server(self, cfg: ServerConfig) -> None:
|
|
session = ServerSession(cfg)
|
|
await session.connect()
|
|
self._sessions[cfg.name] = session
|
|
for mcp_tool in await session.list_tools():
|
|
tool = MCPTool(
|
|
name=mcp_tool.name,
|
|
description=mcp_tool.description or "",
|
|
input_schema=mcp_tool.inputSchema if isinstance(mcp_tool.inputSchema, dict)
|
|
else mcp_tool.inputSchema.model_dump(),
|
|
session=session,
|
|
)
|
|
self._tools[tool.name] = tool
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Tool access #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
@property
|
|
def tools(self) -> list[MCPTool]:
|
|
return list(self._tools.values())
|
|
|
|
def __getattr__(self, name: str) -> _ServerNamespace | MCPTool:
|
|
# toolbox.pin → ServerNamespace for that server
|
|
if name in self._sessions:
|
|
return _ServerNamespace(self._sessions[name], self._tools)
|
|
# toolbox.list_plugins → MCPTool directly (flat access)
|
|
if name in self._tools:
|
|
return self._tools[name]
|
|
raise AttributeError(name)
|
|
|
|
def __getitem__(self, name: str) -> MCPTool:
|
|
return self._tools[name]
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# LLM integration helpers #
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def as_anthropic_tools(self) -> list[dict]:
|
|
"""Return all tools in Anthropic API format."""
|
|
return [t.as_anthropic_tool() for t in self._tools.values()]
|
|
|
|
async def call_tool(self, name: str, arguments: dict | None = None) -> str:
|
|
"""Call a tool by name and return its text output."""
|
|
return await self._tools[name]._session.call_tool(name, arguments or {})
|
|
|
|
async def handle_anthropic_tool_use(self, tool_use_block: Any) -> dict:
|
|
"""Dispatch an Anthropic ToolUseBlock and return a tool_result dict."""
|
|
try:
|
|
content = await self.call_tool(tool_use_block.name, tool_use_block.input)
|
|
is_error = False
|
|
except Exception as exc:
|
|
content = str(exc)
|
|
is_error = True
|
|
return {
|
|
"type": "tool_result",
|
|
"tool_use_id": tool_use_block.id,
|
|
"content": content,
|
|
"is_error": is_error,
|
|
}
|