Coverage for /Users/antonigmitruk/golf/src/golf/core/parser.py: 63%

585 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-08-16 18:46 +0200

1"""Python file parser for extracting tools, resources, and prompts using AST.""" 

2 

3import ast 

4import re 

5from dataclasses import dataclass 

6from enum import Enum 

7from pathlib import Path 

8from typing import Any 

9 

10from rich.console import Console 

11 

12console = Console() 

13 

14 

class ComponentType(str, Enum):
    """Category of a component found by the parser.

    The ``str`` mix-in makes each member compare equal to its plain
    string value (``ComponentType.TOOL == "tool"``).
    """

    # Member order is significant for iteration; keep it stable.
    TOOL = "tool"
    RESOURCE = "resource"
    PROMPT = "prompt"
    ROUTE = "route"
    UNKNOWN = "unknown"

23 

24 

@dataclass
class ParsedComponent:
    """One MCP component (tool, resource, or prompt) found in a source file.

    Only the first four fields are mandatory; everything else defaults to
    ``None`` and is filled in by the parser when the information exists.
    """

    # Derived from the file path or an explicit name.
    name: str
    type: ComponentType
    file_path: Path
    module_path: str
    docstring: str | None = None
    input_schema: dict[str, Any] | None = None
    output_schema: dict[str, Any] | None = None
    # Resources only: the URI template string.
    uri_template: str | None = None
    # Resources with URI params: the parameter names.
    parameters: list[str] | None = None
    # Nested components: dotted path of the parent directories.
    parent_module: str | None = None
    # Name of the function to use as the component's entry point.
    entry_function: str | None = None
    # Tool annotations used as MCP hints.
    annotations: dict[str, Any] | None = None

41 

42 

43class AstParser: 

44 """AST-based parser for extracting MCP components from Python files.""" 

45 

46 def __init__(self, project_root: Path) -> None: 

47 """Initialize the parser. 

48 

49 Args: 

50 project_root: Root directory of the project 

51 """ 

52 self.project_root = project_root 

53 self.components: dict[str, ParsedComponent] = {} 

54 

55 def parse_directory(self, directory: Path) -> list[ParsedComponent]: 

56 """Parse all Python files in a directory recursively.""" 

57 components = [] 

58 

59 for file_path in directory.glob("**/*.py"): 

60 # Skip __pycache__ and other hidden directories 

61 if "__pycache__" in file_path.parts or any(part.startswith(".") for part in file_path.parts): 

62 continue 

63 

64 try: 

65 file_components = self.parse_file(file_path) 

66 components.extend(file_components) 

67 except Exception as e: 

68 relative_path = file_path.relative_to(self.project_root) 

69 console.print(f"[bold red]Error parsing {relative_path}:[/bold red] {e}") 

70 

71 return components 

72 

73 def parse_file(self, file_path: Path) -> list[ParsedComponent]: 

74 """Parse a single Python file using AST to extract MCP components.""" 

75 # Handle common.py files 

76 if file_path.name == "common.py": 

77 # Register as a known shared module but don't return as a component 

78 return [] 

79 

80 # Skip __init__.py files for direct parsing 

81 if file_path.name == "__init__.py": 

82 return [] 

83 

84 # Determine component type based on directory structure 

85 rel_path = file_path.relative_to(self.project_root) 

86 parent_dir = rel_path.parts[0] if rel_path.parts else None 

87 

88 component_type = ComponentType.UNKNOWN 

89 if parent_dir == "tools": 

90 component_type = ComponentType.TOOL 

91 elif parent_dir == "resources": 

92 component_type = ComponentType.RESOURCE 

93 elif parent_dir == "prompts": 

94 component_type = ComponentType.PROMPT 

95 

96 if component_type == ComponentType.UNKNOWN: 

97 return [] # Not in a recognized directory 

98 

99 # Read the file content and parse it with AST 

100 with open(file_path, encoding="utf-8") as f: 

101 file_content = f.read() 

102 

103 try: 

104 tree = ast.parse(file_content) 

105 except SyntaxError as e: 

106 raise ValueError(f"Syntax error in {file_path}: {e}") 

107 

108 # Extract module docstring 

109 module_docstring = ast.get_docstring(tree) 

110 if not module_docstring: 

111 raise ValueError(f"Missing module docstring in {file_path}") 

112 

113 # Find the entry function - look for "export = function_name" pattern, 

114 # or any top-level function (like "run") as a fallback 

115 entry_function = None 

116 export_target = None 

117 

118 # Look for export = function_name assignment 

119 for node in tree.body: 

120 if isinstance(node, ast.Assign): 

121 for target in node.targets: 

122 if isinstance(target, ast.Name) and target.id == "export" and isinstance(node.value, ast.Name): 

123 export_target = node.value.id 

124 break 

125 

126 # Find all top-level functions 

127 functions = [] 

128 for node in tree.body: 

129 if isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef): 

130 functions.append(node) 

131 # If this function matches our export target, it's our entry function 

132 if export_target and node.name == export_target: 

