构建一个MCP系统

#AI #mcp

MCP介绍

什么是MCP

MCP 是一个开放协议,标准化了应用程序如何向 LLMs 提供上下文。可以将 MCP 想象成 AI 应用程序的 USB-C 端口。正如 USB-C 提供了一种标准化的方式将您的设备连接到各种外围设备和配件一样,MCP 提供了一种标准化的方式将 AI 模型连接到不同的数据源和工具。

为什么选择MCP

MCP 帮助您在 LLMs 之上构建代理和复杂的工作流程。LLMs 经常需要与数据和工具集成,而 MCP 提供了:

  • 一系列不断增长的预构建集成,您的 LLM 可以直接接入其中
  • 能够在不同的 LLM 提供商和供应商之间灵活切换
  • 在您的基础设施内保护数据的最佳实践

MCP架构

[[Drawing 2025-05-18 09.12.41.excalidraw]] ![[Pasted image 20250518101148.png]]

  • MCP 主机 :像 Claude Desktop、IDE 或 AI 工具这样希望通过 MCP 访问数据的程序
  • MCP 客户端 :与服务器保持 1:1 连接的协议客户端
  • MCP 服务器 :轻量级程序,每个程序通过标准化的模型上下文协议暴露特定功能
  • 本地数据源 :您电脑上的文件、数据库以及 MCP 服务器可以安全访问的服务
  • 远程服务 :通过互联网(例如通过 API)可用的外部系统,MCP 服务器可以连接到这些系统

一个简单的 MCP

Step1: 从方法入手

import arxiv
import json
import os
from typing import List
from dotenv import load_dotenv
 
PAPER_DIR = "papers"
 
