Coverage for mcp_bridge/tools/lsp/tools.py: 0%
445 statements
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
« prev ^ index » next coverage.py v7.10.1, created at 2026-01-10 00:20 -0500
1"""
2LSP Tools - Advanced Language Server Protocol Operations
4Provides comprehensive LSP functionality via persistent connections to language servers.
5Supplements Claude Code's native LSP support with advanced operations.
6"""
8import asyncio
9import json
10import logging
11import subprocess
12import sys
13from pathlib import Path
14from typing import Any
15from urllib.parse import unquote, urlparse
17# Use lsprotocol for types
18try:
19 from lsprotocol.types import (
20 CodeActionContext,
21 CodeActionParams,
22 CodeActionTriggerKind,
23 DidCloseTextDocumentParams,
24 DidOpenTextDocumentParams,
25 DocumentSymbolParams,
26 HoverParams,
27 Location,
28 Position,
29 PrepareRenameParams,
30 Range,
31 ReferenceContext,
32 ReferenceParams,
33 RenameParams,
34 TextDocumentIdentifier,
35 TextDocumentItem,
36 TextDocumentPositionParams,
37 WorkspaceSymbolParams,
38 )
39except ImportError:
40 # Fallback/Mock for environment without lsprotocol
41 pass
43from .manager import get_lsp_manager
45logger = logging.getLogger(__name__)
def _get_language_for_file(file_path: str) -> str:
    """Return the LSP language identifier for *file_path*.

    Only the file extension is considered, compared case-insensitively.
    Extensions that are not in the table yield ``"unknown"``.
    """
    extension = Path(file_path).suffix.lower()

    # Each language lists every extension that maps to it.
    extensions_by_language = (
        ("python", (".py",)),
        ("typescript", (".ts",)),
        ("typescriptreact", (".tsx",)),
        ("javascript", (".js",)),
        ("javascriptreact", (".jsx",)),
        ("go", (".go",)),
        ("rust", (".rs",)),
        ("java", (".java",)),
        ("ruby", (".rb",)),
        ("c", (".c", ".h")),
        ("cpp", (".cpp", ".hpp")),
    )
    for language, extensions in extensions_by_language:
        if extension in extensions:
            return language
    return "unknown"