133 entry_function = node 

134 

135 # Check for the run function as a fallback 

136 run_function = None 

137 for func in functions: 

138 if func.name == "run": 

139 run_function = func 

140 

141 # If we have an export but didn't find the target function, warn 

142 if export_target and not entry_function: 

143 console.print(f"[yellow]Warning: Export target '{export_target}' not found in {file_path}[/yellow]") 

144 

145 # Use the export target function if found, otherwise fall back to run 

146 entry_function = entry_function or run_function 

147 

148 # If no valid function found, skip this file 

149 if not entry_function: 

150 return [] 

151 

152 # Create component 

153 component = ParsedComponent( 

154 name="", # Will be set later 

155 type=component_type, 

156 file_path=file_path, 

157 module_path=file_path.relative_to(self.project_root).as_posix(), 

158 docstring=module_docstring, 

159 entry_function=export_target or "run", # Store the name of the entry function 

160 ) 

161 

162 # Process the entry function 

163 self._process_entry_function(component, entry_function, tree, file_path) 

164 

165 # Process other component-specific information 

166 if component_type == ComponentType.TOOL: 

167 self._process_tool(component, tree) 

168 elif component_type == ComponentType.RESOURCE: 

169 self._process_resource(component, tree) 

170 elif component_type == ComponentType.PROMPT: 

171 self._process_prompt(component, tree) 

172 

173 # Set component name based on file path 

174 component.name = self._derive_component_name(file_path, component_type) 

175 

176 # Set parent module if it's in a nested structure 

177 if len(rel_path.parts) > 2: # More than just "tools/file.py" 

178 parent_parts = rel_path.parts[1:-1] # Skip the root category and the file itself 

179 if parent_parts: 

180 component.parent_module = ".".join(parent_parts) 

181 

182 return [component] 

183 

184 def _process_entry_function( 

185 self, 

186 component: ParsedComponent, 

187 func_node: ast.FunctionDef | ast.AsyncFunctionDef, 

188 tree: ast.Module, 

189 file_path: Path, 

190 ) -> None: 

191 """Process the entry function to extract parameters and return type.""" 

192 # Check for return annotation - STRICT requirement 

193 if func_node.returns is None: 

194 raise ValueError(f"Missing return annotation for {func_node.name} function in {file_path}") 

195 

196 # Extract parameter names for basic info 

197 parameters = [] 

198 for arg in func_node.args.args: 

199 # Skip self, cls, ctx parameters 

200 if arg.arg not in ("self", "cls", "ctx"): 

201 parameters.append(arg.arg) 

202 

203 # Store parameters 

204 component.parameters = parameters 

205 

206 # Extract schemas using runtime inspection (safer and more accurate) 

207 try: 

208 self._extract_schemas_at_runtime(component, file_path) 

209 except Exception as e: 

210 console.print(f"[yellow]Warning: Could not extract schemas from {file_path}: {e}[/yellow]") 

211 # Continue without schemas - better than failing the build 

212 

213 def _extract_schemas_at_runtime(self, component: ParsedComponent, file_path: Path) -> None: 

214 """Extract input/output schemas by importing and inspecting the 

215 actual function.""" 

216 import importlib.util 

217 import sys 

218 

219 # Convert file path to module name 

220 rel_path = file_path.relative_to(self.project_root) 

221 module_name = str(rel_path.with_suffix("")).replace("/", ".") 

222 

223 # Temporarily add project root to sys.path 

224 project_root_str = str(self.project_root) 

225 if project_root_str not in sys.path: 

226 sys.path.insert(0, project_root_str) 

227 cleanup_path = True 

228 else: 

229 cleanup_path = False 

230 

231 try: 

232 # Import the module 

233 spec = importlib.util.spec_from_file_location(module_name, file_path) 

234 if spec is None or spec.loader is None: 

235 return 

236 

237 module = importlib.util.module_from_spec(spec) 

238 spec.loader.exec_module(module) 

239 

240 # Get the entry function 

241 if not hasattr(module, component.entry_function): 

242 return 

243 

244 func = getattr(module, component.entry_function) 

245 

246 # Extract input schema from function signature 

247 component.input_schema = self._extract_input_schema(func) 

248 

249 # Extract output schema from return type annotation 

250 component.output_schema = self._extract_output_schema(func) 

251 

252 finally: 

253 # Clean up sys.path 

254 if cleanup_path and project_root_str in sys.path: 

255 sys.path.remove(project_root_str) 

256 

257 def _extract_input_schema(self, func: Any) -> dict[str, Any] | None: 

258 """Extract input schema from function signature using runtime inspection.""" 

259 import inspect 

260 from typing import get_type_hints 

261 

262 try: 

263 sig = inspect.signature(func) 

264 type_hints = get_type_hints(func, include_extras=True) 

265 

266 properties = {} 

267 required = [] 

268 

269 for param_name, param in sig.parameters.items(): 

270 # Skip special parameters 

271 if param_name in ("self", "cls", "ctx"): 

272 continue 

