在Cursor中用python开发MCP Server(stdio 和sse两种方式)


一、安装 uv 进行python包管理

安装方式

#On macOS and Linux.
curl -LsSf https://astral.sh/uv/install.sh | sh
#On Windows.
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

  下面我以windows为例,因为我的是windows😁

640-de73b80b5206981975c6e32f0988a73b.png

二、创建MCP服务

1、初始化weather项目

uv init weather

640-41b584ad7a75c7fb31a90851753765d3.png

2、创建虚拟环境并且添加mcp相关依赖

cd weather
# Create virtual environment and activate it
uv venv
.venv\Scripts\activate
# Install dependencies
uv add mcp[cli] httpx
# Create our server file。new-item 是powershell 命令,用于创建文件
new-item weather.py

640-478633761aaf1121ec5334cc02f896b1.png

3、用cursor写一个查询天气的weather.py

from typing import Any, Final, TypedDict, List, Optional
import httpx  
from mcp.server.fastmcp import FastMCP  
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
from functools import wraps, lru_cache
from dataclasses import dataclass
from typing import Optional
import os
from datetime import datetime, timedelta
  
# Initialize FastMCP server  
mcp = FastMCP("weather")  
  
# 从MCP配置中获取API密钥
# Constants  
NWS_API_BASE: Final = "https://api.weather.gov"  
USER_AGENT: Final = "weather-app/1.0"  
CACHE_EXPIRATION: Final = 300  # 5 minutes  
MAX_RETRIES: Final = 3  
REQUEST_TIMEOUT: Final = 30.0  
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('weather_service.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
  
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1min=4max=10))
async def make_nws_request(url: str) -> dict[str, Any] | None:  
    """Make a request to the NWS API with proper error handling and retry mechanism."""  
    headers = {  
        "User-Agent": USER_AGENT,  
        "Accept""application/geo+json",  
        "Content-Type""application/json"  # 添加内容类型头
    }  
    async with httpx.AsyncClient(timeout=30.0as client:  # 直接在客户端设置超时
        try:  
            response = await client.get(url, headers=headers)  
            response.raise_for_status()  
            return response.json()  
        except httpx.HTTPStatusError as e:  
            logger.error(f"HTTP error occurred: {e.response.status_code}")  
            return None  
        except httpx.RequestError as e:  
            print(f"Request error occurred: {str(e)}")  
            return None  
        except Exception as e:  
            print(f"Unexpected error: {str(e)}")  
            return None  
  
def format_alert(feature: dict) -> str:  
    """Format an alert feature into a readable string."""  
    props = feature["properties"]  
    return f"""  
Event: {props.get('event''Unknown')}  
Area: {props.get('areaDesc''Unknown')}  
Severity: {props.get('severity''Unknown')}  
Description: {props.get('description''No description available')}  
Instructions: {props.get('instruction''No specific instructions provided')}  
"""  
  
@mcp.tool()  
async def get_alerts(state: str) -> str:  
    """Get weather alerts for a US state.  
  
    Args:        state: Two-letter US state code (e.g. CA, NY)    """    
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"  
    data = await make_nws_request(url)  
  
    if not data or "features" not in data:  
        return "Unable to fetch alerts or no alerts found."  
  
    if not data["features"]:  
        return "No active alerts for this state."  
  
    alerts = [format_alert(feature) for feature in data["features"]]  
    return "\n---\n".join(alerts)  
  
def validate_coordinates(func):
    @wraps(func)
    async def wrapper(latitude: float, longitude: float, *args, **kwargs):
        try:
            lat = float(latitude)
            lon = float(longitude)
            if not (-90 <= lat <= 90or not (-180 <= lon <= 180):
                return "Invalid latitude or longitude values"
            return await func(lat, lon, *args, **kwargs)
        except ValueError:
            return "Coordinates must be valid numbers"
    return wrapper
@mcp.tool()
@validate_coordinates
async def get_forecast(latitude: float, longitude: float) -> str:  
    """Get weather forecast for a location.  
  
    Args:        latitude: Latitude of the location        longitude: Longitude of the location    """    
    try:
        # 从MCP配置获取API密钥
        api_key = os.getenv("openweather_api_key")
        if not api_key:
            return "请在MCP配置中设置 openweather_api_key"
            
        url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}&units=metric"
        
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            data = response.json()
            
            if response.status_code == 200:
                return format_openweather_data(data)
            else:
                logger.error(f"OpenWeather API error: {response.status_code}")
                return "Weather service temporarily unavailable1"
                
    except Exception as e:
        logger.error(f"Forecast error: {str(e)}")
        return "Weather service temporarily unavailable2"
  
@dataclass
class WeatherData:
    temperature: float
    condition: str
    wind_speed: str
    wind_direction: str
    description: str
def format_openweather_data(data: dict) -> str:
    """Format OpenWeatherMap API response data."""
    try:
        # 获取温度,默认为0
        temp = data.get('main', {}).get('temp'0)
        
        # 获取天气状况,默认为未知
        weather = data.get('weather', [{}])[0]
        condition = weather.get('main''Unknown')
        description = weather.get('description''No description available')
        
        # 获取风速,默认为0
        wind = data.get('wind', {})
        wind_speed = f"{wind.get('speed'0)} m/s"
        
        # 获取风向,如果deg不存在则显示未知方向
        wind_deg = wind.get('deg')
        wind_dir = get_wind_direction(wind_deg) if wind_deg is not None else 'Unknown direction'
        
        return f"""Current Weather:
Temperature: {temp}°C
Condition: {condition}
Wind: {wind_speed} {wind_dir}
Description: {description}"""
    except Exception as e:
        logger.error(f"Error formatting weather data: {e}")
        return "Unable to format weather data"
def get_wind_direction(degrees: float) -> str:
    """Convert wind degrees to cardinal direction."""
    directions = ["N""NE""E""SE""S""SW""W""NW"]
    index = round(degrees / 45) % 8
    return directions[index]
async def get_backup_forecast(latitude: float, longitude: float) -> str:
    """备用天气API处理函数"""
    try:
        # 从MCP配置获取API密钥
        api_key = os.getenv("openweather_api_key")
        if not api_key:
            return "请在MCP配置中设置 openweather_api_key"
            
        url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}&units=metric"
        
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            data = response.json()
            
            if response.status_code == 200:
                return format_openweather_data(data)
            else:
                logger.error(f"Backup API error: {response.status_code}")
                return "Weather service temporarily unavailable3"
                
    except Exception as e:
        logger.error(f"Backup forecast error: {str(e)}")
        return "Weather service temporarily unavailable4"  
  