69async def _get_client_and_params(
70 file_path: str, needs_open: bool = True
71) -> tuple[Any | None, str | None, str]:
72 """
73 Get LSP client and prepare file for operations.
75 Returns:
76 (client, uri, language)
77 """
78 path = Path(file_path)
79 if not path.exists():
80 return None, None, "unknown"
82 lang = _get_language_for_file(file_path)
83 manager = get_lsp_manager()
84 client = await manager.get_server(lang)
86 if not client:
87 return None, None, lang
89 uri = f"file://{path.absolute()}"
91 if needs_open:
92 try:
93 content = path.read_text()
94 # Send didOpen notification
95 # We don't check if it's already open because we're stateless-ish
96 # and want to ensure fresh content.
97 # Using version=1
98 params = DidOpenTextDocumentParams(
99 text_document=TextDocumentItem(uri=uri, language_id=lang, version=1, text=content)
100 )
101 client.protocol.notify("textDocument/didOpen", params)
102 except Exception as e:
103 logger.warning(f"Failed to send didOpen for {file_path}: {e}")
105 return client, uri, lang
async def lsp_hover(file_path: str, line: int, character: int) -> str:
    """
    Get type info, documentation, and signature at a position.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to jedi run in a child
    interpreter.

    Args:
        file_path: Path of the file to inspect.
        line: 1-based line number (converted to 0-based for the LSP request).
        character: Column, passed through unchanged.

    Returns:
        The hover text, or a human-readable message when nothing is available.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"📍 LSP-HOVER: {file_path}:{line}:{character}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = HoverParams(
                text_document=TextDocumentIdentifier(uri=uri),
                position=Position(line=line - 1, character=character),
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/hover", params), timeout=5.0
            )

            if response and response.contents:
                contents = response.contents
                if hasattr(contents, "value"):
                    return contents.value
                if isinstance(contents, list):
                    # Items are plain strings or objects carrying ``value``.
                    return "\n".join(
                        c if isinstance(c, str) else str(getattr(c, "value", c))
                        for c in contents
                    )
                return str(contents)

            return f"No hover info at line {line}, character {character}"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP hover failed: %r", e)
            # Fall through to legacy fallback

    # Legacy fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # The child script receives its inputs through argv (never by
            # interpolating them into the source) and reports via print();
            # ``logger`` does not exist inside the child interpreter.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
for c in script.infer(int(sys.argv[2]), int(sys.argv[3]))[:1]:
    print(f"Type: {c.type}")
    print(f"Name: {c.full_name}")
    if c.docstring():
        print(f"\\nDocstring:\\n{c.docstring()[:500]}")
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [
                    sys.executable or "python",
                    "-X",
                    "utf8",
                    "-c",
                    jedi_script,
                    file_path,
                    str(line),
                    str(character),
                ],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi lookup failed'}"
            output = proc.stdout.strip()
            if output:
                return output
            return f"No hover info at line {line}, character {character}"

        elif lang in ("typescript", "javascript", "typescriptreact", "javascriptreact"):
            return "TypeScript hover requires running language server. Use Claude Code's native hover."

        else:
            return f"Hover not available for language: {lang}"

    except FileNotFoundError as e:
        return f"Tool not found: {e.filename}. Install jedi: pip install jedi"
    except subprocess.TimeoutExpired:
        return "Hover lookup timed out"
    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_goto_definition(file_path: str, line: int, character: int) -> str:
    """
    Find where a symbol is defined.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to jedi run in a child
    interpreter.

    Args:
        file_path: Path of the file containing the symbol.
        line: 1-based line number (converted to 0-based for the LSP request).
        character: Column, passed through unchanged.

    Returns:
        One ``path:line:column`` entry per definition (newline separated), or
        a human-readable message when nothing is found.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"🎯 LSP-GOTO-DEF: {file_path}:{line}:{character}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = TextDocumentPositionParams(
                text_document=TextDocumentIdentifier(uri=uri),
                position=Position(line=line - 1, character=character),
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/definition", params), timeout=5.0
            )

            if response:
                locations = response if isinstance(response, list) else [response]

                results = []
                for loc in locations:
                    # Servers may answer with Location (uri/range) or
                    # LocationLink (target_uri/target_*_range); accept both.
                    target_uri = getattr(loc, "uri", None) or getattr(loc, "target_uri", None)
                    target_range = (
                        getattr(loc, "range", None)
                        or getattr(loc, "target_selection_range", None)
                        or getattr(loc, "target_range", None)
                    )
                    if not target_uri or target_range is None:
                        continue

                    target_path = unquote(urlparse(target_uri).path)
                    start_line = target_range.start.line + 1
                    start_char = target_range.start.character
                    results.append(f"{target_path}:{start_line}:{start_char}")

                if results:
                    return "\n".join(results)

            return "No definition found"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP goto definition failed: %r", e)
            # Fall through to legacy fallback

    # Legacy fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Inputs go through argv; output goes through print() because
            # ``logger`` does not exist inside the child interpreter.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
