MCP技术概述

MCP (Model Context Protocol) 是一个用于连接AI模型与外部数据源和工具的开放标准协议。它允许AI助手安全地访问本地和远程资源,从而实现更强大的功能扩展。

MCP核心特性:

  • 标准化的客户端-服务器通信协议
  • 支持多种传输方式(STDIO、HTTP、SSE)
  • 基于JSON-RPC的消息格式
  • 工具调用和资源访问能力

协议架构

┌─────────────────┐    ┌─────────────────┐
│   MCP Client    │    │   MCP Server    │
│   (AI模型)       │◄──►│   (工具提供者)   │
└─────────────────┘    └─────────────────┘
        │                        │
        └───────── MCP ──────────┘
     (JSON-RPC over Transport)

服务器实现

计算器MCP服务器

以下是一个完整的MCP服务器实现,支持 STDIO、HTTP、SSE 三种传输方式:

#!/usr/bin/env python3
"""
Calculator MCP服务器
使用FastMCP实现标准MCP协议
"""

import logging
from mcp.server.fastmcp import FastMCP

# Configure logging at INFO level and create this server's named logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("calculator-mcp-server")

# Create the FastMCP server instance; tools are registered on it with @mcp.tool()
mcp = FastMCP("Calculator MCP Server")


@mcp.tool()
def add(a: float, b: float) -> float:
    """Add two numbers(两个数字相加)

    Parameters:
        a (float): First number to add
        b (float): Second number to add
    
    Returns:
        float: The sum of a and b.
    """
    # NOTE(review): FastMCP presumably publishes the docstring above to clients
    # as the tool description, so its wording is deliberately left untouched —
    # confirm before editing it.
    #
    # Raises:
    #     RuntimeError: if the addition or the logging call fails; the original
    #     exception is attached as __cause__.
    try:
        result = a + b
        logger.info(f"Addition: {a} + {b} = {result}")
        return result
    except Exception as e:
        logger.error(f"Failed to add numbers: {e}")
        # Chain explicitly so the traceback shows the root cause as the direct
        # cause instead of "during handling of the above exception".
        raise RuntimeError(f"Failed to add numbers: {str(e)}") from e

def main_stdio():
    """Entry point for the STDIO transport: log the startup, then run the server."""
    startup_message = "启动Calculator MCP服务器 (STDIO传输模式)"
    logger.info(startup_message)
    mcp.run(transport="stdio")


def main_remote(host: str = "127.0.0.1", port: int = 8008, transport: str = "http"):
    """Serve the calculator over a network transport with uvicorn.

    Args:
        host: Address passed to uvicorn as the bind host.
        port: Port passed to uvicorn as the bind port.
        transport: ``"sse"`` serves the app built by ``mcp.sse_app()``; any
            other value serves the one built by ``mcp.streamable_http_app()``.
    """
    # Function-scope import: uvicorn is loaded only when this entry point runs.
    import uvicorn

    logger.info(f"启动Calculator MCP服务器 ({transport.upper()}传输模式) - {host}:{port}")
    build_app = mcp.sse_app if transport == "sse" else mcp.streamable_http_app
    uvicorn.run(build_app(), host=host, port=port)


def _parse_bind_args(mode_flag: str, description: str):
    """Parse ``--host`` / ``--port`` from ``sys.argv`` for a network launcher.

    Args:
        mode_flag: Mode selector (``"--http"`` or ``"--sse"``) to drop when it
            is the first command-line argument.
        description: Text shown by argparse in ``--help`` output.

    Returns:
        The ``argparse.Namespace`` with ``host`` (str) and ``port`` (int).
    """
    import argparse
    import sys

    # The __main__ dispatcher leaves the mode flag in sys.argv, and argparse
    # would reject it as an unknown option, so strip it when it comes first.
    argv = sys.argv[1:]
    if argv and argv[0] == mode_flag:
        argv = argv[1:]

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--host", default="127.0.0.1", help="绑定的主机地址")
    parser.add_argument("--port", type=int, default=8008, help="绑定的端口号")
    return parser.parse_args(argv)


