feat: Initial commit
This commit is contained in:
141
src/mcp_api/toolbox.py
Normal file
141
src/mcp_api/toolbox.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""MCPToolbox: main entry point for the library."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .config import ServerConfig, from_dict, load_claude_json
|
||||
from .session import ServerSession
|
||||
from .tool import MCPTool
|
||||
|
||||
|
||||
class _ServerNamespace:
|
||||
"""Attribute-style access to tools belonging to one server.
|
||||
|
||||
Usage: ``toolbox.pin.list_plugins(output="/tmp/out.txt")``
|
||||
"""
|
||||
|
||||
def __init__(self, session: ServerSession, all_tools: dict[str, MCPTool]) -> None:
|
||||
self._session = session
|
||||
self._tools = {
|
||||
name: tool for name, tool in all_tools.items() if tool._session is session
|
||||
}
|
||||
|
||||
def __getattr__(self, name: str) -> MCPTool:
|
||||
if name in self._tools:
|
||||
return self._tools[name]
|
||||
raise AttributeError(name)
|
||||
|
||||
def __dir__(self) -> list[str]:
|
||||
return list(self._tools)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"_ServerNamespace(tools={list(self._tools)})"
|
||||
|
||||
|
||||
class MCPToolbox:
|
||||
"""Connects to one or more MCP servers and exposes their tools."""
|
||||
|
||||
def __init__(self, configs: dict[str, ServerConfig]) -> None:
|
||||
self._configs = configs
|
||||
self._sessions: dict[str, ServerSession] = {}
|
||||
self._tools: dict[str, MCPTool] = {}
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Constructors #
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
@classmethod
|
||||
def from_claude_json(cls, path: Path | None = None) -> MCPToolbox:
|
||||
"""Load server config from ~/.claude.json (or a custom path)."""
|
||||
return cls(load_claude_json(path))
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, servers: dict) -> MCPToolbox:
|
||||
"""Load server config from a plain dict (claude.json mcpServers format)."""
|
||||
return cls(from_dict(servers))
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Async context manager #
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
async def __aenter__(self) -> MCPToolbox:
|
||||
await self.discover()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_: Any) -> None:
|
||||
await asyncio.gather(
|
||||
*(s.disconnect() for s in self._sessions.values()),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Discovery #
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
async def discover(self) -> None:
|
||||
"""Connect to all servers in parallel and discover their tools."""
|
||||
await asyncio.gather(*(self._connect_server(cfg) for cfg in self._configs.values()))
|
||||
|
||||
async def _connect_server(self, cfg: ServerConfig) -> None:
|
||||
session = ServerSession(cfg)
|
||||
await session.connect()
|
||||
self._sessions[cfg.name] = session
|
||||
for mcp_tool in await session.list_tools():
|
||||
tool = MCPTool(
|
||||
name=mcp_tool.name,
|
||||
description=mcp_tool.description or "",
|
||||
input_schema=mcp_tool.inputSchema if isinstance(mcp_tool.inputSchema, dict)
|
||||
else mcp_tool.inputSchema.model_dump(),
|
||||
session=session,
|
||||
)
|
||||
self._tools[tool.name] = tool
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Tool access #
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
@property
|
||||
def tools(self) -> list[MCPTool]:
|
||||
return list(self._tools.values())
|
||||
|
||||
def __getattr__(self, name: str) -> _ServerNamespace | MCPTool:
|
||||
# toolbox.pin → ServerNamespace for that server
|
||||
if name in self._sessions:
|
||||
return _ServerNamespace(self._sessions[name], self._tools)
|
||||
# toolbox.list_plugins → MCPTool directly (flat access)
|
||||
if name in self._tools:
|
||||
return self._tools[name]
|
||||
raise AttributeError(name)
|
||||
|
||||
def __getitem__(self, name: str) -> MCPTool:
|
||||
return self._tools[name]
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# LLM integration helpers #
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def as_anthropic_tools(self) -> list[dict]:
|
||||
"""Return all tools in Anthropic API format."""
|
||||
return [t.as_anthropic_tool() for t in self._tools.values()]
|
||||
|
||||
async def call_tool(self, name: str, arguments: dict | None = None) -> str:
|
||||
"""Call a tool by name and return its text output."""
|
||||
return await self._tools[name]._session.call_tool(name, arguments or {})
|
||||
|
||||
async def handle_anthropic_tool_use(self, tool_use_block: Any) -> dict:
|
||||
"""Dispatch an Anthropic ToolUseBlock and return a tool_result dict."""
|
||||
try:
|
||||
content = await self.call_tool(tool_use_block.name, tool_use_block.input)
|
||||
is_error = False
|
||||
except Exception as exc:
|
||||
content = str(exc)
|
||||
is_error = True
|
||||
return {
|
||||
"type": "tool_result",
|
||||
"tool_use_id": tool_use_block.id,
|
||||
"content": content,
|
||||
"is_error": is_error,
|
||||
}
|
||||
Reference in New Issue
Block a user