Coverage for /Users/antonigmitruk/golf/src/golf/core/parser.py: 63%
585 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-16 18:46 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-08-16 18:46 +0200
1"""Python file parser for extracting tools, resources, and prompts using AST."""
3import ast
4import re
5from dataclasses import dataclass
6from enum import Enum
7from pathlib import Path
8from typing import Any
10from rich.console import Console
# Module-wide Rich console; the parser writes its warnings and error
# messages through it.
console = Console()
class ComponentType(str, Enum):
    """Kinds of component the parser can report.

    Members are also plain strings (the class mixes in ``str``), so each one
    compares equal to its lowercase value, e.g. ``ComponentType.TOOL == "tool"``.
    """

    TOOL = "tool"
    RESOURCE = "resource"
    PROMPT = "prompt"
    ROUTE = "route"
    # Initial classification in the parser before the directory is inspected.
    UNKNOWN = "unknown"
@dataclass
class ParsedComponent:
    """One MCP component (tool, resource, or prompt) found by the parser.

    Attributes:
        name: Component identifier, derived from the file path or an explicit name.
        type: Which kind of component this is.
        file_path: Path of the source file the component was parsed from.
        module_path: Module path string for the source file.
        docstring: Docstring recorded for the component, if any.
        input_schema: JSON-schema style description of the inputs, if extracted.
        output_schema: JSON-schema style description of the output, if extracted.
        uri_template: URI template (resources only).
        parameters: Parameter names (for resources, the URI parameters).
        parent_module: Dotted parent path for nested components.
        entry_function: Name of the function to use as the entry point.
        annotations: Tool annotations used as MCP hints.
    """

    name: str
    type: ComponentType
    file_path: Path
    module_path: str
    docstring: str | None = None
    input_schema: dict[str, Any] | None = None
    output_schema: dict[str, Any] | None = None
    uri_template: str | None = None
    parameters: list[str] | None = None
    parent_module: str | None = None
    entry_function: str | None = None
    annotations: dict[str, Any] | None = None