class TimedCache:
    def __init__(self, expiration_time: int = 300):  # 默认5分钟过期
        self.cache = {}
        self.expiration_time = expiration_time
    def get(self, key: str) -> Optional[dict]:
        if key in self.cache:
            data, timestamp = self.cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self.expiration_time):
                return data
            else:
                del self.cache[key]
        return None
    def set(self, key: str, value: dict):
        self.cache[key] = (value, datetime.now())
weather_cache = TimedCache()
  
if __name__ == "__main__":  
    # Initialize and run the server  
    mcp.run(transport='stdio')
class WeatherPeriod(TypedDict):
    name: str
    temperature: float
    temperatureUnit: str
    windSpeed: str
    windDirection: str
    detailedForecast: str
class ForecastResponse(TypedDict):
    properties: dict
    periods: List[WeatherPeriod]

4、 客户端配置MCP服务

{
  "mcpServers": {
    "weather": {
      "command""uv",
      "args": [
          "--directory",
          "D:/BaiduNetdiskWorkspace/cy工作空间/aicoding/mcp/server/weather"//改成你项目的绝对路径
          "run",
          "weather.py"
      ],
      "env": {
        "openweather_api_key""天气的Apikey去申请" //申请地址:https://openweathermap.org/api
      }
    }
   }
}

640-4993df3dd35106a896b98685aca40b47.png

成功了就是这个图上的样子*如果遇到下面这张图的错误,或没有启动起来,就执行下面这个

uv add tenacity

640-b582f40fc0f94f168f63f5cf558498df.png

来测试一下是否会用我们的工具:如下图,没有问题,能正常调用到了

640-f811330c737f19b9dc0dd334900d6da8.png

5、现在我们改造一下,改为sse方式调用,我们重新建一个weather-sse.py

