Coverage for src/jtech_installer/installer/agent_installer.py: 89%
194 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-20 15:10 -0300
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-20 15:10 -0300
1#!/usr/bin/env python3
2"""
3Instalador de Agentes Especializados JTECH™ Core
4Copia e configura todos os agentes especializados do framework.
5"""
7import hashlib
8import shutil
9from dataclasses import dataclass, field
10from enum import Enum
11from pathlib import Path
12from typing import Any, Dict, List, Optional
14import yaml
16from ..core.models import InstallationConfig
19class AgentType(Enum):
20 """Tipos de agentes suportados."""
22 CHATMODE = "chatmode"
23 TEMPLATE = "template"
24 WORKFLOW = "workflow"
25 SPECIALIST = "specialist"
28@dataclass
29class AgentInfo:
30 """Informações sobre um agente."""
32 name: str
33 type: AgentType
34 source_path: Path
35 target_path: Path
36 description: str = ""
37 metadata: Dict[str, Any] = field(default_factory=dict)
38 checksum: str = ""
39 installed: bool = False
42class AgentInstaller:
43 """
44 Gerencia instalação de agentes especializados JTECH™.
46 Responsável por:
47 - Descobrir agentes disponíveis
48 - Copiar arquivos mantendo estrutura
49 - Verificar integridade pós-instalação
50 - Gerenciar metadados de agentes
51 """
53 def __init__(self, config: InstallationConfig):
54 """Inicializa o instalador de agentes."""
55 self.config = config
56 self.project_path = Path(config.project_path)
57 self.jtech_core_path = self.project_path / ".jtech-core"
58 self.agents_path = self.jtech_core_path / "agents"
59 self.agents_registry = self.jtech_core_path / "registry" / "agents.yml"
61 # Mapeamento de diretórios de agentes
62 self.agent_directories = {
63 AgentType.CHATMODE: "chatmodes",
64 AgentType.TEMPLATE: "templates",
65 AgentType.WORKFLOW: "workflows",
66 AgentType.SPECIALIST: "specialists",
67 }
69 self.discovered_agents: List[AgentInfo] = []
70 self.installation_log: List[str] = []
72 def discover_agents(
73 self, source_dir: Optional[Path] = None
74 ) -> List[AgentInfo]:
75 """
76 Descobre agentes disponíveis para instalação.
78 Args:
79 source_dir: Diretório fonte dos agentes (opcional)
81 Returns:
82 Lista de agentes descobertos
83 """
84 if source_dir is None:
85 # Procurar agentes no próprio projeto
86 source_dir = self.project_path / "agents"
88 if not source_dir.exists():
89 msg = f"Diretório de agentes não encontrado: {source_dir}"
90 self.installation_log.append(msg)
91 return []
93 agents = []
95 # Buscar ChatModes (.chatmode.md)
96 chatmode_pattern = "**/*.chatmode.md"
97 for chatmode_file in source_dir.glob(chatmode_pattern):
98 agent_info = self._parse_agent_file(
99 chatmode_file, AgentType.CHATMODE
100 )
101 if agent_info:
102 agents.append(agent_info)
104 # Buscar Templates (.md)
105 template_pattern = "**/templates/**/*.md"
106 for template_file in source_dir.glob(template_pattern):
107 agent_info = self._parse_agent_file(
108 template_file, AgentType.TEMPLATE
109 )
110 if agent_info:
111 agents.append(agent_info)
113 # Buscar Workflows (.yml, .yaml)
114 workflow_patterns = ["**/workflows/**/*.yml", "**/workflows/**/*.yaml"]
115 for workflow_pattern in workflow_patterns:
116 for workflow_file in source_dir.glob(workflow_pattern):
117 agent_info = self._parse_agent_file(
118 workflow_file, AgentType.WORKFLOW
119 )
120 if agent_info:
121 agents.append(agent_info)
123 # Buscar Especialistas (outros .md)
124 specialist_pattern = "**/*.md"
125 for specialist_file in source_dir.glob(specialist_pattern):
126 # Excluir chatmodes e templates já processados
127 if (
128 ".chatmode." not in specialist_file.name
129 and "/templates/" not in str(specialist_file)
130 ):
131 agent_info = self._parse_agent_file(
132 specialist_file, AgentType.SPECIALIST
133 )
134 if agent_info:
135 agents.append(agent_info)
137 self.discovered_agents = agents
138 msg = f"Descobertos {len(agents)} agentes para instalação"
139 self.installation_log.append(msg)
140 return agents
142 def _parse_agent_file(
143 self, file_path: Path, agent_type: AgentType
144 ) -> Optional[AgentInfo]:
145 """
146 Parseia arquivo de agente e extrai metadados.
148 Args:
149 file_path: Caminho do arquivo
150 agent_type: Tipo do agente
152 Returns:
153 AgentInfo ou None se erro
154 """
155 try:
156 # Calcular checksum
157 checksum = self._calculate_checksum(file_path)
159 # Determinar nome e caminho alvo
160 name = file_path.stem
161 if agent_type == AgentType.CHATMODE:
162 name = name.replace(".chatmode", "")
163 target_subdir = "chatmodes"
164 else:
165 target_subdir = self.agent_directories[agent_type]
167 target_path = self.agents_path / target_subdir / file_path.name
169 # Extrair metadados do arquivo
170 metadata = self._extract_metadata(file_path)
171 description = metadata.get(
172 "description", f"{agent_type.value} - {name}"
173 )
175 return AgentInfo(
176 name=name,
177 type=agent_type,
178 source_path=file_path,
179 target_path=target_path,
180 description=description,
181 metadata=metadata,
182 checksum=checksum,
183 )
185 except Exception as e:
186 self.installation_log.append(f"Erro ao parsear {file_path}: {e}")
187 return None
189 def _extract_metadata(self, file_path: Path) -> Dict[str, Any]:
190 """
191 Extrai metadados do arquivo de agente.
193 Args:
194 file_path: Caminho do arquivo
196 Returns:
197 Dicionário com metadados
198 """
199 metadata = {}
201 try:
202 with open(file_path, "r", encoding="utf-8") as f:
203 content = f.read()
205 # Buscar por frontmatter YAML
206 if content.startswith("---"):
207 end_pos = content.find("---", 3)
208 if end_pos > 0:
209 frontmatter = content[3:end_pos].strip()
210 try:
211 metadata = yaml.safe_load(frontmatter) or {}
212 except yaml.YAMLError:
213 pass
215 # Extrair informações básicas do conteúdo
216 lines = content.split("\n")
217 for line in lines[:10]: # Primeiras 10 linhas
218 line = line.strip()
219 if line.startswith("# "):
220 metadata.setdefault("title", line[2:].strip())
221 elif "description:" in line.lower():
222 desc = line.split(":", 1)[1].strip()
223 metadata.setdefault("description", desc)
224 elif "version:" in line.lower():
225 ver = line.split(":", 1)[1].strip()
226 metadata.setdefault("version", ver)
228 return metadata
230 except Exception as e:
231 msg = f"Erro ao extrair metadados de {file_path}: {e}"
232 self.installation_log.append(msg)
233 return {}
235 def _calculate_checksum(self, file_path: Path) -> str:
236 """
237 Calcula checksum SHA256 do arquivo.
239 Args:
240 file_path: Caminho do arquivo
242 Returns:
243 Checksum hexadecimal
244 """
245 sha256_hash = hashlib.sha256()
247 try:
248 with open(file_path, "rb") as f:
249 for chunk in iter(lambda: f.read(4096), b""):
250 sha256_hash.update(chunk)
251 return sha256_hash.hexdigest()
252 except Exception:
253 return ""
255 def install_agents(
256 self,
257 agents: Optional[List[AgentInfo]] = None,
258 verify_integrity: bool = True,
259 ) -> Dict[str, Any]:
260 """
261 Instala agentes especializados.
263 Args:
264 agents: Lista específica de agentes (opcional)
265 verify_integrity: Verificar integridade após instalação
267 Returns:
268 Resultado da instalação
269 """
270 if agents is None:
271 agents = self.discovered_agents
273 if not agents:
274 return {
275 "success": False,
276 "error": "Nenhum agente descoberto para instalação",
277 "installed_count": 0,
278 "failed_count": 0,
279 }
281 # Criar diretórios necessários
282 self._ensure_agent_directories()
284 installed_agents = []
285 failed_agents = []
287 for agent in agents:
288 try:
289 # Copiar arquivo
290 target_dir = agent.target_path.parent
291 target_dir.mkdir(parents=True, exist_ok=True)
293 shutil.copy2(agent.source_path, agent.target_path)
295 # Verificar integridade se solicitado
296 if verify_integrity:
297 if not self._verify_agent_integrity(agent):
298 failed_agents.append(agent)
299 continue
301 agent.installed = True
302 installed_agents.append(agent)
303 msg = f"Instalado: {agent.name} ({agent.type.value})"
304 self.installation_log.append(msg)
306 except Exception as e:
307 failed_agents.append(agent)
308 msg = f"Falha na instalação de {agent.name}: {e}"
309 self.installation_log.append(msg)
311 # Atualizar registro de agentes
312 if installed_agents:
313 self._update_agents_registry(installed_agents)
315 return {
316 "success": len(failed_agents) == 0,
317 "installed_count": len(installed_agents),
318 "failed_count": len(failed_agents),
319 "installed_agents": [a.name for a in installed_agents],
320 "failed_agents": [a.name for a in failed_agents],
321 "log": self.installation_log.copy(),
322 }
324 def _ensure_agent_directories(self):
325 """Garante que diretórios de agentes existem."""
326 for agent_type, subdir in self.agent_directories.items():
327 dir_path = self.agents_path / subdir
328 dir_path.mkdir(parents=True, exist_ok=True)
330 def _verify_agent_integrity(self, agent: AgentInfo) -> bool:
331 """
332 Verifica integridade do agente instalado.
334 Args:
335 agent: Informações do agente
337 Returns:
338 True se íntegro
339 """
340 if not agent.target_path.exists():
341 return False
343 # Verificar checksum
344 installed_checksum = self._calculate_checksum(agent.target_path)
345 return installed_checksum == agent.checksum
347 def _update_agents_registry(self, installed_agents: List[AgentInfo]):
348 """
349 Atualiza registro de agentes instalados.
351 Args:
352 installed_agents: Lista de agentes instalados
353 """
354 try:
355 # Criar diretório do registro
356 registry_dir = self.agents_registry.parent
357 registry_dir.mkdir(parents=True, exist_ok=True)
359 # Ler registro existente ou criar novo
360 registry_data = {}
361 if self.agents_registry.exists():
362 with open(self.agents_registry, "r", encoding="utf-8") as f:
363 registry_data = yaml.safe_load(f) or {}
365 # Adicionar agentes instalados
366 if "agents" not in registry_data:
367 registry_data["agents"] = {}
369 for agent in installed_agents:
370 relative_path = agent.target_path.relative_to(
371 self.project_path
372 )
373 registry_data["agents"][agent.name] = {
374 "type": agent.type.value,
375 "installed_path": str(relative_path),
376 "checksum": agent.checksum,
377 "metadata": agent.metadata,
378 # Será preenchido por outro sistema se necessário
379 "installed_at": None,
380 }
382 # Salvar registro atualizado
383 with open(self.agents_registry, "w", encoding="utf-8") as f:
384 yaml.dump(
385 registry_data,
386 f,
387 default_flow_style=False,
388 allow_unicode=True,
389 )
391 msg = (
392 f"Registro de agentes atualizado: {len(installed_agents)} "
393 "entradas"
394 )
395 self.installation_log.append(msg)
397 except Exception as e:
398 msg = f"Erro ao atualizar registro de agentes: {e}"
399 self.installation_log.append(msg)
401 def list_installed_agents(self) -> List[Dict[str, Any]]:
402 """
403 Lista agentes instalados.
405 Returns:
406 Lista de agentes instalados
407 """
408 installed = []
410 if not self.agents_registry.exists():
411 return installed
413 try:
414 with open(self.agents_registry, "r", encoding="utf-8") as f:
415 registry_data = yaml.safe_load(f) or {}
417 agents_data = registry_data.get("agents", {})
418 for name, info in agents_data.items():
419 installed.append(
420 {
421 "name": name,
422 "type": info.get("type"),
423 "path": info.get("installed_path"),
424 "checksum": info.get("checksum"),
425 "metadata": info.get("metadata", {}),
426 }
427 )
429 return installed
431 except Exception as e:
432 msg = f"Erro ao listar agentes instalados: {e}"
433 self.installation_log.append(msg)
434 return []
436 def get_installation_report(self) -> Dict[str, Any]:
437 """
438 Gera relatório detalhado da instalação.
440 Returns:
441 Relatório de instalação
442 """
443 return {
444 "discovered_agents_count": len(self.discovered_agents),
445 "agents_by_type": {
446 agent_type.value: len(
447 [a for a in self.discovered_agents if a.type == agent_type]
448 )
449 for agent_type in AgentType
450 },
451 "installation_log": self.installation_log,
452 "installed_agents": self.list_installed_agents(),
453 "target_directories": {
454 agent_type.value: str(self.agents_path / subdir)
455 for agent_type, subdir in self.agent_directories.items()
456 },
457 }