273 

274 # Get type hint 

275 if param_name not in type_hints: 

276 continue 

277 

278 type_hint = type_hints[param_name] 

279 

280 # Extract schema for this parameter 

281 param_schema = self._extract_param_schema_from_hint(type_hint, param_name) 

282 if param_schema: 

283 # Clean the schema to remove problematic objects 

284 cleaned_schema = self._clean_schema(param_schema) 

285 if cleaned_schema: 

286 properties[param_name] = cleaned_schema 

287 

288 # Check if required (no default value) 

289 if param.default is param.empty: 

290 required.append(param_name) 

291 

292 if properties: 

293 return { 

294 "type": "object", 

295 "properties": properties, 

296 "required": required, 

297 } 

298 

299 except Exception as e: 

300 console.print(f"[yellow]Warning: Could not extract input schema: {e}[/yellow]") 

301 

302 return None 

303 

304 def _extract_output_schema(self, func: Any) -> dict[str, Any] | None: 

305 """Extract output schema from return type annotation.""" 

306 from typing import get_type_hints 

307 

308 try: 

309 type_hints = get_type_hints(func, include_extras=True) 

310 return_type = type_hints.get("return") 

311 

312 if return_type is None: 

313 return None 

314 

315 # If it's a Pydantic BaseModel, extract schema manually 

316 if hasattr(return_type, "model_fields"): 

317 return self._extract_pydantic_model_schema(return_type) 

318 

319 # For other types, create a simple schema 

320 return self._type_to_schema(return_type) 

321 

322 except Exception as e: 

323 console.print(f"[yellow]Warning: Could not extract output schema: {e}[/yellow]") 

324 

325 return None 

326 

327 def _extract_pydantic_model_schema(self, model_class: Any) -> dict[str, Any]: 

328 """Extract schema from Pydantic model by inspecting fields directly.""" 

329 try: 

330 schema = {"type": "object", "properties": {}, "required": []} 

331 

332 if hasattr(model_class, "model_fields"): 

333 for field_name, field_info in model_class.model_fields.items(): 

334 # Extract field type 

335 field_type = field_info.annotation if hasattr(field_info, "annotation") else None 

336 if field_type: 

337 field_schema = self._type_to_schema(field_type) 

338 

339 # Add description if available 

340 if hasattr(field_info, "description") and field_info.description: 

341 field_schema["description"] = field_info.description 

342 

343 # Add title 

344 field_schema["title"] = field_name.replace("_", " ").title() 

345 

346 # Add default if available 

347 if hasattr(field_info, "default") and field_info.default is not None: 

348 try: 

349 # Only add if it's JSON serializable 

350 import json 

351 

352 json.dumps(field_info.default) 

353 field_schema["default"] = field_info.default 

354 except: 

355 pass 

356 

357 schema["properties"][field_name] = field_schema 

358 

359 # Check if required 

360 if hasattr(field_info, "is_required") and field_info.is_required(): 

361 schema["required"].append(field_name) 

362 elif not hasattr(field_info, "default") or field_info.default is None: 

363 # Assume required if no default 

364 schema["required"].append(field_name) 

365 

366 return schema 

367 

368 except Exception as e: 

369 console.print(f"[yellow]Warning: Could not extract Pydantic model schema: {e}[/yellow]") 

370 return {"type": "object"} 

371 

372 def _clean_schema(self, schema: Any) -> dict[str, Any]: 

373 """Clean up a schema to remove non-JSON-serializable objects.""" 

374 import json 

375 

376 def clean_object(obj: Any) -> Any: 

377 if obj is None: 

378 return None 

379 elif isinstance(obj, (str, int, float, bool)): 

380 return obj 

381 elif isinstance(obj, dict): 

382 cleaned = {} 

383 for k, v in obj.items(): 

384 # Skip problematic keys 

385 if k in ["definitions", "$defs", "allOf", "anyOf", "oneOf"]: 

386 continue 

387 cleaned_v = clean_object(v) 

388 if cleaned_v is not None: 

389 cleaned[k] = cleaned_v 

390 return cleaned if cleaned else None 

391 elif isinstance(obj, list): 

392 cleaned = [] 

393 for item in obj: 

394 cleaned_item = clean_object(item) 

395 if cleaned_item is not None: 

396 cleaned.append(cleaned_item) 

397 return cleaned if cleaned else None 

398 else: 

399 # For any other type, test JSON serializability 

400 try: 

401 json.dumps(obj) 

402 return obj 

403 except (TypeError, ValueError): 

404 # If it's not JSON serializable, try to get a string representation 

405 if hasattr(obj, "__name__"): 

406 return obj.__name__ 

407 elif hasattr(obj, "__str__"): 

408 try: 

409 str_val = str(obj) 

410 if str_val and str_val != repr(obj): 

411 return str_val 

412 except: 

413 pass 

414 return None 

415 

416 cleaned = clean_object(schema) 

417 return cleaned if cleaned else {"type": "object"} 

418 