43class AstParser:
44 """AST-based parser for extracting MCP components from Python files."""
46 def __init__(self, project_root: Path) -> None:
47 """Initialize the parser.
49 Args:
50 project_root: Root directory of the project
51 """
52 self.project_root = project_root
53 self.components: dict[str, ParsedComponent] = {}
55 def parse_directory(self, directory: Path) -> list[ParsedComponent]:
56 """Parse all Python files in a directory recursively."""
57 components = []
59 for file_path in directory.glob("**/*.py"):
60 # Skip __pycache__ and other hidden directories
61 if "__pycache__" in file_path.parts or any(part.startswith(".") for part in file_path.parts):
62 continue
64 try:
65 file_components = self.parse_file(file_path)
66 components.extend(file_components)
67 except Exception as e:
68 relative_path = file_path.relative_to(self.project_root)
69 console.print(f"[bold red]Error parsing {relative_path}:[/bold red] {e}")
71 return components
73 def parse_file(self, file_path: Path) -> list[ParsedComponent]:
74 """Parse a single Python file using AST to extract MCP components."""
75 # Handle common.py files
76 if file_path.name == "common.py":
77 # Register as a known shared module but don't return as a component
78 return []
80 # Skip __init__.py files for direct parsing
81 if file_path.name == "__init__.py":
82 return []
84 # Determine component type based on directory structure
85 rel_path = file_path.relative_to(self.project_root)
86 parent_dir = rel_path.parts[0] if rel_path.parts else None
88 component_type = ComponentType.UNKNOWN
89 if parent_dir == "tools":
90 component_type = ComponentType.TOOL
91 elif parent_dir == "resources":
92 component_type = ComponentType.RESOURCE
93 elif parent_dir == "prompts":
94 component_type = ComponentType.PROMPT
96 if component_type == ComponentType.UNKNOWN:
97 return [] # Not in a recognized directory
99 # Read the file content and parse it with AST
100 with open(file_path, encoding="utf-8") as f:
101 file_content = f.read()
103 try:
104 tree = ast.parse(file_content)
105 except SyntaxError as e:
106 raise ValueError(f"Syntax error in {file_path}: {e}")
108 # Extract module docstring
109 module_docstring = ast.get_docstring(tree)
110 if not module_docstring:
111 raise ValueError(f"Missing module docstring in {file_path}")
113 # Find the entry function - look for "export = function_name" pattern,
114 # or any top-level function (like "run") as a fallback
115 entry_function = None
116 export_target = None
118 # Look for export = function_name assignment
119 for node in tree.body:
120 if isinstance(node, ast.Assign):
121 for target in node.targets:
122 if isinstance(target, ast.Name) and target.id == "export" and isinstance(node.value, ast.Name):
123 export_target = node.value.id
124 break
126 # Find all top-level functions
127 functions = []
128 for node in tree.body:
129 if isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
130 functions.append(node)
131 # If this function matches our export target, it's our entry function
132 if export_target and node.name == export_target:
133 entry_function = node
135 # Check for the run function as a fallback
136 run_function = None
137 for func in functions:
138 if func.name == "run":
139 run_function = func
141 # If we have an export but didn't find the target function, warn
142 if export_target and not entry_function:
143 console.print(f"[yellow]Warning: Export target '{export_target}' not found in {file_path}[/yellow]")
145 # Use the export target function if found, otherwise fall back to run
146 entry_function = entry_function or run_function
148 # If no valid function found, skip this file
149 if not entry_function:
150 return []
152 # Create component
153 component = ParsedComponent(
154 name="", # Will be set later
155 type=component_type,
156 file_path=file_path,
157 module_path=file_path.relative_to(self.project_root).as_posix(),
158 docstring=module_docstring,
159 entry_function=export_target or "run", # Store the name of the entry function
160 )
162 # Process the entry function
163 self._process_entry_function(component, entry_function, tree, file_path)
165 # Process other component-specific information
166 if component_type == ComponentType.TOOL:
167 self._process_tool(component, tree)
168 elif component_type == ComponentType.RESOURCE:
169 self._process_resource(component, tree)
170 elif component_type == ComponentType.PROMPT:
171 self._process_prompt(component, tree)
173 # Set component name based on file path
174 component.name = self._derive_component_name(file_path, component_type)
176 # Set parent module if it's in a nested structure
177 if len(rel_path.parts) > 2: # More than just "tools/file.py"
178 parent_parts = rel_path.parts[1:-1] # Skip the root category and the file itself
179 if parent_parts:
180 component.parent_module = ".".join(parent_parts)
182 return [component]
184 def _process_entry_function(
185 self,
186 component: ParsedComponent,
187 func_node: ast.FunctionDef | ast.AsyncFunctionDef,
188 tree: ast.Module,
189 file_path: Path,
190 ) -> None:
191 """Process the entry function to extract parameters and return type."""
192 # Check for return annotation - STRICT requirement
193 if func_node.returns is None:
194 raise ValueError(f"Missing return annotation for {func_node.name} function in {file_path}")
196 # Extract parameter names for basic info
197 parameters = []
198 for arg in func_node.args.args:
199 # Skip self, cls, ctx parameters
200 if arg.arg not in ("self", "cls", "ctx"):
201 parameters.append(arg.arg)
203 # Store parameters
204 component.parameters = parameters
206 # Extract schemas using runtime inspection (safer and more accurate)
207 try:
208 self._extract_schemas_at_runtime(component, file_path)
209 except Exception as e:
210 console.print(f"[yellow]Warning: Could not extract schemas from {file_path}: {e}[/yellow]")
211 # Continue without schemas - better than failing the build
213 def _extract_schemas_at_runtime(self, component: ParsedComponent, file_path: Path) -> None:
214 """Extract input/output schemas by importing and inspecting the
215 actual function."""
216 import importlib.util
217 import sys
219 # Convert file path to module name
220 rel_path = file_path.relative_to(self.project_root)
221 module_name = str(rel_path.with_suffix("")).replace("/", ".")
223 # Temporarily add project root to sys.path
224 project_root_str = str(self.project_root)
225 if project_root_str not in sys.path:
226 sys.path.insert(0, project_root_str)
227 cleanup_path = True
228 else:
229 cleanup_path = False
231 try:
232 # Import the module
233 spec = importlib.util.spec_from_file_location(module_name, file_path)
234 if spec is None or spec.loader is None:
235 return
237 module = importlib.util.module_from_spec(spec)
238 spec.loader.exec_module(module)
240 # Get the entry function
241 if not hasattr(module, component.entry_function):
242 return
244 func = getattr(module, component.entry_function)
246 # Extract input schema from function signature
247 component.input_schema = self._extract_input_schema(func)
249 # Extract output schema from return type annotation
250 component.output_schema = self._extract_output_schema(func)
252 finally:
253 # Clean up sys.path
254 if cleanup_path and project_root_str in sys.path:
255 sys.path.remove(project_root_str)
257 def _extract_input_schema(self, func: Any) -> dict[str, Any] | None:
258 """Extract input schema from function signature using runtime inspection."""
259 import inspect
260 from typing import get_type_hints
262 try:
263 sig = inspect.signature(func)
264 type_hints = get_type_hints(func, include_extras=True)
266 properties = {}
267 required = []
269 for param_name, param in sig.parameters.items():
270 # Skip special parameters
271 if param_name in ("self", "cls", "ctx"):
272 continue
274 # Get type hint
275 if param_name not in type_hints:
276 continue
278 type_hint = type_hints[param_name]
280 # Extract schema for this parameter
281 param_schema = self._extract_param_schema_from_hint(type_hint, param_name)
282 if param_schema:
283 # Clean the schema to remove problematic objects
284 cleaned_schema = self._clean_schema(param_schema)
285 if cleaned_schema:
286 properties[param_name] = cleaned_schema
288 # Check if required (no default value)
289 if param.default is param.empty:
290 required.append(param_name)
292 if properties:
293 return {
294 "type": "object",
295 "properties": properties,
296 "required": required,
297 }
299 except Exception as e:
300 console.print(f"[yellow]Warning: Could not extract input schema: {e}[/yellow]")
302 return None
304 def _extract_output_schema(self, func: Any) -> dict[str, Any] | None:
305 """Extract output schema from return type annotation."""
306 from typing import get_type_hints
308 try:
309 type_hints = get_type_hints(func, include_extras=True)
310 return_type = type_hints.get("return")
312 if return_type is None:
313 return None
315 # If it's a Pydantic BaseModel, extract schema manually
316 if hasattr(return_type, "model_fields"):
317 return self._extract_pydantic_model_schema(return_type)
319 # For other types, create a simple schema
320 return self._type_to_schema(return_type)
322 except Exception as e:
323 console.print(f"[yellow]Warning: Could not extract output schema: {e}[/yellow]")
325 return None
327 def _extract_pydantic_model_schema(self, model_class: Any) -> dict[str, Any]:
328 """Extract schema from Pydantic model by inspecting fields directly."""
329 try:
330 schema = {"type": "object", "properties": {}, "required": []}
332 if hasattr(model_class, "model_fields"):
333 for field_name, field_info in model_class.model_fields.items():
334 # Extract field type
335 field_type = field_info.annotation if hasattr(field_info, "annotation") else None
336 if field_type:
337 field_schema = self._type_to_schema(field_type)
339 # Add description if available
340 if hasattr(field_info, "description") and field_info.description:
341 field_schema["description"] = field_info.description
343 # Add title
344 field_schema["title"] = field_name.replace("_", " ").title()
346 # Add default if available
347 if hasattr(field_info, "default") and field_info.default is not None:
348 try:
349 # Only add if it's JSON serializable
350 import json
352 json.dumps(field_info.default)
353 field_schema["default"] = field_info.default
354 except:
355 pass
357 schema["properties"][field_name] = field_schema
359 # Check if required
360 if hasattr(field_info, "is_required") and field_info.is_required():
361 schema["required"].append(field_name)
362 elif not hasattr(field_info, "default") or field_info.default is None:
363 # Assume required if no default
364 schema["required"].append(field_name)
366 return schema
368 except Exception as e:
369 console.print(f"[yellow]Warning: Could not extract Pydantic model schema: {e}[/yellow]")
370 return {"type": "object"}
372 def _clean_schema(self, schema: Any) -> dict[str, Any]:
373 """Clean up a schema to remove non-JSON-serializable objects."""
374 import json
376 def clean_object(obj: Any) -> Any:
377 if obj is None:
378 return None
379 elif isinstance(obj, (str, int, float, bool)):
380 return obj
381 elif isinstance(obj, dict):
382 cleaned = {}
383 for k, v in obj.items():
384 # Skip problematic keys
385 if k in ["definitions", "$defs", "allOf", "anyOf", "oneOf"]:
386 continue
387 cleaned_v = clean_object(v)
388 if cleaned_v is not None:
389 cleaned[k] = cleaned_v
390 return cleaned if cleaned else None
391 elif isinstance(obj, list):
392 cleaned = []
393 for item in obj:
394 cleaned_item = clean_object(item)
395 if cleaned_item is not None:
396 cleaned.append(cleaned_item)
397 return cleaned if cleaned else None
398 else:
399 # For any other type, test JSON serializability
400 try:
401 json.dumps(obj)
402 return obj
403 except (TypeError, ValueError):
404 # If it's not JSON serializable, try to get a string representation
405 if hasattr(obj, "__name__"):
406 return obj.__name__
407 elif hasattr(obj, "__str__"):
408 try:
409 str_val = str(obj)
410 if str_val and str_val != repr(obj):
411 return str_val
412 except:
413 pass
414 return None
416 cleaned = clean_object(schema)
417 return cleaned if cleaned else {"type": "object"}
419 def _extract_param_schema_from_hint(self, type_hint: Any, param_name: str) -> dict[str, Any] | None:
420 """Extract parameter schema from type hint (including Annotated types)."""
421 from typing import get_args, get_origin
423 # Handle Annotated types
424 if get_origin(type_hint) is not None:
425 origin = get_origin(type_hint)
426 args = get_args(type_hint)
428 # Check for Annotated[Type, Field(...)]
429 if hasattr(origin, "__name__") and origin.__name__ == "Annotated" and len(args) >= 2:
430 base_type = args[0]
431 metadata = args[1:]
433 # Start with base type schema
434 schema = self._type_to_schema(base_type)
436 # Extract Field metadata
437 for meta in metadata:
438 if hasattr(meta, "description") and meta.description:
439 schema["description"] = meta.description
440 if hasattr(meta, "title") and meta.title:
441 schema["title"] = meta.title
442 if hasattr(meta, "default") and meta.default is not None:
443 schema["default"] = meta.default
444 # Add other Field constraints as needed
446 return schema
448 # For non-Annotated types, just convert the type
449 return self._type_to_schema(type_hint)
451 def _type_to_schema(self, type_hint: object) -> dict[str, Any]:
452 """Convert a Python type to JSON schema."""
453 from typing import get_args, get_origin
454 import types
456 # Handle None/NoneType
457 if type_hint is type(None):
458 return {"type": "null"}
460 # Handle basic types
461 if type_hint is str:
462 return {"type": "string"}
463 elif type_hint is int:
464 return {"type": "integer"}
465 elif type_hint is float:
466 return {"type": "number"}
467 elif type_hint is bool:
468 return {"type": "boolean"}
469 elif type_hint is list:
470 return {"type": "array"}
471 elif type_hint is dict:
472 return {"type": "object"}
474 # Handle generic types
475 origin = get_origin(type_hint)
476 if origin is not None:
477 args = get_args(type_hint)
479 if origin is list:
480 if args:
481 item_schema = self._type_to_schema(args[0])
482 return {"type": "array", "items": item_schema}
483 return {"type": "array"}
485 elif origin is dict:
486 return {"type": "object"}
488 elif (
489 origin is types.UnionType
490 or (hasattr(types, "UnionType") and origin is types.UnionType)
491 or str(origin).startswith("typing.Union")
492 ):
493 # Handle Union types (including Optional)
494 non_none_types = [arg for arg in args if arg is not type(None)]
495 if len(non_none_types) == 1:
496 # This is Optional[Type]
497 return self._type_to_schema(non_none_types[0])
498 # For complex unions, default to object
499 return {"type": "object"}
501 # For unknown types, try to use Pydantic schema if available
502 if hasattr(type_hint, "model_json_schema"):
503 schema = type_hint.model_json_schema()
504 return self._clean_schema(schema)
506 # Default fallback
507 return {"type": "object"}
509 def _process_tool(self, component: ParsedComponent, tree: ast.Module) -> None:
510 """Process a tool component to extract input/output schemas and annotations."""
511 # Look for Input and Output classes in the AST
512 input_class = None
513 output_class = None
514 annotations = None
516 for node in tree.body:
517 if isinstance(node, ast.ClassDef):
518 if node.name == "Input":
519 input_class = node
520 elif node.name == "Output":
521 output_class = node
522 # Look for annotations assignment
523 elif isinstance(node, ast.Assign):
524 for target in node.targets:
525 if isinstance(target, ast.Name) and target.id == "annotations":
526 if isinstance(node.value, ast.Dict):
527 annotations = self._extract_dict_from_ast(node.value)
528 break
530 # Process Input class if found
531 if input_class:
532 # Check if it inherits from BaseModel
533 for base in input_class.bases:
534 if isinstance(base, ast.Name) and base.id == "BaseModel":
535 component.input_schema = self._extract_pydantic_schema_from_ast(input_class)
536 break
538 # Process Output class if found
539 if output_class:
540 # Check if it inherits from BaseModel
541 for base in output_class.bases:
542 if isinstance(base, ast.Name) and base.id == "BaseModel":
543 component.output_schema = self._extract_pydantic_schema_from_ast(output_class)
544 break
546 # Store annotations if found
547 if annotations:
548 component.annotations = annotations
550 def _process_resource(self, component: ParsedComponent, tree: ast.Module) -> None:
551 """Process a resource component to extract URI template."""
552 # Look for resource_uri assignment in the AST
553 for node in tree.body:
554 if isinstance(node, ast.Assign):
555 for target in node.targets:
556 if (
557 isinstance(target, ast.Name)
558 and target.id == "resource_uri"
559 and isinstance(node.value, ast.Constant)
560 ):
561 uri_template = node.value.value
562 component.uri_template = uri_template
564 # Extract URI parameters (parts in {})
565 uri_params = re.findall(r"{([^}]+)}", uri_template)
566 if uri_params:
567 component.parameters = uri_params
568 break
570 def _process_prompt(self, component: ParsedComponent, tree: ast.Module) -> None:
571 """Process a prompt component (no special processing needed)."""
572 pass
574 def _derive_component_name(self, file_path: Path, component_type: ComponentType) -> str:
575 """Derive a component name from its file path according to the spec.
577 Following the spec: <filename> + ("_" + "_".join(PathRev) if PathRev else "")
578 where PathRev is the reversed list of parent directories under the category.
579 """
580 rel_path = file_path.relative_to(self.project_root)
582 # Find which category directory this is in
583 category_idx = -1
584 for i, part in enumerate(rel_path.parts):
585 if part in ["tools", "resources", "prompts"]:
586 category_idx = i
587 break
589 if category_idx == -1:
590 return ""
592 # Get the filename without extension
593 filename = rel_path.stem
595 # Get parent directories between category and file
596 parent_dirs = list(rel_path.parts[category_idx + 1 : -1])
598 # Reverse parent dirs according to spec
599 parent_dirs.reverse()
601 # Form the ID according to spec
602 if parent_dirs:
603 return f"{filename}_{'_'.join(parent_dirs)}"
604 else:
605 return filename
607 def _extract_pydantic_schema_from_ast(self, class_node: ast.ClassDef) -> dict[str, Any]:
608 """Extract a JSON schema from an AST class definition.
610 This is a simplified version that extracts basic field information.
611 For complex annotations, a more sophisticated approach would be needed.
612 """
613 schema = {"type": "object", "properties": {}, "required": []}
615 for node in class_node.body:
616 if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
617 field_name = node.target.id
619 # Extract type annotation as string
620 annotation = ""
621 if isinstance(node.annotation, ast.Name):
622 annotation = node.annotation.id
623 elif isinstance(node.annotation, ast.Subscript):
624 # Simple handling of things like List[str]
625 annotation = ast.unparse(node.annotation)
626 else:
627 annotation = ast.unparse(node.annotation)
629 # Create property definition using improved type extraction
630 if isinstance(node.annotation, ast.Subscript):
631 # Use the improved complex type extraction
632 type_schema = self._extract_complex_type_schema(node.annotation)
633 if isinstance(type_schema, dict) and "type" in type_schema:
634 prop = type_schema.copy()
635 prop["title"] = field_name.replace("_", " ").title()
636 else:
637 prop = {
638 "type": self._type_hint_to_json_type(annotation),
639 "title": field_name.replace("_", " ").title(),
640 }
641 elif isinstance(node.annotation, ast.Name):
642 prop = {
643 "type": self._type_hint_to_json_type(node.annotation.id),
644 "title": field_name.replace("_", " ").title(),
645 }
646 else:
647 prop = {
648 "type": self._type_hint_to_json_type(annotation),
649 "title": field_name.replace("_", " ").title(),
650 }
652 # Extract default value if present
653 if node.value is not None:
654 if isinstance(node.value, ast.Constant):
655 # Simple constant default
656 prop["default"] = node.value.value
657 elif (
658 isinstance(node.value, ast.Call)
659 and isinstance(node.value.func, ast.Name)
660 and node.value.func.id == "Field"
661 ):
662 # Field object - extract its parameters
663 for keyword in node.value.keywords:
664 if keyword.arg == "default" or keyword.arg == "default_factory":
665 if isinstance(keyword.value, ast.Constant):
666 prop["default"] = keyword.value.value
667 elif keyword.arg == "description":
668 if isinstance(keyword.value, ast.Constant):
669 prop["description"] = keyword.value.value
670 elif keyword.arg == "title":
671 if isinstance(keyword.value, ast.Constant):
672 prop["title"] = keyword.value.value
674 # Check for position default argument
675 # (Field(..., "description"))
676 if node.value.args:
677 for i, arg in enumerate(node.value.args):
678 if i == 0 and isinstance(arg, ast.Constant) and arg.value != Ellipsis:
679 prop["default"] = arg.value
680 elif i == 1 and isinstance(arg, ast.Constant):
681 prop["description"] = arg.value
683 # Add to properties
684 schema["properties"][field_name] = prop
686 # Check if required (no default value or Field(...))
687 is_required = True
688 if node.value is not None:
689 if isinstance(node.value, ast.Constant):
690 is_required = False
691 elif (
692 isinstance(node.value, ast.Call)
693 and isinstance(node.value.func, ast.Name)
694 and node.value.func.id == "Field"
695 ):
696 # Field has default if it doesn't use ... or if it has a
697 # default keyword
698 has_ellipsis = False
699 has_default = False
701 if node.value.args and isinstance(node.value.args[0], ast.Constant):
702 has_ellipsis = node.value.args[0].value is Ellipsis
704 for keyword in node.value.keywords:
705 if keyword.arg == "default" or keyword.arg == "default_factory":
706 has_default = True
708 is_required = has_ellipsis and not has_default
710 if is_required:
711 schema["required"].append(field_name)
713 return schema
715 def _type_hint_to_json_type(self, type_hint: str) -> str:
716 """Convert a Python type hint to a JSON schema type.
718 This handles complex types and edge cases better than the original version.
719 """
720 # Handle None type
721 if type_hint.lower() in ["none", "nonetype"]:
722 return "null"
724 # Handle basic types first
725 type_map = {
726 "str": "string",
727 "int": "integer",
728 "float": "number",
729 "bool": "boolean",
730 "list": "array",
731 "dict": "object",
732 "any": "object", # Any maps to object
733 }
735 # Exact matches for simple types
736 lower_hint = type_hint.lower()
737 if lower_hint in type_map:
738 return type_map[lower_hint]
740 # Handle common complex patterns
741 if "list[" in type_hint or "List[" in type_hint:
742 return "array"
743 elif "dict[" in type_hint or "Dict[" in type_hint:
744 return "object"
745 elif "union[" in type_hint or "Union[" in type_hint:
746 # For Union types, try to extract the first non-None type
747 if "none" in lower_hint or "nonetype" in lower_hint:
748 # This is Optional[SomeType] - extract the SomeType
749 for basic_type in type_map:
750 if basic_type in lower_hint:
751 return type_map[basic_type]
752 return "object" # Fallback for complex unions
753 elif "optional[" in type_hint or "Optional[" in type_hint:
754 # Extract the wrapped type from Optional[Type]
755 for basic_type in type_map:
756 if basic_type in lower_hint:
757 return type_map[basic_type]
758 return "object"
760 # Handle some common pydantic/typing types
761 if any(keyword in lower_hint for keyword in ["basemodel", "model"]):
762 return "object"
764 # Check for numeric patterns
765 if any(num_type in lower_hint for num_type in ["int", "integer", "number"]):
766 return "integer"
767 elif any(num_type in lower_hint for num_type in ["float", "double", "decimal"]):
768 return "number"
769 elif any(str_type in lower_hint for str_type in ["str", "string", "text"]):
770 return "string"
771 elif any(bool_type in lower_hint for bool_type in ["bool", "boolean"]):
772 return "boolean"
774 # Default to object for unknown complex types, string for simple unknowns
775 if "[" in type_hint or "." in type_hint:
776 return "object"
777 else:
778 return "string"
780 def _extract_dict_from_ast(self, dict_node: ast.Dict) -> dict[str, Any]:
781 """Extract a dictionary from an AST Dict node.
783 This handles simple literal dictionaries with string keys and
784 boolean/string/number values.
785 """
786 result = {}
788 for key, value in zip(dict_node.keys, dict_node.values, strict=False):
789 # Extract the key
790 if isinstance(key, ast.Constant) and isinstance(key.value, str):
791 key_str = key.value
792 elif isinstance(key, ast.Str): # For older Python versions
793 key_str = key.s
794 else:
795 # Skip non-string keys
796 continue
798 # Extract the value
799 if isinstance(value, ast.Constant):
800 # Handles strings, numbers, booleans, None
801 result[key_str] = value.value
802 elif isinstance(value, ast.Str): # For older Python versions
803 result[key_str] = value.s
804 elif isinstance(value, ast.Num): # For older Python versions
805 result[key_str] = value.n
806 elif isinstance(value, ast.NameConstant): # For older Python versions (True/False/None)
807 result[key_str] = value.value
808 elif isinstance(value, ast.Name):
809 # Handle True/False/None as names
810 if value.id in ("True", "False", "None"):
811 result[key_str] = {"True": True, "False": False, "None": None}[value.id]
812 # We could add more complex value handling here if needed
814 return result
816 def _extract_complex_type_schema(self, subscript: ast.Subscript) -> dict[str, Any]:
817 """Extract schema from complex types like list[str], dict[str, Any], etc."""
818 if isinstance(subscript.value, ast.Name):
819 base_type = subscript.value.id
821 if base_type == "list":
822 # Handle list[ItemType]
823 if isinstance(subscript.slice, ast.Name):
824 item_type = self._type_hint_to_json_type(subscript.slice.id)
825 return {"type": "array", "items": {"type": item_type}}
826 elif isinstance(subscript.slice, ast.Subscript):
827 # Nested subscript like list[dict[str, Any]]
828 item_schema = self._extract_complex_type_schema(subscript.slice)
829 return {"type": "array", "items": item_schema}
830 else:
831 # Complex item type, try to parse it
832 item_type_str = ast.unparse(subscript.slice)
833 if "dict" in item_type_str.lower():
834 return {"type": "array", "items": {"type": "object"}}
835 else:
836 item_type = self._type_hint_to_json_type(item_type_str)
837 return {"type": "array", "items": {"type": item_type}}
839 elif base_type == "dict":
840 return {"type": "object"}
842 elif base_type in ["Optional", "Union"]:
843 # Handle Optional[Type] or Union[Type, None]
844 return self._handle_optional_type(subscript)
846 # Fallback
847 type_str = ast.unparse(subscript)
848 return {"type": self._type_hint_to_json_type(type_str)}
850 def _handle_union_type(self, union_node: ast.BinOp) -> dict[str, Any]:
851 """Handle union types like str | None."""
852 # For now, just extract the first non-None type
853 left_type = self._extract_type_from_node(union_node.left)
854 right_type = self._extract_type_from_node(union_node.right)
856 # If one side is None, return the other type
857 if isinstance(right_type, str) and right_type == "null":
858 return left_type if isinstance(left_type, dict) else {"type": left_type}
859 elif isinstance(left_type, str) and left_type == "null":
860 return right_type if isinstance(right_type, dict) else {"type": right_type}
862 # Otherwise, return the first type
863 return left_type if isinstance(left_type, dict) else {"type": left_type}
865 def _handle_optional_type(self, subscript: ast.Subscript) -> dict[str, Any]:
866 """Handle Optional[Type] annotations."""
867 if isinstance(subscript.slice, ast.Name):
868 base_type = self._type_hint_to_json_type(subscript.slice.id)
869 return {"type": base_type}
870 elif isinstance(subscript.slice, ast.Subscript):
871 return self._extract_complex_type_schema(subscript.slice)
872 else:
873 type_str = ast.unparse(subscript.slice)
874 return {"type": self._type_hint_to_json_type(type_str)}
876 def _is_parameter_required(self, position: int, defaults: list, total_args: int) -> bool:
877 """Check if a function parameter is required (has no default value)."""
878 if position >= total_args or position < 0:
879 return True # Default to required if position is out of range
881 # If there are no defaults, all parameters are required
882 if not defaults:
883 return True
885 # Defaults apply to the last N parameters where N = len(defaults)
886 # So if we have 4 args and 2 defaults, defaults apply to args[2] and args[3]
887 args_with_defaults = len(defaults)
888 first_default_position = total_args - args_with_defaults
890 # If this parameter's position is before the first default position,
891 # it's required
892 return position < first_default_position
894 def _extract_return_type_schema(self, return_annotation: ast.AST, tree: ast.Module) -> dict[str, Any] | None:
895 """Extract schema from function return type annotation."""
896 if isinstance(return_annotation, ast.Name):
897 # Simple type like str, int, or a class name
898 if return_annotation.id in ["str", "int", "float", "bool", "list", "dict"]:
899 return {"type": self._type_hint_to_json_type(return_annotation.id)}
900 else:
901 # Assume it's a Pydantic model class - look for it in the module
902 return self._find_class_schema(return_annotation.id, tree)
904 elif isinstance(return_annotation, ast.Subscript):
905 # Complex type like list[dict], Optional[MyClass], etc.
906 return self._extract_complex_type_schema(return_annotation)
908 else:
909 # Other complex types
910 type_str = ast.unparse(return_annotation)
911 return {"type": self._type_hint_to_json_type(type_str)}
913 def _find_class_schema(self, class_name: str, tree: ast.Module) -> dict[str, Any] | None:
914 """Find a class definition in the module and extract its schema."""
915 for node in tree.body:
916 if isinstance(node, ast.ClassDef) and node.name == class_name:
917 # Check if it inherits from BaseModel
918 for base in node.bases:
919 if isinstance(base, ast.Name) and base.id == "BaseModel":
920 return self._extract_pydantic_schema_from_ast(node)
922 return None
def parse_project(project_path: Path) -> dict[ComponentType, list[ParsedComponent]]:
    """Parse a GolfMCP project to extract all components.

    The ``tools``, ``resources`` and ``prompts`` directories directly below
    ``project_path`` are scanned when they exist.

    Args:
        project_path: Path to the project root.

    Returns:
        A mapping from component type (tool, resource, prompt) to the
        components of that type found in the project.

    Raises:
        ValueError: If two components, of any type, share the same name.
    """
    parser = AstParser(project_path)

    category_dirs = {
        ComponentType.TOOL: "tools",
        ComponentType.RESOURCE: "resources",
        ComponentType.PROMPT: "prompts",
    }
    components: dict[ComponentType, list[ParsedComponent]] = {comp_type: [] for comp_type in category_dirs}

    for comp_type, dir_name in category_dirs.items():
        dir_path = project_path / dir_name
        if dir_path.is_dir():
            dir_components = parser.parse_directory(dir_path)
            components[comp_type].extend(c for c in dir_components if c.type == comp_type)

    # Names must be unique across all component types. A set keeps the check
    # linear in the number of components.
    seen_ids: set[str] = set()
    for comps in components.values():
        for comp in comps:
            if comp.name in seen_ids:
                raise ValueError(f"ID collision detected: {comp.name} is used by multiple components")
            seen_ids.add(comp.name)

    return components