def main_http_with_args():
    """Launch the HTTP server using ``--host`` / ``--port`` from the command line."""
    args = _parse_bind_args("--http", "Calculator MCP服务器 - HTTP传输模式")
    main_remote(args.host, args.port)


def main_sse_with_args():
    """Launch the SSE server using ``--host`` / ``--port`` from the command line."""
    args = _parse_bind_args("--sse", "Calculator MCP服务器 - SSE传输模式")
    main_remote(args.host, args.port, transport="sse")


if __name__ == "__main__":
    import sys

    # Only the first command-line argument selects the transport.
    first_arg = sys.argv[1] if len(sys.argv) > 1 else None

    if first_arg == "--http":
        # HTTP mode: python calculator.py --http [--host HOST] [--port PORT]
        main_http_with_args()
    elif first_arg == "--sse":
        # SSE mode: python calculator.py --sse [--host HOST] [--port PORT]
        main_sse_with_args()
    else:
        # Anything else (including no arguments) falls back to STDIO mode
        main_stdio()

服务器启动方式

STDIO传输模式(默认):

python calculator.py

HTTP传输模式:

python calculator.py --http
# 或指定主机和端口
python calculator.py --http --host 0.0.0.0 --port 8080

SSE传输模式:

python calculator.py --sse
# 或指定主机和端口
python calculator.py --sse --host 0.0.0.0 --port 8080

客户端实现

STDIO客户端

以下是STDIO客户端的核心实现(MCPServer 类),用于通过STDIO连接MCP服务器、获取工具列表并调用工具:

#!/usr/bin/env python3
"""
MCP Client Demo

A simplified MCP client that connects to MCP servers and demonstrates
basic tool listing and execution functionality.
"""

import asyncio
import json
import logging
import os
import shutil
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Configure logging: INFO level, lines formatted as "<time> - <level> - <message>"
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Module-level logger named after this module
logger = logging.getLogger(__name__)