419 def _extract_param_schema_from_hint(self, type_hint: Any, param_name: str) -> dict[str, Any] | None: 

420 """Extract parameter schema from type hint (including Annotated types).""" 

421 from typing import get_args, get_origin 

422 

423 # Handle Annotated types 

424 if get_origin(type_hint) is not None: 

425 origin = get_origin(type_hint) 

426 args = get_args(type_hint) 

427 

428 # Check for Annotated[Type, Field(...)] 

429 if hasattr(origin, "__name__") and origin.__name__ == "Annotated" and len(args) >= 2: 

430 base_type = args[0] 

431 metadata = args[1:] 

432 

433 # Start with base type schema 

434 schema = self._type_to_schema(base_type) 

435 

436 # Extract Field metadata 

437 for meta in metadata: 

438 if hasattr(meta, "description") and meta.description: 

439 schema["description"] = meta.description 

440 if hasattr(meta, "title") and meta.title: 

441 schema["title"] = meta.title 

442 if hasattr(meta, "default") and meta.default is not None: 

443 schema["default"] = meta.default 

444 # Add other Field constraints as needed 

445 

446 return schema 

447 

448 # For non-Annotated types, just convert the type 

449 return self._type_to_schema(type_hint) 

450 

451 def _type_to_schema(self, type_hint: object) -> dict[str, Any]: 

452 """Convert a Python type to JSON schema.""" 

453 from typing import get_args, get_origin 

454 import types 

455 

456 # Handle None/NoneType 

457 if type_hint is type(None): 

458 return {"type": "null"} 

459 

460 # Handle basic types 

461 if type_hint is str: 

462 return {"type": "string"} 

463 elif type_hint is int: 

464 return {"type": "integer"} 

465 elif type_hint is float: 

466 return {"type": "number"} 

467 elif type_hint is bool: 

468 return {"type": "boolean"} 

469 elif type_hint is list: 

470 return {"type": "array"} 

471 elif type_hint is dict: 

472 return {"type": "object"} 

473 

474 # Handle generic types 

475 origin = get_origin(type_hint) 

476 if origin is not None: 

477 args = get_args(type_hint) 

478 

479 if origin is list: 

480 if args: 

481 item_schema = self._type_to_schema(args[0]) 

482 return {"type": "array", "items": item_schema} 

483 return {"type": "array"} 

484 

485 elif origin is dict: 

486 return {"type": "object"} 

487 

488 elif ( 

489 origin is types.UnionType 

490 or (hasattr(types, "UnionType") and origin is types.UnionType) 

491 or str(origin).startswith("typing.Union") 

492 ): 

493 # Handle Union types (including Optional) 

494 non_none_types = [arg for arg in args if arg is not type(None)] 

495 if len(non_none_types) == 1: 

496 # This is Optional[Type] 

497 return self._type_to_schema(non_none_types[0]) 

498 # For complex unions, default to object 

499 return {"type": "object"} 

500 

501 # For unknown types, try to use Pydantic schema if available 

502 if hasattr(type_hint, "model_json_schema"): 

503 schema = type_hint.model_json_schema() 

504 return self._clean_schema(schema) 

505 

506 # Default fallback 

507 return {"type": "object"} 

508 

509 def _process_tool(self, component: ParsedComponent, tree: ast.Module) -> None: 

510 """Process a tool component to extract input/output schemas and annotations.""" 

511 # Look for Input and Output classes in the AST 

512 input_class = None 

513 output_class = None 

514 annotations = None 

515 

516 for node in tree.body: 

517 if isinstance(node, ast.ClassDef): 

518 if node.name == "Input": 

519 input_class = node 

520 elif node.name == "Output": 

521 output_class = node 

522 # Look for annotations assignment 

523 elif isinstance(node, ast.Assign): 

524 for target in node.targets: 

525 if isinstance(target, ast.Name) and target.id == "annotations": 

526 if isinstance(node.value, ast.Dict): 

527 annotations = self._extract_dict_from_ast(node.value) 

528 break 

529 

530 # Process Input class if found 

531 if input_class: 

532 # Check if it inherits from BaseModel 

533 for base in input_class.bases: 

534 if isinstance(base, ast.Name) and base.id == "BaseModel": 

535 component.input_schema = self._extract_pydantic_schema_from_ast(input_class) 

536 break 

537 

538 # Process Output class if found 

539 if output_class: 

540 # Check if it inherits from BaseModel 

541 for base in output_class.bases: 

542 if isinstance(base, ast.Name) and base.id == "BaseModel": 

543 component.output_schema = self._extract_pydantic_schema_from_ast(output_class) 

544 break 

545 

546 # Store annotations if found 

547 if annotations: 

548 component.annotations = annotations 

549 

550 def _process_resource(self, component: ParsedComponent, tree: ast.Module) -> None: 

551 """Process a resource component to extract URI template.""" 

552 # Look for resource_uri assignment in the AST 

553 for node in tree.body: 

554 if isinstance(node, ast.Assign): 

555 for target in node.targets: 