new-item weather-sse.py
from typing import Any, Fi    nal, TypedDict, List, Optional
import httpx
from mcp.server.lowlevel import Server
import mcp.types as types
import os
import logging
from datetime import datetime, timedelta
from functools import wraps
from dataclasses import dataclass
import json
# 初始化服务器
app = Server("weather-server")
# 常量
NWS_API_BASE: Final = "https://api.weather.gov"
USER_AGENT: Final = "weather-app/1.0"
CACHE_EXPIRATION: Final = 300  # 5分钟
MAX_RETRIES: Final = 3
REQUEST_TIMEOUT: Final = 30.0
# 存储API密钥 - 多种方式获取
OPENWEATHER_API_KEYstr = os.getenv("openweather_api_key""")
# 如果环境变量没有设置,尝试使用配置文件中的默认值(MCP配置中的值)
if not OPENWEATHER_API_KEY:
    OPENWEATHER_API_KEY = ""  # 从mcp.json复制过来的值,用于临时测试
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('weather_service.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    """处理来自客户端的工具调用。"""
    try:
        if name == "get_alerts":
            state = arguments.get("state""")
            if not state:
                return [types.TextContent(type="text", text="错误:请提供州代码")]
            result = await get_alerts(state)
            return [types.TextContent(type="text", text=result)]
        elif name == "get_forecast":
            latitude = arguments.get("latitude")
            longitude = arguments.get("longitude")
            if latitude is None or longitude is None:
                return [types.TextContent(type="text", text="错误:请提供纬度和经度")]
            result = await get_forecast(latitude, longitude)
            return [types.TextContent(type="text", text=result)]
        else:
            return [types.TextContent(type="text", text=f"错误:未知工具:{name}")]
    except Exception as e:
        logger.error(f"工具调用错误:{str(e)}")
        return [types.TextContent(type="text", text=f"错误:{str(e)}")]
async def make_nws_request(url: str) -> dict[str, Any] | None:  
    """Make a request to the NWS API with proper error handling and retry mechanism."""  
    headers = {  
        "User-Agent": USER_AGENT,  
        "Accept""application/geo+json",  
        "Content-Type""application/json"  # 添加内容类型头
    }  
    async with httpx.AsyncClient(timeout=30.0as client:  # 直接在客户端设置超时
        try:  
            response = await client.get(url, headers=headers)  
            response.raise_for_status()  
            return response.json()  
        except httpx.HTTPStatusError as e:  
            logger.error(f"HTTP error occurred: {e.response.status_code}")  
            return None  
        except httpx.RequestError as e:  
            print(f"Request error occurred: {str(e)}")  
            return None  
        except Exception as e:  
            print(f"Unexpected error: {str(e)}")  
            return None  
async def get_alerts(state: str) -> str:
    """获取指定州的天气警报"""
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)
    if not data or "features" not in data:
        return "无法获取警报或未找到警报"
    if not data["features"]:
        return "该州没有活跃的警报"
    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)