class MCPServer:
    """Manage a stdio connection to one MCP server and run its tools.

    Attributes are plain instance fields:
        name: Label used in log and error messages.
        config: Launch settings; ``"command"`` is required, ``"args"`` and
            ``"env"`` are optional.
        session: The active ``ClientSession``, or ``None`` while disconnected.
        exit_stack: ``AsyncExitStack`` owning the transport and session contexts.
        tools: Tool descriptions cached by the last ``get_tools()`` call.
    """

    def __init__(self, name: str, config: Dict[str, Any]) -> None:
        self.name = name
        self.config = config
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.tools: List[Dict[str, Any]] = []

    async def connect(self) -> None:
        """Spawn the configured server process and initialize an MCP session.

        Raises:
            KeyError: if the config has no ``"command"`` entry.
            ValueError: if the command is ``"npx"`` and it is not on PATH.
            Exception: any error raised while opening the transport or the
                session; it is logged, resources are released, and it is
                re-raised unchanged.
        """
        configured = self.config["command"]
        # Only "npx" is resolved through PATH; other commands are passed as-is.
        command = shutil.which("npx") if configured == "npx" else configured
        if command is None:
            raise ValueError(f"Invalid command for server {self.name}: {self.config['command']}")

        logger.info(f"Connecting to {self.name} server...")
        server_params = StdioServerParameters(
            command=command,
            # A missing or null "args"/"env" entry is treated as empty.
            args=self.config.get("args") or [],
            # Server-specific variables override the inherited environment.
            env={**os.environ, **(self.config.get("env") or {})},
        )

        try:
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.session = session
            logger.info(f"Successfully connected to {self.name} server")
        except Exception as e:
            logger.error(f"Error connecting to server {self.name}: {e}")
            # Release whatever was opened before the failure.
            await self.disconnect()
            raise

    async def get_tools(self) -> List[Dict[str, Any]]:
        """Fetch the server's tool list and cache it on ``self.tools``.

        Returns:
            One dict per tool with the keys ``"name"``, ``"description"`` and
            ``"input_schema"``.

        Raises:
            RuntimeError: if the server is not connected.
        """
        if not self.session:
            raise RuntimeError(f"Server {self.name} not connected")

        logger.info(f"Getting tools from {self.name} server...")
        tools_response = await self.session.list_tools()
        # Read the result's `tools` field directly instead of iterating the
        # response object as (field, value) pairs.
        tools = [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
            for tool in tools_response.tools
        ]

        self.tools = tools
        return tools

    async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Call ``tool_name`` on the server and return the raw result.

        Raises:
            RuntimeError: if the server is not connected.
            Exception: any error raised by the call; logged and re-raised.
        """
        if not self.session:
            raise RuntimeError(f"Server {self.name} not connected")

        logger.info(f"Executing tool {tool_name} on {self.name} server...")
        try:
            result = await self.session.call_tool(tool_name, arguments)
            logger.info("Tool execution successful")
            return result
        except Exception as e:
            logger.error(f"Error executing tool {tool_name}: {e}")
            raise

    async def disconnect(self) -> None:
        """Close the session and transport; errors are logged, not raised."""
        try:
            await self.exit_stack.aclose()
            logger.info(f"Disconnected from {self.name} server")
        except Exception as e:
            logger.error(f"Error disconnecting from server {self.name}: {e}")
        finally:
            # Drop the session even when closing failed, so later calls report
            # "not connected" instead of using a half-closed session.
            self.session = None

SSE客户端

以下是一个SSE客户端实现,用于测试SSE传输:

import asyncio
import os
import sys
import traceback
from typing import Dict, Any
from unittest import result
import httpx
from mcp import ClientSession
from mcp.client.sse import sse_client

# Sample keyword arguments for calculator tools, keyed by tool name.
# NOTE(review): nothing in the visible client code reads this table (main()
# calls "add" with literal arguments), and the server shown in this document
# registers only `add` — confirm whether the other entries are still needed.
TOOL_TESTS = {
    "add": {"a": 5, "b": 3},
    "subtract": {"a": 10, "b": 4},
    "multiply": {"a": 6, "b": 7},
    "divide": {"a": 20, "b": 5},
    "factorial": {"a": 5},
    "log": {"a": 100, "base": 10},
    "remainder": {"a": 17, "b": 5},
    "sin": {"angle": 30, "degrees": True},
    "cos": {"angle": 60, "degrees": True},
    "tan": {"angle": 45, "degrees": True},
    "power": {"base": 2, "exponent": 8},
    "sqrt": {"a": 16}
}


class MCPClient:
    """Small SSE-based MCP client for connecting to a server and calling tools.

    ``connect()`` enters the SSE stream and session context managers by hand,
    so ``cleanup()`` must be called to exit them again.
    """

    def __init__(self):
        self.session = None
        self.available_tools = []
        # Context managers entered in connect(); kept so cleanup() can exit them.
        self._streams_context = None
        self._session_context = None

    async def connect(self, server_url: str):
        """Open an SSE connection, initialize the session and list the tools.

        Args:
            server_url: URL of the server's SSE endpoint.

        Returns:
            True on success. On any failure the error is printed, resources
            opened so far are released, and False is returned.
        """
        print(f"Connecting to server {server_url}")

        try:
            print("Create SSE Client...")
            # Enter the context manually and remember it only once entered,
            # so cleanup() never exits a context that was not opened.
            streams_context = sse_client(url=server_url)
            streams = await streams_context.__aenter__()
            self._streams_context = streams_context

            print("Create MCP Session...")
            session_context = ClientSession(*streams)
            self.session = await session_context.__aenter__()
            self._session_context = session_context

            print("Init Session...")
            await self.session.initialize()

            print("Get Tool List...")
            response = await self.session.list_tools()
            self.available_tools = response.tools

            tool_names = [tool.name for tool in self.available_tools]
            print(f'Connect Successfully! Available Tools: {tool_names}')

            return True
        except Exception as e:
            print(f'Connect Failed: {e}')
            print(traceback.format_exc())

            # Clean up any resources that might have been created
            await self.cleanup()
            return False

    async def call_tool(self, tool_name: str, parameters: Dict[str, Any]) -> str:
        """Call a tool and return its text output as one string.

        Args:
            tool_name: Name of the tool to invoke.
            parameters: Arguments passed to the tool.

        Returns:
            The ``text`` of every content item joined with ``", "``;
            ``"No Output"`` when that is empty; ``str(result)`` when the result
            has no ``content`` attribute; or an error message string when not
            connected or when the call raises. This method does not raise.
        """
        if not self.session:
            print('Error: Not Connect to MCP server')
            return "Not Connect to MCP server"

        try:
            print(f'Call Tool: {tool_name}')
            print(f'Params: {parameters}')

            result = await self.session.call_tool(tool_name, parameters)
            print(result)

            if hasattr(result, 'content'):
                # join() adds separators only between items, so trailing commas
                # or spaces that belong to the tool's own output are preserved.
                content_str = ", ".join(
                    item.text for item in result.content if hasattr(item, 'text')
                )
            else:
                content_str = str(result)

            output = content_str or "No Output"
            print(f"Tool Execute Result: {output}")
            return output
        except Exception as e:
            error_msg = f"Tool Execute Failed: {e}"
            print(error_msg)
            print(traceback.format_exc())
            return error_msg

    async def cleanup(self):
        """Exit the session and stream contexts opened by ``connect()``.

        Contexts are exited in reverse order of entry. Errors are printed
        rather than raised, and calling this repeatedly or before connecting
        is a no-op.
        """
        for attr in ("_session_context", "_streams_context"):
            context = getattr(self, attr, None)
            # Clear the reference first so a failing exit is never retried.
            setattr(self, attr, None)
            if context is None:
                continue
            try:
                await context.__aexit__(None, None, None)
            except Exception as e:
                print(f"Cleanup Failed: {e}")
        self.session = None


async def main():
    """Connect to the server URL given on the command line and call ``add`` once.

    Exits with status 1 when no URL is given, when the connection fails, or
    when an unexpected exception occurs. Once a client has been created,
    ``client.cleanup()`` is always awaited before returning or exiting.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python test_mcp_client.py <MCP Server URL> (Such as http://localhost:8000/sse)")
        sys.exit(1)

    server_url = cli_args[0]
    client = MCPClient()

    try:
        connected = await client.connect(server_url)
        if not connected:
            print("Connect Failed")
            sys.exit(1)

        print('Connect Success')

        # Exercise a single tool call
        await client.call_tool("add", {"a": 5, "b": 3})
        print("\nTests completed, exiting...")
    except KeyboardInterrupt:
        print("\nKeyboard interrupt received, shutting down...")
    except Exception:
        print('Execute Failed')
        print(traceback.format_exc())
        sys.exit(1)
    finally:
        # Runs on every path, including the sys.exit() calls above.
        print("\nCleaning up resources...")
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

使用示例

配置文件

客户端配置文件 servers_config.json

{
  "mcpServers": {
    "calculator": {
      "command": "python",
      "args": ["../server/calculator.py"],
      "env": {}
    }
  }
}

运行示例

启动服务器:

# STDIO模式
python calculator.py

# HTTP模式
python calculator.py --http --host 127.0.0.1 --port 8008

# SSE模式
python calculator.py --sse --host 127.0.0.1 --port 8008

运行客户端:

# STDIO客户端
python mcp_client_demo.py

# SSE客户端
python test_mcp_sse_client.py http://localhost:8008/sse

参考资料

Model Context Protocol 官方文档

FastMCP Python库文档

MCP协议规范