556 if ( 

557 isinstance(target, ast.Name) 

558 and target.id == "resource_uri" 

559 and isinstance(node.value, ast.Constant) 

560 ): 

561 uri_template = node.value.value 

562 component.uri_template = uri_template 

563 

564 # Extract URI parameters (parts in {}) 

565 uri_params = re.findall(r"{([^}]+)}", uri_template) 

566 if uri_params: 

567 component.parameters = uri_params 

568 break 

569 

570 def _process_prompt(self, component: ParsedComponent, tree: ast.Module) -> None: 

571 """Process a prompt component (no special processing needed).""" 

572 pass 

573 

574 def _derive_component_name(self, file_path: Path, component_type: ComponentType) -> str: 

575 """Derive a component name from its file path according to the spec. 

576 

577 Following the spec: <filename> + ("_" + "_".join(PathRev) if PathRev else "") 

578 where PathRev is the reversed list of parent directories under the category. 

579 """ 

580 rel_path = file_path.relative_to(self.project_root) 

581 

582 # Find which category directory this is in 

583 category_idx = -1 

584 for i, part in enumerate(rel_path.parts): 

585 if part in ["tools", "resources", "prompts"]: 

586 category_idx = i 

587 break 

588 

589 if category_idx == -1: 

590 return "" 

591 

592 # Get the filename without extension 

593 filename = rel_path.stem 

594 

595 # Get parent directories between category and file 

596 parent_dirs = list(rel_path.parts[category_idx + 1 : -1]) 

597 

598 # Reverse parent dirs according to spec 

599 parent_dirs.reverse() 

600 

601 # Form the ID according to spec 

602 if parent_dirs: 

603 return f"{filename}_{'_'.join(parent_dirs)}" 

604 else: 

605 return filename 

606 

607 def _extract_pydantic_schema_from_ast(self, class_node: ast.ClassDef) -> dict[str, Any]: 

608 """Extract a JSON schema from an AST class definition. 

609 

610 This is a simplified version that extracts basic field information. 

611 For complex annotations, a more sophisticated approach would be needed. 

612 """ 

613 schema = {"type": "object", "properties": {}, "required": []} 

614 

615 for node in class_node.body: 

616 if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name): 

617 field_name = node.target.id 

618 

619 # Extract type annotation as string 

620 annotation = "" 

621 if isinstance(node.annotation, ast.Name): 

622 annotation = node.annotation.id 

623 elif isinstance(node.annotation, ast.Subscript): 

624 # Simple handling of things like List[str] 

625 annotation = ast.unparse(node.annotation) 

626 else: 

627 annotation = ast.unparse(node.annotation) 

628 

629 # Create property definition using improved type extraction 

630 if isinstance(node.annotation, ast.Subscript): 

631 # Use the improved complex type extraction 

632 type_schema = self._extract_complex_type_schema(node.annotation) 

633 if isinstance(type_schema, dict) and "type" in type_schema: 

634 prop = type_schema.copy() 

635 prop["title"] = field_name.replace("_", " ").title() 

636 else: 

637 prop = { 

638 "type": self._type_hint_to_json_type(annotation), 

639 "title": field_name.replace("_", " ").title(), 

640 } 

641 elif isinstance(node.annotation, ast.Name): 

642 prop = { 

643 "type": self._type_hint_to_json_type(node.annotation.id), 

644 "title": field_name.replace("_", " ").title(), 

645 } 

646 else: 

647 prop = { 

648 "type": self._type_hint_to_json_type(annotation), 

649 "title": field_name.replace("_", " ").title(), 

650 } 

651 

652 # Extract default value if present 

653 if node.value is not None: 

654 if isinstance(node.value, ast.Constant): 

655 # Simple constant default 

656 prop["default"] = node.value.value 

657 elif ( 

658 isinstance(node.value, ast.Call) 

659 and isinstance(node.value.func, ast.Name) 

660 and node.value.func.id == "Field" 

661 ): 

662 # Field object - extract its parameters 

663 for keyword in node.value.keywords: 

664 if keyword.arg == "default" or keyword.arg == "default_factory": 

665 if isinstance(keyword.value, ast.Constant): 

666 prop["default"] = keyword.value.value 

667 elif keyword.arg == "description": 

668 if isinstance(keyword.value, ast.Constant): 

669 prop["description"] = keyword.value.value 

670 elif keyword.arg == "title": 

671 if isinstance(keyword.value, ast.Constant): 

672 prop["title"] = keyword.value.value 

673 

674 # Check for position default argument 

675 # (Field(..., "description")) 

676 if node.value.args: 

677 for i, arg in enumerate(node.value.args): 

678 if i == 0 and isinstance(arg, ast.Constant) and arg.value != Ellipsis: 

679 prop["default"] = arg.value 

680 elif i == 1 and isinstance(arg, ast.Constant): 

681 prop["description"] = arg.value 

682 

683 # Add to properties 

684 schema["properties"][field_name] = prop 

685 

686 # Check if required (no default value or Field(...)) 

687 is_required = True 