for d in script.goto(int(sys.argv[2]), int(sys.argv[3])):
    print(f"{d.module_path}:{d.line}:{d.column} - {d.full_name}")
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [
                    sys.executable or "python",
                    "-X",
                    "utf8",
                    "-c",
                    jedi_script,
                    file_path,
                    str(line),
                    str(character),
                ],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi lookup failed'}"
            output = proc.stdout.strip()
            if output:
                return output
            return "No definition found"

        # Same language set as lsp_hover, including the React variants.
        elif lang in ("typescript", "javascript", "typescriptreact", "javascriptreact"):
            return "TypeScript goto definition requires running language server. Use Claude Code's native navigation."

        else:
            return f"Goto definition not available for language: {lang}"

    except FileNotFoundError:
        return "Tool not found: Install jedi: pip install jedi"
    except subprocess.TimeoutExpired:
        return "Definition lookup timed out"
    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_find_references(
    file_path: str, line: int, character: int, include_declaration: bool = True
) -> str:
    """
    Find all references to a symbol across the workspace.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to jedi run in a child
    interpreter.

    Args:
        file_path: Path of the file containing the symbol.
        line: 1-based line number (converted to 0-based for the LSP request).
        character: Column, passed through unchanged.
        include_declaration: Forwarded to the language server. The jedi
            fallback does not use it.

    Returns:
        ``path:line:column`` entries (at most 50 from the server, 30 from the
        fallback, followed by a "... and N more" line), or a message.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"🔗 LSP-REFS: {file_path}:{line}:{character}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = ReferenceParams(
                text_document=TextDocumentIdentifier(uri=uri),
                position=Position(line=line - 1, character=character),
                context=ReferenceContext(include_declaration=include_declaration),
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/references", params), timeout=10.0
            )

            if response:
                results = []
                for loc in response:
                    target_path = unquote(urlparse(loc.uri).path)
                    start_line = loc.range.start.line + 1
                    start_char = loc.range.start.character
                    results.append(f"{target_path}:{start_line}:{start_char}")

                if results:
                    # Limit output
                    if len(results) > 50:
                        return "\n".join(results[:50]) + f"\n... and {len(results) - 50} more"
                    return "\n".join(results)

            return "No references found"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP find references failed: %r", e)
            # Fall through to legacy fallback

    # Legacy fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Inputs go through argv; output goes through print() because
            # ``logger`` does not exist inside the child interpreter.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
references = script.get_references(int(sys.argv[2]), int(sys.argv[3]), include_builtins=False)
for r in references[:30]:
    print(f"{r.module_path}:{r.line}:{r.column}")
if len(references) > 30:
    print(f"... and {len(references) - 30} more")
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [
                    sys.executable or "python",
                    "-X",
                    "utf8",
                    "-c",
                    jedi_script,
                    file_path,
                    str(line),
                    str(character),
                ],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=15,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi lookup failed'}"
            output = proc.stdout.strip()
            if output:
                return output
            return "No references found"

        else:
            return f"Find references not available for language: {lang}"

    except subprocess.TimeoutExpired:
        return "Reference search timed out"
    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_document_symbols(file_path: str) -> str:
    """
    Get hierarchical outline of all symbols in a file.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to jedi (child interpreter) and
    every other language falls back to ``ctags -x``.

    Args:
        file_path: Path of the file to outline.

    Returns:
        A Markdown snippet with one line per symbol, or a message when no
        symbols are found or no tool is available.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"📋 LSP-SYMBOLS: {file_path}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = DocumentSymbolParams(text_document=TextDocumentIdentifier(uri=uri))

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/documentSymbol", params),
                timeout=5.0,
            )

            if response:
                lines = []

                # The response is either hierarchical (entries with ``range``
                # and optional ``children``) or flat (entries with
                # ``location``); both are rendered as an indented list.
                def process_symbols(symbols, indent=0):
                    for sym in symbols:
                        # Prefer the enum member name: str() of an int enum
                        # differs between Python versions.
                        kind = getattr(sym.kind, "name", None) or str(sym.kind)

                        if hasattr(sym, "range"):  # DocumentSymbol
                            sym_line = sym.range.start.line + 1
                            children = getattr(sym, "children", None) or []
                        else:  # SymbolInformation
                            sym_line = sym.location.range.start.line + 1
                            children = []

                        lines.append(f"{sym_line:4d} | {' ' * indent}{kind:4} {sym.name}")

                        if children:
                            process_symbols(children, indent + 1)

                process_symbols(response)

                if lines:
                    return (
                        f"**Symbols in {Path(file_path).name}:**\n```\nLine | Kind Name\n"
                        + "\n".join(lines)
                        + "\n```"
                    )

            return "No symbols found"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP document symbols failed: %r", e)
            # Fall through to legacy fallback

    # Legacy fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Inputs go through argv; output goes through print() because
            # ``logger`` does not exist inside the child interpreter. Each
            # symbol is indented by the leading whitespace of its source line.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