def search_papers(topic: str, max_results: int = 10) -> List[str]:
    """
    Search for papers on arXiv based on a given topic and store their information in a JSON file.
 
    Args:
        topic (str): The topic to search for.
        max_results (int, optional): The maximum number of results to return. Defaults to 10.
 
    Returns:
        List of paper IDs found in the search results.
    """
 
    client = arxiv.Client()
 
    search = arxiv.Search(
        query=topic, 
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
 
    papers = client.results(search)
 
    path = os.path.join(PAPER_DIR, topic.lower().replace(" ", "_"))
    os.makedirs(path, exist_ok=True)
 
    file_path = os.path.join(path, "papers_info.json")
 
    try:
        with open(file_path, "r") as json_file:
            papers_info = json.load(json_file)
    except (FileNotFoundError, json.JSONDecodeError):
        papers_info = {}
 
    paper_ids = []
 
    for paper in papers:
        paper_ids.append(paper.get_short_id())
        paper_info = {
            'title': paper.title,
            'authors': [author.name for author in paper.authors],
            'summary': paper.summary,
            'pdf_url': paper.pdf_url,
            'published': str(paper.published.date()),
        }
        papers_info[paper.get_short_id()] = paper_info
 
    with open(file_path, "w") as json_file:
        json.dump(papers_info, json_file, indent=2)
 
    print(f"Results are stored in: {file_path}")
 
    return paper_ids
 
def extract_info(paper_id: str) -> str:
    """
    Search for information about a specific paper across all topic directories.
 
    Args:
        paper_id (str): The ID of the paper to search for.
 
    Returns:
        JSON string with paper information if found, None otherwise.
    """
 
    for item in os.listdir(PAPER_DIR):
        item_path = os.path.join(PAPER_DIR, item)
        if os.path.isdir(item_path):
            file_path = os.path.join(item_path, "papers_info.json")
            if os.path.isfile(file_path):
                try:
                    with open(file_path, "r") as json_file:
                        papers_info = json.load(json_file)
                        if paper_id in papers_info:
                            return json.dumps(papers_info[paper_id], indent=2)
                except (FileNotFoundError, json.JSONDecodeError) as e:
                    print(f"Error reading {file_path}: {str(e)}")
                    continue
    
    return f"There is no saved information about {paper_id}"
 

上述是两个简单的方法,search_papers 用于从 arxiv 搜索指定主题的论文,将论文内容按照指定格式写入配置的目录下,并返回论文 id;extract_info 从写入的文件中,获取指定 id 的内容详情

step2: 构建 exectue_tool

mapping_tool_funtions = {
    "search_papers": search_papers,
    "extract_info": extract_info
}
 
def exectue_tool(tool_name: str, tool_args: dict):
 
    result = mapping_tool_funtions[tool_name](**tool_args)
 
    if result is None:
        result = "The operation completed but didn't return any results"
 
    elif isinstance(result, list):
        result = ','.join(result)
 
    elif isinstance(result, dict):
        result = json.dumps(result, indent=2)
 
    else:
        result = str(result)
 
    return result

step3: 加入大模型

import os
import json
from dotenv import load_dotenv
from openai import OpenAI
 
load_dotenv()
 
MODEL_PREFIX = "GEMINI"
 
api_key = os.getenv(f"{MODEL_PREFIX}_API_KEY")
base_url = os.getenv(f"{MODEL_PREFIX}_BASE_URL")
 
client = OpenAI(api_key=api_key, base_url=base_url)
 
model = "gemini-2.0-flash-exp"
 
tools_function = [
    {
        "type": "function",
        "function": {
            "name": "search_papers",
            "description": "Search for papers on arXiv based on a given topic and store their information in a JSON file.",
            "parameters": {
                "type": "object",
                "properties": {
                    "topic": {
                        "type": "string",
                        "description": "The topic to search for.",
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "The maximum number of results to return. Defaults to 10.",
                        "default": 5,
                    },
                },
                "required": ["topic"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "extract_info",
            "description": "Extract information from a paper based on its ID.",
            "parameters": {
                "type": "object",
                "properties": {
                    "paper_id": {
                        "type": "string",
                        "description": "The ID of the paper to extract information from.",
                    },
                },
                "required": ["paper_id"],
            }
        }
    }
]
 
PAPER_DIR = "papers"
 
def search_papers(topic: str, max_results: int = 10) -> List[str]:
    """
    Search for papers on arXiv based on a given topic and store their information in a JSON file.
 
    Args:
        topic (str): The topic to search for.
        max_results (int, optional): The maximum number of results to return. Defaults to 10.
 
    Returns:
        List of paper IDs found in the search results.
    """
 
    client = arxiv.Client()
 
    search = arxiv.Search(
        query=topic, 
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
 
    papers = client.results(search)
 
    path = os.path.join(PAPER_DIR, topic.lower().replace(" ", "_"))
    os.makedirs(path, exist_ok=True)
 
    file_path = os.path.join(path, "papers_info.json")
 
    try:
        with open(file_path, "r") as json_file:
            papers_info = json.load(json_file)
    except (FileNotFoundError, json.JSONDecodeError):
        papers_info = {}
 
    paper_ids = []
 
    for paper in papers:
        paper_ids.append(paper.get_short_id())
        paper_info = {
            'title': paper.title,
            'authors': [author.name for author in paper.authors],
            'summary': paper.summary,
            'pdf_url': paper.pdf_url,
            'published': str(paper.published.date()),
        }
        papers_info[paper.get_short_id()] = paper_info
 
    with open(file_path, "w") as json_file:
        json.dump(papers_info, json_file, indent=2)
 
    print(f"Results are stored in: {file_path}")
 
    return paper_ids
 
def extract_info(paper_id: str) -> str:
    """
    Search for information about a specific paper across all topic directories.
 
    Args:
        paper_id (str): The ID of the paper to search for.
 
    Returns:
        JSON string with paper information if found, None otherwise.
    """
 
    for item in os.listdir(PAPER_DIR):
        item_path = os.path.join(PAPER_DIR, item)
        if os.path.isdir(item_path):
            file_path = os.path.join(item_path, "papers_info.json")
            if os.path.isfile(file_path):
                try:
                    with open(file_path, "r") as json_file:
                        papers_info = json.load(json_file)
                        if paper_id in papers_info:
                            return json.dumps(papers_info[paper_id], indent=2)
                except (FileNotFoundError, json.JSONDecodeError) as e:
                    print(f"Error reading {file_path}: {str(e)}")
                    continue
    
    return f"There is no saved information about {paper_id}"
 
mapping_tool_funtions = {
    "search_papers": search_papers,
    "extract_info": extract_info
}
 
def exectue_tool(tool_name: str, tool_args: dict):
 
    result = mapping_tool_funtions[tool_name](**tool_args)
 
    if result is None:
        result = "The operation completed but didn't return any results"
 
    elif isinstance(result, list):
        result = ','.join(result)
 
    elif isinstance(result, dict):
        result = json.dumps(result, indent=2)
 
    else:
        result = str(result)
 
    return result
 
def process_query(query):
 
    messages = [{'role': 'user', 'content': query}]
 
    response = client.chat.completions.create(
        model=model,
        tools=tools_function,
        messages=messages,
        tool_choice="auto",
    ) 
 
    process_quert = True
 
    while process_quert:
        assistant_content = []
 
        for choice in response.choices:
            if choice.finish_reason == 'stop':
 
                print(choice.message.content)
                assistant_content.append(choice.message.content)
 
                if len(response.choices) == 1:
                    process_quert = False
 
            elif choice.finish_reason == 'tool_calls':
 
                assistant_content.append(choice)
                messages.append(choice.message)
 
                call_message = []
 
                for call in choice.message.tool_calls:
 
                    tool_args = call.function.arguments
                    tool_name = call.function.name
 
                    print(f"Calling tool: {tool_name} with args: {tool_args}")
 
                    result = exectue_tool(tool_name, json.loads(tool_args))
 
                    call_message.append({
                        "role": "function",
                        "name": tool_name,
                        "content": result
                    })
 
                messages.append(call_message)
 
                response = client.chat.completions.create(
                    model=model,
                    tools=tools_function,
                    messages=messages,
                    tool_choice="auto",
                ) 
 
                if len(response.choices) == 1 and response.choices[0].finish_reason == 'stop':
                    print(response.choices[0].message.content)
                    process_quert = False
 
def chat_loop():
    print("Type your queries or type 'quit' to exit")
    while True:
        try:
            query = input("\nQuery:").strip()
            if query.lower() == "quit":
                break
 
            process_query(query)
            print("\n")
        except Exception as e:
            print(f"An error occurred: {str(e)}")
 
 
if __name__ == "__main__":
    chat_loop()
 

step4: 将方法改成 MCP Tool

import arxiv
import json
import os
from typing import List
from mcp.server.fastmcp import FastMCP
 
mcp = FastMCP("research")
 
PAPER_DIR = "papers"
 
@mcp.tool()
def search_papers(topic: str, max_results: int = 5) -> List[str]:
    """
    Search for papers on arXiv based on a given topic and store their information in a JSON file.
 
    Args:
        topic (str): The topic to search for.
        max_results (int, optional): The maximum number of results to return. Defaults to 5.
 
    Returns:
        List of paper IDs found in the search results.
    """
 
    client = arxiv.Client()
 
    search = arxiv.Search(
        query=topic, 
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
 
    papers = client.results(search)
 
    path = os.path.join(PAPER_DIR, topic.lower().replace(" ", "_"))
    os.makedirs(path, exist_ok=True)
 
    file_path = os.path.join(path, "papers_info.json")
 
    try:
        with open(file_path, "r") as json_file:
            papers_info = json.load(json_file)
    except (FileNotFoundError, json.JSONDecodeError):
        papers_info = {}
 
    paper_ids = []
 
    for paper in papers:
        paper_ids.append(paper.get_short_id())
        paper_info = {
            'title': paper.title,
            'authors': [author.name for author in paper.authors],
            'summary': paper.summary,
            'pdf_url': paper.pdf_url,
            'published': str(paper.published.date()),
        }
        papers_info[paper.get_short_id()] = paper_info
 
    with open(file_path, "w") as json_file:
        json.dump(papers_info, json_file, indent=2)
 
    print(f"Results are stored in: {file_path}")
 
    return paper_ids
 
@mcp.tool()
def extract_info(paper_id: str) -> str:
    """
    Search for information about a specific paper across all topic directories.
 
    Args:
        paper_id (str): The ID of the paper to search for.
 
    Returns:
        JSON string with paper information if found, None otherwise.
    """
 
    for item in os.listdir(PAPER_DIR):
        item_path = os.path.join(PAPER_DIR, item)
        if os.path.isdir(item_path):
            file_path = os.path.join(item_path, "papers_info.json")
            if os.path.isfile(file_path):
                try:
                    with open(file_path, "r") as json_file:
                        papers_info = json.load(json_file)
                        if paper_id in papers_info:
                            return json.dumps(papers_info[paper_id], indent=2)
                except (FileNotFoundError, json.JSONDecodeError) as e:
                    print(f"Error reading {file_path}: {str(e)}")
                    continue
    
    return f"There is no saved information about {paper_id}"
 
 
if __name__ == "__main__":
    mcp.run(transport="stdio")

这里我们可以使用 MCP Inspector 来检验一下我们写的 MCP Server 是否达到我们想要的效果

npx @modelcontextprotocol/inspector uv run step4.py

step5: MCP 加入大模型调用

import os
import json
import asyncio
import nest_asyncio
from dotenv import load_dotenv
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List
 
nest_asyncio.apply()
 
load_dotenv()
 
MODEL_PREFIX = "GEMINI"
 
api_key = os.getenv(f"{MODEL_PREFIX}_API_KEY")
base_url = os.getenv(f"{MODEL_PREFIX}_BASE_URL")
 
client = OpenAI(api_key=api_key, base_url=base_url)
 
model = "gemini-2.5-flash-preview-04-17"
 
class MCP_ChatBot:
 
    def __init__(self):
        self.session: ClientSession = None
        self.client = client
        self.available_tools: List[dict] = []
 
    async def process_query(self, query):
 
        messages = [{'role': 'user', 'content': query}]
 
        response = self.client.chat.completions.create(
            model=model,
            tools=self.available_tools,
            messages=messages,
            tool_choice="auto",
        ) 
 
        process_quert = True
 
        while process_quert:
            assistant_content = []
 
            for choice in response.choices:
                if choice.finish_reason == 'stop':
 
                    print(choice.message.content)
                    assistant_content.append(choice.message.content)
 
                    if len(response.choices) == 1:
                        process_quert = False
 
                elif choice.finish_reason == 'tool_calls':
 
                    assistant_content.append(choice)
                    messages.append(choice.message)
 
                    call_message = []
 
                    for call in choice.message.tool_calls:
 
                        tool_args = call.function.arguments
                        tool_name = call.function.name
 
                        print(f"Calling tool: {tool_name} with args: {tool_args}")
 
                        result = await self.session.call_tool(tool_name, arguments=json.loads(tool_args))
 
                        call_message.append({
                            "role": "function",
                            "name": tool_name,
                            "content": result.content[0].text
                        })
 
                    messages.append(call_message)
 
                    response = self.client.chat.completions.create(
                        model=model,
                        tools=self.available_tools,
                        messages=messages,
                        tool_choice="auto",
                    ) 
 
                    if len(response.choices) == 1 and response.choices[0].finish_reason == 'stop':
                        print(response.choices[0].message.content)
                        process_quert = False
 
    async def chat_loop(self):
        print("Type your queries or type 'quit' to exit")
        while True:
            try:
                query = input("\nQuery:").strip()
                if query.lower() == "quit":
                    break
 
                await self.process_query(query)
                print("\n")
            except Exception as e:
                print(f"An error occurred: {str(e)}")
 
    async def connect_to_server_and_run(self):
        server_params = StdioServerParameters(
            command="uv",
            args=["run", "step4.py"],
            env=None
        )
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                await session.initialize()
 
                response = await session.list_tools()
 
                tools = response.tools
                print("\nConnecting to server with tools: ", [tool.name for tool in tools])
 
                self.available_tools = [{
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                } for tool in tools]
 
                await self.chat_loop()
 
async def main():
    chatbot = MCP_ChatBot()
    await chatbot.connect_to_server_and_run()
 
if __name__ == "__main__":
    asyncio.run(main())
 

如何让大模型调用更多的 MCP Server

通过配置化

比如,通过以下的配置结构,将 MCP Server 与 Agent 解藕,MCP 无需跟 Agent 绑在一起,而是通过配置化的方式,动态调整

{
  "mcpServers": {
    "fetch": {
      "command": "uvx",
      "args": [
        "mcp-server-fetch"
      ]
    },
    "research": {
      "command": "uv",
      "args": [
        "run",
        "step4.py"
      ]
    },
    "filesystem": {
        "command": "npx",
        "args": [
            "-y",
            "@modelcontextprotocol/server-filesystem",
            "."
        ]
    }
  }
}

step6: 在 Agent 中接入

import os
import json
import asyncio
import nest_asyncio
from dotenv import load_dotenv
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from typing import List, Dict, TypedDict
from contextlib import AsyncExitStack
 
nest_asyncio.apply()
 
load_dotenv()
 
MODEL_PREFIX = "GEMINI"
 
api_key = os.getenv(f"{MODEL_PREFIX}_API_KEY")
base_url = os.getenv(f"{MODEL_PREFIX}_BASE_URL")
 
client = OpenAI(api_key=api_key, base_url=base_url)
 
model = "gemini-2.5-flash-preview-04-17"
 
class ToolFunction(TypedDict):
    name: str
    description: str
    parameters: dict
 
class ToolDefinition(TypedDict):
    type: str
    function: ToolFunction
 
class MCP_ChatBot:
 
    def __init__(self):
        self.sessions: List[ClientSession] = []
        self.exit_stack = AsyncExitStack()
        self.client = client
        self.available_tools: List[ToolDefinition] = []
        self.tool_to_session: Dict[str, ClientSession] = {}
 
    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """Connet to a single MCP server."""
        try:
            server_params = StdioServerParameters(**server_config)
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            read, write = stdio_transport
            session = await self.exit_stack.enter_async_context(
                ClientSession(read, write)
            )
            await session.initialize()
            self.sessions.append(session)
 
            response = await session.list_tools()
 
            tools = response.tools
            print("\nConnecting to server with tools: ", [tool.name for tool in tools])
 
            for tool in tools:
                self.tool_to_session[tool.name] = session
                self.available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                })
            
        except Exception as e:
            print(f"Faild to connet to ${server_name}: {e}")
 
    
    async def connect_to_servers(self):
        """Connect to all configured MCP Servers"""
        try:
            with open("mcp.json") as file:
                data = json.load(file)
            
            servers = data.get("mcpServers", {})
            for server_name, server_config in servers.items():
                await self.connect_to_server(server_name, server_config)
        except Exception as e:
            print(f"Error loading server configuration: {e}")
            raise
 
    async def process_query(self, query):
 
        messages = [{'role': 'user', 'content': query}]
 
        response = self.client.chat.completions.create(
            model=model,
            tools=self.available_tools,
            messages=messages,
            tool_choice="auto",
        ) 
 
        process_quert = True
 
        while process_quert:
            assistant_content = []
 
            for choice in response.choices:
                if choice.finish_reason == 'stop':
 
                    print(choice.message.content)
                    assistant_content.append(choice.message.content)
 
                    if len(response.choices) == 1:
                        process_quert = False
 
                elif choice.finish_reason == 'tool_calls':
 
                    assistant_content.append(choice)
                    messages.append(choice.message)
 
                    call_message = []
 
                    for call in choice.message.tool_calls:
 
                        tool_args = call.function.arguments
                        tool_name = call.function.name
 
                        print(f"Calling tool: {tool_name} with args: {tool_args}")
 
                        session = self.tool_to_session[tool_name]
                        result = await session.call_tool(tool_name, arguments=json.loads(tool_args))
 
                        call_message.append({
                            "role": "function",
                            "name": tool_name,
                            "content": result.content[0].text
                        })
 
                    messages.append(call_message)
 
                    response = self.client.chat.completions.create(
                        model=model,
                        tools=self.available_tools,
                        messages=messages,
                        tool_choice="auto",
                    ) 
 
                    if len(response.choices) == 1 and response.choices[0].finish_reason == 'stop':
                        print(response.choices[0].message.content)
                        process_quert = False
 
    async def chat_loop(self):
        print("Type your queries or type 'quit' to exit")
        while True:
            try:
                query = input("\nQuery:").strip()
                if query.lower() == "quit":
                    break
 
                await self.process_query(query)
                print("\n")
            except Exception as e:
                print(f"An error occurred: {str(e)}")
 
    async def cleanup(self):
        await self.exit_stack.aclose()
 
async def main():
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()
 
if __name__ == "__main__":
    asyncio.run(main())
 

其中 connect_to_servers 是将 MCP Server 配置逐个进行连接,达到动态配置的效果

我们可以直接运行

uv run step6.py

总结

通过以上代码和配置,我们实现了一个动态连接 MCP Server 的聊天机器人系统,能够根据用户查询调用相应的工具并处理复杂的交互流程。如果再动态支持选择不同的模型厂商和模型,那么可以实现多模型+多MCP的不同组合应用,基本能够实现一个简易版的 Cherry Studio 了。如何动态接入不同的LLMs,这个可以感兴趣自己实现一下