688 if node.value is not None: 

689 if isinstance(node.value, ast.Constant): 

690 is_required = False 

691 elif ( 

692 isinstance(node.value, ast.Call) 

693 and isinstance(node.value.func, ast.Name) 

694 and node.value.func.id == "Field" 

695 ): 

696 # Field has default if it doesn't use ... or if it has a 

697 # default keyword 

698 has_ellipsis = False 

699 has_default = False 

700 

701 if node.value.args and isinstance(node.value.args[0], ast.Constant): 

702 has_ellipsis = node.value.args[0].value is Ellipsis 

703 

704 for keyword in node.value.keywords: 

705 if keyword.arg == "default" or keyword.arg == "default_factory": 

706 has_default = True 

707 

708 is_required = has_ellipsis and not has_default 

709 

710 if is_required: 

711 schema["required"].append(field_name) 

712 

713 return schema 

714 

715 def _type_hint_to_json_type(self, type_hint: str) -> str: 

716 """Convert a Python type hint to a JSON schema type. 

717 

718 This handles complex types and edge cases better than the original version. 

719 """ 

720 # Handle None type 

721 if type_hint.lower() in ["none", "nonetype"]: 

722 return "null" 

723 

724 # Handle basic types first 

725 type_map = { 

726 "str": "string", 

727 "int": "integer", 

728 "float": "number", 

729 "bool": "boolean", 

730 "list": "array", 

731 "dict": "object", 

732 "any": "object", # Any maps to object 

733 } 

734 

735 # Exact matches for simple types 

736 lower_hint = type_hint.lower() 

737 if lower_hint in type_map: 

738 return type_map[lower_hint] 

739 

740 # Handle common complex patterns 

741 if "list[" in type_hint or "List[" in type_hint: 

742 return "array" 

743 elif "dict[" in type_hint or "Dict[" in type_hint: 

744 return "object" 

745 elif "union[" in type_hint or "Union[" in type_hint: 

746 # For Union types, try to extract the first non-None type 

747 if "none" in lower_hint or "nonetype" in lower_hint: 

748 # This is Optional[SomeType] - extract the SomeType 

749 for basic_type in type_map: 

750 if basic_type in lower_hint: 

751 return type_map[basic_type] 

752 return "object" # Fallback for complex unions 

753 elif "optional[" in type_hint or "Optional[" in type_hint: 

754 # Extract the wrapped type from Optional[Type] 

755 for basic_type in type_map: 

756 if basic_type in lower_hint: 

757 return type_map[basic_type] 

758 return "object" 

759 

760 # Handle some common pydantic/typing types 

761 if any(keyword in lower_hint for keyword in ["basemodel", "model"]): 

762 return "object" 

763 

764 # Check for numeric patterns 

765 if any(num_type in lower_hint for num_type in ["int", "integer", "number"]): 

766 return "integer" 

767 elif any(num_type in lower_hint for num_type in ["float", "double", "decimal"]): 

768 return "number" 

769 elif any(str_type in lower_hint for str_type in ["str", "string", "text"]): 

770 return "string" 

771 elif any(bool_type in lower_hint for bool_type in ["bool", "boolean"]): 

772 return "boolean" 

773 

774 # Default to object for unknown complex types, string for simple unknowns 

775 if "[" in type_hint or "." in type_hint: 

776 return "object" 

777 else: 

778 return "string" 

779 

780 def _extract_dict_from_ast(self, dict_node: ast.Dict) -> dict[str, Any]: 

781 """Extract a dictionary from an AST Dict node. 

782 

783 This handles simple literal dictionaries with string keys and 

784 boolean/string/number values. 

785 """ 

786 result = {} 

787 

788 for key, value in zip(dict_node.keys, dict_node.values, strict=False): 

789 # Extract the key 

790 if isinstance(key, ast.Constant) and isinstance(key.value, str): 

791 key_str = key.value 

792 elif isinstance(key, ast.Str): # For older Python versions 

793 key_str = key.s 

794 else: 

795 # Skip non-string keys 

796 continue 

797 

798 # Extract the value 

799 if isinstance(value, ast.Constant): 

800 # Handles strings, numbers, booleans, None 

801 result[key_str] = value.value 

802 elif isinstance(value, ast.Str): # For older Python versions 

803 result[key_str] = value.s 

804 elif isinstance(value, ast.Num): # For older Python versions 

805 result[key_str] = value.n 

806 elif isinstance(value, ast.NameConstant): # For older Python versions (True/False/None) 

807 result[key_str] = value.value 

808 elif isinstance(value, ast.Name): 

809 # Handle True/False/None as names 

810 if value.id in ("True", "False", "None"): 

811 result[key_str] = {"True": True, "False": False, "None": None}[value.id] 

812 # We could add more complex value handling here if needed 

813 

814 return result 

815 

816 def _extract_complex_type_schema(self, subscript: ast.Subscript) -> dict[str, Any]: 

817 """Extract schema from complex types like list[str], dict[str, Any], etc.""" 