def parse_common_files(project_path: Path) -> dict[str, Path]:
    """Find all common.py files in the project.

    Only the ``tools``, ``resources`` and ``prompts`` directories are
    searched (recursively). Files below ``__pycache__`` or a hidden
    (dot-prefixed) directory inside the project are ignored.

    Args:
        project_path: Path to the project root

    Returns:
        Dictionary mapping directory paths (relative to the project root)
        to common.py file paths
    """
    common_files: dict[str, Path] = {}

    for dir_name in ("tools", "resources", "prompts"):
        base_dir = project_path / dir_name
        if not base_dir.is_dir():
            continue

        for common_file in base_dir.glob("**/common.py"):
            # Judge "hidden" on the path inside the project only; a dotted
            # directory above the project root must not hide every file.
            module_dir = common_file.parent.relative_to(project_path)
            if "__pycache__" in module_dir.parts or any(part.startswith(".") for part in module_dir.parts):
                continue

            common_files[str(module_dir)] = common_file

    return common_files
987def _is_golf_component_file(file_path: Path) -> bool:
988 """Check if a Python file is a Golf component (has export or resource_uri).
990 Args:
991 file_path: Path to the Python file to check
993 Returns:
994 True if the file appears to be a Golf component, False otherwise
995 """
996 try:
997 with open(file_path, 'r', encoding='utf-8') as f:
998 content = f.read()
1000 # Parse the file to check for Golf component patterns
1001 tree = ast.parse(content)
1003 # Look for 'export' or 'resource_uri' variable assignments
1004 for node in ast.walk(tree):
1005 if isinstance(node, ast.Assign):
1006 for target in node.targets:
1007 if isinstance(target, ast.Name):
1008 if target.id in ('export', 'resource_uri'):
1009 return True
1011 return False
1013 except (SyntaxError, OSError, UnicodeDecodeError):
1014 # If we can't parse the file, assume it's not a component
1015 return False
def parse_shared_files(project_path: Path) -> dict[str, Path]:
    """Find all shared Python files in the project (non-component .py files).

    Only the ``tools``, ``resources`` and ``prompts`` directories are
    searched (recursively). Files below ``__pycache__`` or a hidden
    (dot-prefixed) directory inside the project are ignored, as are files
    that ``_is_golf_component_file`` recognizes as components.

    Args:
        project_path: Path to the project root

    Returns:
        Dictionary mapping module paths (relative file path without the
        ``.py`` suffix, e.g. ``tools/weather/helpers``) to shared file paths
    """
    shared_files: dict[str, Path] = {}

    for dir_name in ("tools", "resources", "prompts"):
        base_dir = project_path / dir_name
        if not base_dir.is_dir():
            continue

        for py_file in base_dir.glob("**/*.py"):
            # Judge "hidden" on the path inside the project only; a dotted
            # directory above the project root must not hide every file.
            relative_path = py_file.relative_to(project_path)
            if "__pycache__" in relative_path.parts or any(part.startswith(".") for part in relative_path.parts):
                continue

            # Skip files that are Golf components (have export or resource_uri)
            if _is_golf_component_file(py_file):
                continue

            # For example: tools/weather/helpers.py -> tools/weather/helpers
            shared_files[str(relative_path.with_suffix(""))] = py_file

    return shared_files