Implementation:Infiniflow Ragflow MCPToolCallSession
| Knowledge Sources | |
|---|---|
| Domains | Agent_System, MCP |
| Last Updated | 2026-02-12 06:00 GMT |
Overview
Concrete tool for managing MCP (Model Context Protocol) tool calling sessions with async/sync transport support provided by the RAGFlow agent system.
Description
The MCPToolCallSession class manages connections to MCP servers via SSE or Streamable HTTP transports. It runs an internal asyncio event loop in a dedicated thread to handle asynchronous MCP communication while exposing a synchronous public API. The class supports querying available tools, executing tool calls with timeouts, and graceful shutdown of sessions.
Usage
Import this class when implementing agent workflow components that need to call external tools via the Model Context Protocol. It is used by the agent canvas system to dispatch tool calls to registered MCP servers.
Code Reference
Source Location
- Repository: Infiniflow_Ragflow
- File: common/mcp_tool_call_conn.py
- Lines: 1-333
Signature
class MCPToolCallSession:
def __init__(self, mcp_server: dict, server_variables: dict = None, custom_header: dict = None):
"""Initialize session with MCP server configuration."""
def get_tools(self, timeout: int = 10) -> list:
"""Get available tools from the MCP server."""
def tool_call(self, name: str, arguments: dict, timeout: int = 10) -> str:
"""Execute a tool call on the MCP server."""
def close_sync(self, timeout: int = 5) -> None:
"""Synchronously close the session and stop the event loop."""
def close_multiple_mcp_toolcall_sessions(sessions: list) -> None:
"""Close multiple MCP sessions concurrently."""
def shutdown_all_mcp_sessions() -> None:
"""Shutdown all active MCP sessions globally."""
def mcp_tool_metadata_to_openai_tool(mcp_tool: dict) -> dict:
"""Convert MCP tool metadata to OpenAI function calling format."""
Import
from common.mcp_tool_call_conn import MCPToolCallSession, mcp_tool_metadata_to_openai_tool
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| mcp_server | dict | Yes | Server config with url, type (sse/streamablehttp), etc. |
| server_variables | dict | No | Variable substitutions for server URL |
| custom_header | dict | No | Custom HTTP headers for authentication |
| name | str | Yes | Tool name to invoke (for tool_call) |
| arguments | dict | Yes | Tool arguments (for tool_call) |
| timeout | int | No | Request timeout in seconds (default: 10) |
Outputs
| Name | Type | Description |
|---|---|---|
| get_tools() returns | list | List of MCP tool metadata dictionaries |
| tool_call() returns | str | Serialized tool response string |
| mcp_tool_metadata_to_openai_tool() returns | dict | OpenAI-compatible function definition |
Usage Examples
from common.mcp_tool_call_conn import MCPToolCallSession, mcp_tool_metadata_to_openai_tool
# Initialize session with an MCP server
server_config = {
"url": "http://localhost:8080/mcp",
"type": "streamablehttp",
}
session = MCPToolCallSession(server_config)
# List available tools
tools = session.get_tools(timeout=10)
for tool in tools:
print(tool["name"], tool["description"])
# Execute a tool call
result = session.tool_call("search_web", {"query": "RAG systems"}, timeout=15)
print(result)
# Clean up
session.close_sync()