818 if isinstance(subscript.value, ast.Name): 

819 base_type = subscript.value.id 

820 

821 if base_type == "list": 

822 # Handle list[ItemType] 

823 if isinstance(subscript.slice, ast.Name): 

824 item_type = self._type_hint_to_json_type(subscript.slice.id) 

825 return {"type": "array", "items": {"type": item_type}} 

826 elif isinstance(subscript.slice, ast.Subscript): 

827 # Nested subscript like list[dict[str, Any]] 

828 item_schema = self._extract_complex_type_schema(subscript.slice) 

829 return {"type": "array", "items": item_schema} 

830 else: 

831 # Complex item type, try to parse it 

832 item_type_str = ast.unparse(subscript.slice) 

833 if "dict" in item_type_str.lower(): 

834 return {"type": "array", "items": {"type": "object"}} 

835 else: 

836 item_type = self._type_hint_to_json_type(item_type_str) 

837 return {"type": "array", "items": {"type": item_type}} 

838 

839 elif base_type == "dict": 

840 return {"type": "object"} 

841 

842 elif base_type in ["Optional", "Union"]: 

843 # Handle Optional[Type] or Union[Type, None] 

844 return self._handle_optional_type(subscript) 

845 

846 # Fallback 

847 type_str = ast.unparse(subscript) 

848 return {"type": self._type_hint_to_json_type(type_str)} 

849 

def _handle_union_type(self, union_node: ast.BinOp) -> dict[str, Any]:
    """Pick one side of a ``left | right`` annotation and return its schema.

    Each operand is resolved with ``_extract_type_from_node``. When
    exactly the left operand resolves to the string ``"null"``, the right
    operand is used; in every other case the left operand is used. A
    result that is not already a dict is wrapped as ``{"type": result}``.
    """
    left_type = self._extract_type_from_node(union_node.left)
    right_type = self._extract_type_from_node(union_node.right)

    def is_null(resolved: Any) -> bool:
        return isinstance(resolved, str) and resolved == "null"

    # Prefer the left side unless it is the "null" half of an optional.
    chosen = right_type if (is_null(left_type) and not is_null(right_type)) else left_type
    return chosen if isinstance(chosen, dict) else {"type": chosen}

864 

865 def _handle_optional_type(self, subscript: ast.Subscript) -> dict[str, Any]: 

866 """Handle Optional[Type] annotations.""" 

867 if isinstance(subscript.slice, ast.Name): 

868 base_type = self._type_hint_to_json_type(subscript.slice.id) 

869 return {"type": base_type} 

870 elif isinstance(subscript.slice, ast.Subscript): 

871 return self._extract_complex_type_schema(subscript.slice) 

872 else: 

873 type_str = ast.unparse(subscript.slice) 

874 return {"type": self._type_hint_to_json_type(type_str)} 

875 

876 def _is_parameter_required(self, position: int, defaults: list, total_args: int) -> bool: 

877 """Check if a function parameter is required (has no default value).""" 

878 if position >= total_args or position < 0: 

879 return True # Default to required if position is out of range 

880 

881 # If there are no defaults, all parameters are required 

882 if not defaults: 

883 return True 

884 

885 # Defaults apply to the last N parameters where N = len(defaults) 

886 # So if we have 4 args and 2 defaults, defaults apply to args[2] and args[3] 

887 args_with_defaults = len(defaults) 

888 first_default_position = total_args - args_with_defaults 

889 

890 # If this parameter's position is before the first default position, 

891 # it's required 

892 return position < first_default_position 

893 

894 def _extract_return_type_schema(self, return_annotation: ast.AST, tree: ast.Module) -> dict[str, Any] | None: 

895 """Extract schema from function return type annotation.""" 

896 if isinstance(return_annotation, ast.Name): 

897 # Simple type like str, int, or a class name 

898 if return_annotation.id in ["str", "int", "float", "bool", "list", "dict"]: 

899 return {"type": self._type_hint_to_json_type(return_annotation.id)} 

900 else: 

901 # Assume it's a Pydantic model class - look for it in the module 

902 return self._find_class_schema(return_annotation.id, tree) 

903 

904 elif isinstance(return_annotation, ast.Subscript): 

905 # Complex type like list[dict], Optional[MyClass], etc. 

906 return self._extract_complex_type_schema(return_annotation) 

907 

908 else: 

909 # Other complex types 

910 type_str = ast.unparse(return_annotation) 

911 return {"type": self._type_hint_to_json_type(type_str)} 

912 

913 def _find_class_schema(self, class_name: str, tree: ast.Module) -> dict[str, Any] | None: 

914 """Find a class definition in the module and extract its schema.""" 

915 for node in tree.body: 

916 if isinstance(node, ast.ClassDef) and node.name == class_name: 

917 # Check if it inherits from BaseModel 

918 for base in node.bases: 

919 if isinstance(base, ast.Name) and base.id == "BaseModel": 

920 return self._extract_pydantic_schema_from_ast(node) 

921 

922 return None 

923 

924 

