Coverage for src/pip_project_template/mcp_servers/McpServer01.py: 100.00%
46 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-08-28 08:53 +1000
« prev ^ index » next coverage.py v7.9.2, created at 2025-08-28 08:53 +1000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Timestamp: "2025-08-27 01:43:35 (ywatanabe)"
4# File: /home/ywatanabe/proj/pip-project-template/src/mcp_servers/McpServer01.py
5# ----------------------------------------
6from __future__ import annotations
7import os
# Path string for this module. It is a fixed literal, not derived from the
# running file, so it does not change when the package is installed elsewhere.
__FILE__ = "./pip-project-template/src/mcp_servers/McpServer01.py"
# Directory component of the literal path above.
__DIR__ = os.path.dirname(__FILE__)
12# ----------------------------------------
14"""FastMCP Server 01 implementation."""
16from typing import Any, Dict
18from fastmcp import FastMCP
20from ..core._Calculator import Calculator
# FastMCP application object named "mcp-server-01"; the tool and resource
# functions in this module register themselves on it through its decorators.
mcp = FastMCP("mcp-server-01")

# One Calculator instance created at import time and reused by every tool call.
calculator = Calculator()
@mcp.tool
def add_numbers(a: float, b: float) -> str:
    """Add two numbers.

    Delegates to the module-level ``calculator`` with the ``"add"``
    operation and converts whatever it returns to text.

    Args:
        a: First number
        b: Second number

    Returns:
        String representation of the result
    """
    return str(calculator.calculate(a, b, "add"))
@mcp.tool
def multiply_numbers(a: float, b: float) -> str:
    """Multiply two numbers.

    Delegates to the module-level ``calculator`` with the ``"multiply"``
    operation and converts whatever it returns to text.

    Args:
        a: First number
        b: Second number

    Returns:
        String representation of the result
    """
    return str(calculator.calculate(a, b, "multiply"))
@mcp.tool
def batch_calculate(operations: list) -> Dict[str, Any]:
    """Perform multiple calculations in batch.

    Entries are processed independently: an entry that fails is recorded
    with an ``error`` message and the remaining entries are still processed.

    Args:
        operations: List of operations, each with 'a', 'b', and 'operation'
            keys. A missing 'a' or 'b' defaults to 0 and a missing
            'operation' defaults to 'add'.

    Returns:
        Dictionary with batch results:
            - ``success``: always True; per-entry failures are reported
              inside ``results`` rather than through this flag.
            - ``batch_size``: number of entries received.
            - ``results``: one dictionary per entry, in input order, holding
              either ``result`` or ``error`` together with ``index``,
              ``operation`` and ``inputs``.
    """
    results = []
    for i, op in enumerate(operations):
        try:
            a = float(op.get('a', 0))
            b = float(op.get('b', 0))
            operation = op.get('operation', 'add')

            result = calculator.calculate(a, b, operation)

            results.append({
                "index": i,
                "result": result,
                "operation": operation,
                "inputs": {"a": a, "b": b}
            })
        except Exception as e:
            # The entry may not be a mapping at all (e.g. a bare number or
            # string). Calling ``op.get`` unconditionally here would raise
            # from inside the handler and abort the whole batch, so only
            # look the operation name up when the entry supports ``get``.
            getter = getattr(op, "get", None)
            if callable(getter):
                failed_operation = getter('operation', 'unknown')
            else:
                failed_operation = 'unknown'
            results.append({
                "index": i,
                "error": str(e),
                "operation": failed_operation,
                "inputs": op
            })

    return {
        "success": True,
        "batch_size": len(operations),
        "results": results
    }
@mcp.resource("server://status")
def server_status():
    """Server status resource.

    Returns:
        Dictionary with a fixed ``status`` of "running", the ``server``
        name, and ``tools_available``, the number of tools this module
        registers on ``mcp``.
    """
    # Three tools are registered in this module: add_numbers,
    # multiply_numbers and batch_calculate. Keep this count in sync when
    # adding or removing an @mcp.tool function.
    return {"status": "running", "server": "McpServer01", "tools_available": 3}
def run_server(
    transport: str = "stdio", host: str = "localhost", port: int = 8081
):
    """Run the MCP server with specified transport.

    Args:
        transport: One of "stdio", "http" or "sse".
        host: Host passed to ``mcp.run`` for the "http" and "sse"
            transports; not used for "stdio".
        port: Port passed to ``mcp.run`` for the "http" and "sse"
            transports; not used for "stdio".

    Raises:
        ValueError: If ``transport`` is not one of the supported values.
    """
    if transport == "stdio":
        mcp.run(transport="stdio")
        return

    if transport == "http":
        # Only the HTTP transport is given an explicit endpoint path.
        mcp.run(transport="http", host=host, port=port, path="/mcp")
        return

    if transport == "sse":
        mcp.run(transport="sse", host=host, port=port)
        return

    raise ValueError(f"Unsupported transport: {transport}")
def main():
    """Run MCP server in STDIO mode.

    Calls ``run_server`` with no arguments, so its defaults apply
    (``transport="stdio"``).
    """
    run_server()


# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
127# EOF