for n in script.get_names(all_scopes=True, definitions=True):
    code = n.get_line_code() or ""
    indent = code[: len(code) - len(code.lstrip())]
    print(f"{n.line:4d} | {indent}{n.type:10} {n.name}")
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [sys.executable or "python", "-X", "utf8", "-c", jedi_script, file_path],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi lookup failed'}"
            output = proc.stdout.strip()
            if output:
                return f"**Symbols in {path.name}:**\n```\nLine | Symbol\n{output}\n```"
            return "No symbols found"

        else:
            # Fallback: use ctags
            proc = await asyncio.to_thread(
                subprocess.run,
                ["ctags", "-x", "--sort=no", str(path)],
                capture_output=True,
                text=True,
                timeout=10,
            )
            output = proc.stdout.strip()
            if output:
                return f"**Symbols in {path.name}:**\n```\n{output}\n```"
            return "No symbols found"

    except FileNotFoundError:
        return "Install jedi (pip install jedi) or ctags for symbol lookup"
    except subprocess.TimeoutExpired:
        return "Symbol lookup timed out"
    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_workspace_symbols(query: str, directory: str = ".") -> str:
    """
    Search for symbols by name across the entire workspace.

    Only the Python language server is queried (``workspace/symbol`` is
    language specific). When it yields nothing, the search falls back to
    ripgrep (to pick candidate files) plus ctags (to list their symbols).

    Args:
        query: Symbol name or fragment to look for.
        directory: Root for the ripgrep fallback; not used by the LSP request.

    Returns:
        Up to 20 matching symbols, one per line, or a message.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"🔍 LSP-WS-SYMBOLS: query='{query}' dir={directory}", file=sys.stderr)

    manager = get_lsp_manager()
    results = []

    # Try Python
    client_py = await manager.get_server("python")
    if client_py:
        try:
            params = WorkspaceSymbolParams(query=query)
            response = await asyncio.wait_for(
                client_py.protocol.send_request_async("workspace/symbol", params), timeout=5.0
            )
            if response:
                for sym in response:
                    location = sym.location
                    target_path = unquote(urlparse(location.uri).path)
                    # Some servers return a location that carries only a URI.
                    sym_range = getattr(location, "range", None)
                    where = (
                        f"{target_path}:{sym_range.start.line + 1}"
                        if sym_range is not None
                        else target_path
                    )
                    results.append(f"{where} - {sym.name} ({sym.kind})")
        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP workspace symbols (python) failed: %r", e)

    if results:
        return "\n".join(results[:20])

    # Fallback to legacy grep/ctags
    try:
        # -F/-i make ripgrep match the same way as the ctags filter below
        # (case-insensitive literal). -e and "--" stop a query or directory
        # that starts with "-" from being parsed as an option.
        rg_result = await asyncio.to_thread(
            subprocess.run,
            [
                "rg",
                "-l",
                "-i",
                "-F",
                "--type",
                "py",
                "--type",
                "ts",
                "--type",
                "js",
                "-e",
                query,
                "--",
                directory,
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )

        files = [f for f in rg_result.stdout.strip().split("\n") if f][:10]  # Limit files

        if not files:
            return "No matching files found"

        needle = query.lower()
        symbols = []
        for f in files:
            # Get symbols from each file
            ctags_result = await asyncio.to_thread(
                subprocess.run,
                ["ctags", "-x", "--sort=no", f],
                capture_output=True,
                text=True,
                timeout=5,
            )
            symbols.extend(
                tag_line
                for tag_line in ctags_result.stdout.split("\n")
                if needle in tag_line.lower()
            )

        if symbols:
            return "\n".join(symbols[:20])
        return f"No symbols matching '{query}' found"

    except FileNotFoundError:
        return "Install ctags and ripgrep for workspace symbol search"
    except subprocess.TimeoutExpired:
        return "Search timed out"
    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_prepare_rename(file_path: str, line: int, character: int) -> str:
    """
    Check if a symbol at position can be renamed.

    When a language server is available its answer is final, and a failed
    request is reported rather than retried. The jedi fallback (Python only)
    is used only when there is no server.

    Args:
        file_path: Path of the file containing the symbol.
        line: 1-based line number (converted to 0-based for the LSP request).
        character: Column, passed through unchanged.

    Returns:
        A message stating whether the rename is valid.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"✏️ LSP-PREP-RENAME: {file_path}:{line}:{character}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = PrepareRenameParams(
                text_document=TextDocumentIdentifier(uri=uri),
                position=Position(line=line - 1, character=character),
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/prepareRename", params),
                timeout=5.0,
            )

            if response:
                # Response can be Range, {range, placeholder}, or null
                if hasattr(response, "placeholder"):
                    return f"✅ Rename is valid. Current name: {response.placeholder}"
                return "✅ Rename is valid at this position"

            # If null/false, invalid
            return "❌ Rename not valid at this position"

        except Exception as e:
            # %r in the log because a timeout's str() is empty.
            logger.error("LSP prepare rename failed: %r", e)
            return f"Prepare rename failed: {e}"

    # Fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Inputs go through argv; output goes through print() because
            # ``logger`` does not exist inside the child interpreter. The
            # child prints only symbol details; the verdict is added here.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