def parse_project(project_path: Path) -> dict[ComponentType, list[ParsedComponent]]:
    """Parse a GolfMCP project to extract all components.

    The ``tools``, ``resources`` and ``prompts`` subdirectories of the
    project are each parsed (when present), and only components whose type
    matches the directory they were found in are kept.

    Args:
        project_path: Path to the project root.

    Returns:
        Mapping from component type to the components of that type.

    Raises:
        ValueError: If two components, of any type, share the same name.
    """
    parser = AstParser(project_path)

    components: dict[ComponentType, list[ParsedComponent]] = {
        ComponentType.TOOL: [],
        ComponentType.RESOURCE: [],
        ComponentType.PROMPT: [],
    }

    # Parse each directory
    for comp_type, dir_name in (
        (ComponentType.TOOL, "tools"),
        (ComponentType.RESOURCE, "resources"),
        (ComponentType.PROMPT, "prompts"),
    ):
        dir_path = project_path / dir_name
        # is_dir() is already False for a missing path.
        if dir_path.is_dir():
            components[comp_type].extend(
                comp for comp in parser.parse_directory(dir_path) if comp.type == comp_type
            )

    # Check for ID collisions across every component type. A set keeps
    # the membership test O(1) per component.
    seen_ids: set[str] = set()
    for comps in components.values():
        for comp in comps:
            if comp.name in seen_ids:
                raise ValueError(f"ID collision detected: {comp.name} is used by multiple components")
            seen_ids.add(comp.name)

    return components

955 

956 

def parse_common_files(project_path: Path) -> dict[str, Path]:
    """Find all common.py files in the project.

    Only the ``tools``, ``resources`` and ``prompts`` directories are
    searched (recursively). Files located under ``__pycache__`` or under a
    dot-prefixed directory inside the project are ignored.

    Args:
        project_path: Path to the project root

    Returns:
        Dictionary mapping directory paths (relative to the project root)
        to common.py file paths
    """
    common_files: dict[str, Path] = {}

    for dir_name in ("tools", "resources", "prompts"):
        base_dir = project_path / dir_name
        if not base_dir.is_dir():
            continue

        for common_file in base_dir.glob("**/common.py"):
            # Judge hidden/cache directories on the project-relative path
            # only: the project root itself may live under a dot-directory
            # or be addressed through "..", which must not hide every file.
            relative = common_file.relative_to(project_path)
            if "__pycache__" in relative.parts or any(part.startswith(".") for part in relative.parts):
                continue

            # The containing directory, relative to the project, is the key.
            common_files[str(relative.parent)] = common_file

    return common_files

985 

986 

987def _is_golf_component_file(file_path: Path) -> bool: 

988 """Check if a Python file is a Golf component (has export or resource_uri). 

989  

990 Args: 

991 file_path: Path to the Python file to check 

992  

993 Returns: 

994 True if the file appears to be a Golf component, False otherwise 

995 """ 

996 try: 

997 with open(file_path, 'r', encoding='utf-8') as f: 

998 content = f.read() 

999 

1000 # Parse the file to check for Golf component patterns 

1001 tree = ast.parse(content) 

1002 

1003 # Look for 'export' or 'resource_uri' variable assignments 

1004 for node in ast.walk(tree): 

1005 if isinstance(node, ast.Assign): 

1006 for target in node.targets: 

1007 if isinstance(target, ast.Name): 

1008 if target.id in ('export', 'resource_uri'): 

1009 return True 

1010 

1011 return False 

1012 

1013 except (SyntaxError, OSError, UnicodeDecodeError): 

1014 # If we can't parse the file, assume it's not a component 

1015 return False 

1016 

1017 

def parse_shared_files(project_path: Path) -> dict[str, Path]:
    """Find all shared Python files in the project (non-component .py files).

    Only the ``tools``, ``resources`` and ``prompts`` directories are
    searched (recursively). Files located under ``__pycache__`` or under a
    dot-prefixed directory inside the project are ignored, as are files
    that ``_is_golf_component_file`` identifies as components.

    Args:
        project_path: Path to the project root

    Returns:
        Dictionary mapping module paths to shared file paths
    """
    shared_files: dict[str, Path] = {}

    for dir_name in ("tools", "resources", "prompts"):
        base_dir = project_path / dir_name
        if not base_dir.is_dir():
            continue

        for py_file in base_dir.glob("**/*.py"):
            # Judge hidden/cache directories on the project-relative path
            # only: the project root itself may live under a dot-directory
            # or be addressed through "..", which must not hide every file.
            relative_path = py_file.relative_to(project_path)
            if "__pycache__" in relative_path.parts or any(part.startswith(".") for part in relative_path.parts):
                continue

            # Skip files that are Golf components (have export or resource_uri)
            if _is_golf_component_file(py_file):
                continue

            # Module path is the project-relative path without the .py
            # extension, e.g. tools/weather/helpers.py -> tools/weather/helpers
            module_path = str(relative_path.with_suffix(""))
            shared_files[module_path] = py_file

    return shared_files