def format_alert(feature: dict) -> str:
    """格式化警报信息"""
    props = feature["properties"]
    return f"""
事件:{props.get('event''未知')}
地区:{props.get('areaDesc''未知')}
严重程度:{props.get('severity''未知')}
描述:{props.get('description''无可用描述')}
说明:{props.get('instruction''未提供具体说明')}
"""
async def get_forecast(latitude: float, longitude: float) -> str:
    """获取指定位置的天气预报"""
    global OPENWEATHER_API_KEY
    
    if not OPENWEATHER_API_KEY:
        return "请在MCP配置中设置 openweather_api_key"
    url = f"https://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={OPENWEATHER_API_KEY}&units=metric"
    
    async with httpx.AsyncClient(timeout=30.0as client:
        try:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()
            return format_openweather_data(data)
        except httpx.HTTPStatusError as e:
            logger.error(f"OpenWeather API HTTP错误:{e.response.status_code}")
            return f"天气服务错误:{e.response.status_code}"
        except httpx.RequestError as e:
            logger.error(f"OpenWeather API 请求错误:{str(e)}")
            return "天气服务暂时不可用"
        except Exception as e:
            logger.error(f"OpenWeather API 未知错误:{str(e)}")
            return "天气服务暂时不可用"
def format_openweather_data(data: dict) -> str:
    """格式化OpenWeatherMap API响应数据"""
    try:
        temp = data.get('main', {}).get('temp'0)
        weather = data.get('weather', [{}])[0]
        condition = weather.get('main''未知')
        description = weather.get('description''无可用描述')
        wind = data.get('wind', {})
        wind_speed = f"{wind.get('speed'0)} m/s"
        wind_deg = wind.get('deg')
        wind_dir = get_wind_direction(wind_deg) if wind_deg is not None else '未知方向'
        
        return f"""当前天气:
温度:{temp}°C
状况:{condition}
风速:{wind_speed} {wind_dir}
描述:{description}"""
    except Exception as e:
        logger.error(f"格式化天气数据错误:{e}")
        return "无法格式化天气数据"
def get_wind_direction(degrees: float) -> str:
    """将风向度数转换为方位"""
    directions = ["北""东北""东""东南""南""西南""西""西北"]
    index = round(degrees / 45) % 8
    return directions[index]
@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """列出可用工具"""
    return [
        types.Tool(
            name="get_alerts",
            description="获取指定州的天气警报",
            inputSchema={
                "type""object",
                "required": ["state"],
                "properties": {
                    "state": {"type""string""description""两字母州代码(例如 CA, NY)"}
                }
            }
        ),
        types.Tool(
            name="get_forecast",
            description="获取指定位置的天气预报",
            inputSchema={
                "type""object",
                "required": ["latitude""longitude"],
                "properties": {
                    "latitude": {"type""number""description""位置的纬度"},
                    "longitude": {"type""number""description""位置的经度"}
                }
            }
        )
    ]
if __name__ == "__main__":
    import sys
    # 默认使用标准输入输出传输
    transport = "sse"
    port = 9000
    # 检查命令行参数
    if len(sys.argv) > 1:
        if sys.argv[1] == "sse":
            transport = "sse"
        if len(sys.argv) > 2:
            try:
                port = int(sys.argv[2])
            except ValueError:
                pass
    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.routing import Mount, Route
        import uvicorn
        
        # 使用通配符路径,允许捕获路径中的API密钥
        sse = SseServerTransport("/messages/")
        
        async def handle_sse(request):
            global OPENWEATHER_API_KEY
            
            # 尝试从路由参数中获取API密钥
            api_key = request.path_params.get("api_key")
            if api_key:
                OPENWEATHER_API_KEY = api_key
                logger.info(f"从URL路径参数成功获取OpenWeather API密钥: {api_key[:4]}...{api_key[-4:]}")
            else:
                # 如果路由参数中没有API密钥,尝试从URL路径中提取
                path = request.scope["path"]
                logger.info(f"请求路径: {path}")
                
                if path.startswith('/sse/'):
                    # 从路径中提取密钥 - 格式 /sse/API_KEY
                    path_parts = path.split('/sse/')
                    if len(path_parts) > 1 and path_parts[1]:
                        OPENWEATHER_API_KEY = path_parts[1]
                        logger.info(f"从URL路径成功获取OpenWeather API密钥: {OPENWEATHER_API_KEY[:4]}...{OPENWEATHER_API_KEY[-4:]}")
                else:
                    # 如果URL中没有找到密钥,尝试从环境变量获取
                    api_keys_to_try = [
                        os.getenv("openweather_api_key"),           # 系统环境变量
                        os.getenv("OPENWEATHER_API_KEY"),           # 大写变量名
                        os.environ.get("openweather_api_key")       # environ字典
                    ]
                    
                    # 尝试所有可能的API密钥获取方式
                    for key in api_keys_to_try:
                        if key:
                            OPENWEATHER_API_KEY = key
                            logger.info(f"成功获取OpenWeather API密钥: {key[:4]}...{key[-4:]}")
                            break
            
            if not OPENWEATHER_API_KEY:
                logger.warning("无法从任何来源获取OpenWeather API密钥")
            
            # 创建初始化选项,但不尝试修改它
            init_options = app.create_initialization_options()
            
            # 连接SSE并运行应用
            async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
                await app.run(streams[0], streams[1], init_options)
        
        # 添加两个路由:一个是基本的/sse路径,另一个是带密钥的/sse/{api_key}路径
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Route("/sse/{api_key:path}", endpoint=handle_sse),  # 捕获API密钥作为路径参数
                Mount("/messages/", app=sse.handle_post_message),
            ]
        )
        print(f"在端口{port}上启动MCP服务器,使用SSE传输")
        logger.info(f"在端口{port}上启动MCP服务器,使用SSE传输")
        if OPENWEATHER_API_KEY:
            masked_key = f"{OPENWEATHER_API_KEY[:4]}...{OPENWEATHER_API_KEY[-4:]}"
            logger.info(f"OpenWeather API密钥已配置: {masked_key}")
        else:
            logger.warning("OpenWeather API密钥未配置,天气预报功能将不可用")
        logger.info(f"当前环境变量: {os.environ.get('openweather_api_key''未设置')}")
        uvicorn.run(starlette_app, host="0.0.0.0", port=port)
    else:
        from mcp.server.stdio import stdio_server
        import anyio
uv run .\weather-sse.py

640-4d56886532b642bd7a47e8d0d9cab23f.png

到这这就代表see方式也启动了

6、配置sse mcp服务

{
  "mcpServers": {
    "weather": {
      "url""http://localhost:9000/sse/天气的Apikey" //申请地址 https://openweathermap.org/api
    }
   }
}

7、在cursor中链接mcp服务

640-62ca58e0f77edb1f1ebdc3c19f03feb8.png

_成功了就是上图这个样子_我们来测试一下sse端点的方式是否会用来我们的工具:如下图,没有问题,也能正常调用到了

640-ba5984861ad4fa7376d7f48d524e4183.png