refs = script.get_references(int(sys.argv[2]), int(sys.argv[3]))
if refs:
    print(f"Symbol: {refs[0].name}")
    print(f"Type: {refs[0].type}")
    print(f"References: {len(refs)}")
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [
                    sys.executable or "python",
                    "-X",
                    "utf8",
                    "-c",
                    jedi_script,
                    file_path,
                    str(line),
                    str(character),
                ],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=10,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi lookup failed'}"
            details = proc.stdout.strip()
            if details:
                return f"{details}\n✅ Rename is valid"
            return "❌ No symbol found at position"

        else:
            return f"Prepare rename not available for language: {lang}"

    except Exception as e:
        return f"Error: {str(e)}"
async def lsp_rename(
    file_path: str, line: int, character: int, new_name: str, dry_run: bool = True
) -> str:
    """
    Rename a symbol across the workspace.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to jedi run in a child
    interpreter.

    Args:
        file_path: Path of the file containing the symbol.
        line: 1-based line number (converted to 0-based for the LSP request).
        character: Column, passed through unchanged.
        new_name: Replacement identifier.
        dry_run: When true (default) only describe the edits. When false the
            edits are written to disk.

    Returns:
        A summary of the edits (preview or applied), or a message.
    """
    # USER-VISIBLE NOTIFICATION
    mode = "dry-run" if dry_run else "APPLY"
    print(f"✏️ LSP-RENAME: {file_path}:{line}:{character} → '{new_name}' [{mode}]", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            params = RenameParams(
                text_document=TextDocumentIdentifier(uri=uri),
                position=Position(line=line - 1, character=character),
                new_name=new_name,
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/rename", params), timeout=10.0
            )

            # A WorkspaceEdit carries its edits in ``changes`` or in
            # ``document_changes``. Copy into fresh lists so the response
            # object is never mutated downstream.
            changes: dict[str, list[Any]] = {}
            if response:
                for file_uri, edits in (getattr(response, "changes", None) or {}).items():
                    changes[file_uri] = list(edits)
                if not changes:
                    for doc_change in getattr(response, "document_changes", None) or []:
                        # Entries without text_document/edits (file
                        # create/rename/delete operations) are skipped.
                        document = getattr(doc_change, "text_document", None)
                        doc_edits = getattr(doc_change, "edits", None)
                        if document is not None and doc_edits:
                            changes.setdefault(document.uri, []).extend(doc_edits)

            if changes:
                changes_summary = []
                for file_uri, edits in changes.items():
                    changes_summary.append(f"File: {unquote(urlparse(file_uri).path)}")
                    for edit in edits:
                        changes_summary.append(
                            f"  Line {edit.range.start.line + 1}: {edit.new_text}"
                        )

                output = "\n".join(changes_summary)

                if dry_run:
                    return f"**Would rename to '{new_name}':**\n{output}"

                try:
                    _apply_workspace_edit(changes)
                    return f"✅ Renamed to '{new_name}'. Modified files:\n{output}"
                except Exception as e:
                    return f"Failed to apply edits: {e}\nDiff:\n{output}"

            return "No changes returned from server"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP rename failed: %r", e)
            # Fall through to legacy fallback

    # Fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Inputs (including new_name) go through argv, never into the
            # script source. The child prints each file's diff and, when the
            # last argument is "1", writes the refactoring to disk.
            jedi_script = """
import sys
import jedi
script = jedi.Script(path=sys.argv[1])
refactoring = script.rename(int(sys.argv[2]), int(sys.argv[3]), new_name=sys.argv[4])
for changed_path, changed in refactoring.get_changed_files().items():
    print(f"File: {changed_path}")
    print(changed.get_diff()[:500])
    print("---")
if sys.argv[5] == "1":
    refactoring.apply()
"""
            # Run in a worker thread so the event loop is not blocked.
            proc = await asyncio.to_thread(
                subprocess.run,
                [
                    sys.executable or "python",
                    "-X",
                    "utf8",
                    "-c",
                    jedi_script,
                    file_path,
                    str(line),
                    str(character),
                    new_name,
                    "0" if dry_run else "1",
                ],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=15,
            )
            if proc.returncode != 0:
                tail = proc.stderr.strip().splitlines()
                return f"Error: {tail[-1] if tail else 'jedi rename failed'}"
            output = proc.stdout.strip()
            if output and not dry_run:
                return f"✅ Renamed to '{new_name}'. Modified files:\n{output}"
            elif output:
                return f"**Would rename to '{new_name}':**\n{output}"
            return "No changes needed"

        else:
            return f"Rename not available for language: {lang}. Use IDE refactoring."

    except Exception as e:
        return f"Error: {str(e)}"
def _apply_workspace_edit(changes: dict[str, list[Any]]):
    """Apply LSP changes to files.

    Args:
        changes: Maps a ``file://`` URI to its text edits. Each edit exposes
            ``range.start`` / ``range.end`` (with ``line`` and ``character``)
            and ``new_text``. The lists are not modified.

    Raises:
        NotImplementedError: If any edit spans more than one line. Every file
            is patched in memory first, so no file has been written when this
            is raised.
        IndexError: If an edit refers to a line beyond the end of its file.

    URIs whose path does not exist are skipped. Edits within a file are
    assumed not to overlap.
    """
    # NOTE(review): ``character`` is used as a Python string index. LSP
    # offsets count UTF-16 code units unless another encoding was negotiated,
    # so lines with characters outside the BMP may be patched at the wrong
    # column - confirm the encoding used by the servers.
    patched: list[tuple[Path, str]] = []

    for file_uri, edits in changes.items():
        path = Path(unquote(urlparse(file_uri).path))
        if not path.exists():
            continue

        # newline="" keeps every line terminator byte-for-byte (no CRLF -> LF
        # rewrite) and splits only on \n, \r and \r\n. The default encoding
        # is kept so the text matches what was read for didOpen.
        with path.open("r", newline="") as handle:
            content = handle.readlines()

        # Apply from the end of the file backwards so earlier offsets stay
        # valid. sorted() leaves the caller's list untouched.
        ordered = sorted(
            edits,
            key=lambda e: (e.range.start.line, e.range.start.character),
            reverse=True,
        )

        for edit in ordered:
            start = edit.range.start
            end = edit.range.end

            if start.line != end.line:
                # Deliberately unsupported; raised before anything is written.
                raise NotImplementedError(
                    "Complex multi-line edits not safe to apply automatically yet."
                )

            line_content = content[start.line]
            content[start.line] = (
                line_content[: start.character] + edit.new_text + line_content[end.character :]
            )

        patched.append((path, "".join(content)))

    # Write only after every file was patched successfully.
    for path, text in patched:
        with path.open("w", newline="") as handle:
            handle.write(text)
async def lsp_code_actions(file_path: str, line: int, character: int) -> str:
    """
    Get available quick fixes and refactorings at a position.

    The persistent language server is tried first. If there is no server, or
    the request fails, Python files fall back to ``ruff check`` and list the
    diagnostics reported on the given line.

    Args:
        file_path: Path of the file to inspect.
        line: 1-based line number (converted to 0-based for the LSP request;
            compared directly with ruff's ``row``).
        character: Column, passed through unchanged.

    Returns:
        A Markdown bullet list of actions, or a message.
    """
    # USER-VISIBLE NOTIFICATION
    print(f"💡 LSP-ACTIONS: {file_path}:{line}:{character}", file=sys.stderr)

    client, uri, lang = await _get_client_and_params(file_path)

    if client:
        try:
            cursor = Position(line=line - 1, character=character)
            params = CodeActionParams(
                text_document=TextDocumentIdentifier(uri=uri),
                range=Range(start=cursor, end=cursor),
                # No diagnostics are supplied, so servers that tie quick
                # fixes to diagnostics may return fewer actions.
                context=CodeActionContext(diagnostics=[]),
            )

            response = await asyncio.wait_for(
                client.protocol.send_request_async("textDocument/codeAction", params), timeout=5.0
            )

            if response:
                actions = []
                for action in response:
                    # Entries may be plain commands, which have a title but
                    # no ``kind`` attribute.
                    kind = getattr(action, "kind", None)
                    actions.append(f"- {action.title} ({kind})")
                return "**Available code actions:**\n" + "\n".join(actions)
            return "No code actions available at this position"

        except Exception as e:
            # %r because a timeout's str() is empty.
            logger.error("LSP code actions failed: %r", e)
            # Fall through to legacy fallback

    # Fallback
    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    try:
        if lang == "python":
            # Use ruff to suggest fixes; run in a worker thread so the event
            # loop is not blocked.
            result = await asyncio.to_thread(
                subprocess.run,
                ["ruff", "check", str(path), "--output-format=json", "--show-fixes"],
                capture_output=True,
                text=True,
                timeout=10,
            )

            try:
                diagnostics = json.loads(result.stdout)
            except json.JSONDecodeError:
                return "No code actions available"

            actions = []
            for d in diagnostics:
                # ``location`` may be missing or null.
                if (d.get("location") or {}).get("row") != line:
                    continue
                code = d.get("code", "")
                msg = d.get("message", "")
                if d.get("fix"):
                    actions.append(f"- [{code}] {msg} (auto-fix available)")
                else:
                    actions.append(f"- [{code}] {msg}")

            if actions:
                return "**Available code actions:**\n" + "\n".join(actions)
            return "No code actions available at this position"

        else:
            return f"Code actions not available for language: {lang}"

    except FileNotFoundError:
        return "Install ruff for Python code actions: pip install ruff"
    except Exception as e:
        return f"Error: {str(e)}"
848async def lsp_code_action_resolve(file_path: str, action_code: str, line: int = None) -> str:
849 """
850 Apply a specific code action/fix to a file.
851 """
852 # USER-VISIBLE NOTIFICATION
853 print(f"🔧 LSP-RESOLVE: {action_code} at {file_path}", file=sys.stderr)
855 # Implementing via LSP requires 'codeAction/resolve' which is complex.
856 # We stick to Ruff fallback for now as it's more direct for Python "fixes".
857 # Unless we want to use the persistent client to trigger the action.
858 # Most LSP servers return the Edit in the CodeAction response, so resolve might not be needed if we cache the actions.
859 # But since this is a stateless call, we can't easily resolve a previous action.
861 # We'll default to the existing robust Ruff implementation for Python.
863 path = Path(file_path)
864 if not path.exists():
865 return f"Error: File not found: {file_path}"
867 lang = _get_language_for_file(file_path)
869 if lang == "python":
870 try:
871 result = subprocess.run(
872 ["ruff", "check", str(path), "--fix", "--select", action_code],
873 capture_output=True,
874 text=True,
875 timeout=15,
876 )
878 if result.returncode == 0:
879 return f"✅ Applied fix [{action_code}] to {path.name}"
880 else:
881 stderr = result.stderr.strip()
882 if stderr:
883 return f"⚠️ {stderr}"
884 return f"No changes needed for action [{action_code}]"
886 except FileNotFoundError:
887 return "Install ruff: pip install ruff"
888 except subprocess.TimeoutExpired:
889 return "Timeout applying fix"
890 except Exception as e:
891 return f"Error: {str(e)}"
893 return f"Code action resolve not implemented for language: {lang}"
async def lsp_extract_refactor(
    file_path: str,
    start_line: int,
    start_char: int,
    end_line: int,
    end_char: int,
    new_name: str,
    kind: str = "function",
) -> str:
    """
    Extract code to a function or variable.

    Only Python is supported, via jedi's refactoring API. The result is a
    diff preview; nothing is written to disk.

    Args:
        file_path: Path of the file to refactor.
        start_line: First line of the selection, passed to jedi as ``line``.
        start_char: Start column of the selection, passed as ``column``.
        end_line: Last line of the selection, passed as ``until_line``.
        end_char: End column of the selection, passed as ``until_column``.
        new_name: Name of the extracted function or variable.
        kind: ``"function"`` extracts a function; any other value extracts a
            variable.

    Returns:
        A Markdown diff preview, or an error message.
    """
    # USER-VISIBLE NOTIFICATION
    print(
        f"🔧 LSP-EXTRACT: {kind} '{new_name}' from {file_path}:{start_line}-{end_line}",
        file=sys.stderr,
    )

    path = Path(file_path)
    if not path.exists():
        return f"Error: File not found: {file_path}"

    lang = _get_language_for_file(file_path)

    if lang == "python":
        try:
            import jedi

            source = path.read_text()
            script = jedi.Script(source, path=path)

            extract = script.extract_function if kind == "function" else script.extract_variable
            # Pass both columns: without them the selection bounds inside
            # the start and end lines are left to jedi's defaults.
            refactoring = extract(
                line=start_line,
                column=start_char,
                until_line=end_line,
                until_column=end_char,
                new_name=new_name,
            )

            # Get the diff
            changes = refactoring.get_diff()
            return f"✅ Extract {kind} preview:\n```diff\n{changes}\n```\n\nTo apply: use Edit tool with the changes above"

        except AttributeError:
            return "Jedi version doesn't support extract refactoring. Upgrade: pip install -U jedi"
        except Exception as e:
            return f"Extract failed: {str(e)}"

    return f"Extract refactoring not implemented for language: {lang}"
async def lsp_servers() -> str:
    """
    List available LSP servers and their installation status.

    Executables are looked up on PATH. ``jedi`` is a Python library, not an
    executable, so it is detected by checking whether the module can be
    imported. Nothing is executed.

    Returns:
        A Markdown table with one row per known server.
    """
    import importlib.util
    import shutil

    # USER-VISIBLE NOTIFICATION
    print("🖥️ LSP-SERVERS: listing installed servers", file=sys.stderr)

    servers = [
        ("python", "jedi", "pip install jedi"),
        ("python", "jedi-language-server", "pip install jedi-language-server"),
        ("python", "ruff", "pip install ruff"),
        ("typescript", "typescript-language-server", "npm i -g typescript-language-server"),
        ("go", "gopls", "go install golang.org/x/tools/gopls@latest"),
        ("rust", "rust-analyzer", "rustup component add rust-analyzer"),
    ]
    # Entries that are importable modules rather than commands on PATH.
    python_modules = {"jedi"}

    lines = ["| Language | Server | Status | Install |", "|----------|--------|--------|---------|"]

    for lang, server, install in servers:
        try:
            if server in python_modules:
                found = importlib.util.find_spec(server) is not None
            else:
                found = shutil.which(server) is not None
            status = "✅ Installed" if found else "❌ Not installed"
        except Exception:
            status = "⚠️ Unknown"

        lines.append(f"| {lang} | {server} | {status} | `{install}` |")

    return "\n".join(lines)
async def lsp_health() -> str:
    """
    Report the state of the persistent LSP servers as a Markdown table.

    Returns "No LSP servers configured" when the manager reports no servers;
    otherwise one row per language with its run state, PID, restart count and
    command (shortened to 30 characters).
    """
    status = get_lsp_manager().get_status()
    if not status:
        return "No LSP servers configured"

    table = [
        "**LSP Server Health:**",
        "| Language | Status | PID | Restarts | Command |",
        "|---|---|---|---|---|",
    ]

    for language, details in status.items():
        run_state = "❌ Stopped" if not details["running"] else "✅ Running"
        pid_label = details["pid"] or "-"
        restart_count = details["restarts"]
        command = details["command"]

        # Keep the table narrow: long commands are cut and marked with "...".
        shown_command = command[:27] + "..." if len(command) > 30 else command

        table.append(
            f"| {language} | {run_state} | {pid_label} | {restart_count} | `{shown_command}` |"
        )

    return "\n".join(table)