Coverage for src/jtech_installer/installer/agent_installer.py: 89%

194 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-20 15:10 -0300

1#!/usr/bin/env python3 

2""" 

3Instalador de Agentes Especializados JTECH™ Core 

4Copia e configura todos os agentes especializados do framework. 

5""" 

6 

7import hashlib 

8import shutil 

9from dataclasses import dataclass, field 

10from enum import Enum 

11from pathlib import Path 

12from typing import Any, Dict, List, Optional 

13 

14import yaml 

15 

16from ..core.models import InstallationConfig 

17 

18 

19class AgentType(Enum): 

20 """Tipos de agentes suportados.""" 

21 

22 CHATMODE = "chatmode" 

23 TEMPLATE = "template" 

24 WORKFLOW = "workflow" 

25 SPECIALIST = "specialist" 

26 

27 

28@dataclass 

29class AgentInfo: 

30 """Informações sobre um agente.""" 

31 

32 name: str 

33 type: AgentType 

34 source_path: Path 

35 target_path: Path 

36 description: str = "" 

37 metadata: Dict[str, Any] = field(default_factory=dict) 

38 checksum: str = "" 

39 installed: bool = False 

40 

41 

42class AgentInstaller: 

43 """ 

44 Gerencia instalação de agentes especializados JTECH™. 

45 

46 Responsável por: 

47 - Descobrir agentes disponíveis 

48 - Copiar arquivos mantendo estrutura 

49 - Verificar integridade pós-instalação 

50 - Gerenciar metadados de agentes 

51 """ 

52 

53 def __init__(self, config: InstallationConfig): 

54 """Inicializa o instalador de agentes.""" 

55 self.config = config 

56 self.project_path = Path(config.project_path) 

57 self.jtech_core_path = self.project_path / ".jtech-core" 

58 self.agents_path = self.jtech_core_path / "agents" 

59 self.agents_registry = self.jtech_core_path / "registry" / "agents.yml" 

60 

61 # Mapeamento de diretórios de agentes 

62 self.agent_directories = { 

63 AgentType.CHATMODE: "chatmodes", 

64 AgentType.TEMPLATE: "templates", 

65 AgentType.WORKFLOW: "workflows", 

66 AgentType.SPECIALIST: "specialists", 

67 } 

68 

69 self.discovered_agents: List[AgentInfo] = [] 

70 self.installation_log: List[str] = [] 

71 

72 def discover_agents( 

73 self, source_dir: Optional[Path] = None 

74 ) -> List[AgentInfo]: 

75 """ 

76 Descobre agentes disponíveis para instalação. 

77 

78 Args: 

79 source_dir: Diretório fonte dos agentes (opcional) 

80 

81 Returns: 

82 Lista de agentes descobertos 

83 """ 

84 if source_dir is None: 

85 # Procurar agentes no próprio projeto 

86 source_dir = self.project_path / "agents" 

87 

88 if not source_dir.exists(): 

89 msg = f"Diretório de agentes não encontrado: {source_dir}" 

90 self.installation_log.append(msg) 

91 return [] 

92 

93 agents = [] 

94 

95 # Buscar ChatModes (.chatmode.md) 

96 chatmode_pattern = "**/*.chatmode.md" 

97 for chatmode_file in source_dir.glob(chatmode_pattern): 

98 agent_info = self._parse_agent_file( 

99 chatmode_file, AgentType.CHATMODE 

100 ) 

101 if agent_info: 

102 agents.append(agent_info) 

103 

104 # Buscar Templates (.md) 

105 template_pattern = "**/templates/**/*.md" 

106 for template_file in source_dir.glob(template_pattern): 

107 agent_info = self._parse_agent_file( 

108 template_file, AgentType.TEMPLATE 

109 ) 

110 if agent_info: 

111 agents.append(agent_info) 

112 

113 # Buscar Workflows (.yml, .yaml) 

114 workflow_patterns = ["**/workflows/**/*.yml", "**/workflows/**/*.yaml"] 

115 for workflow_pattern in workflow_patterns: 

116 for workflow_file in source_dir.glob(workflow_pattern): 

117 agent_info = self._parse_agent_file( 

118 workflow_file, AgentType.WORKFLOW 

119 ) 

120 if agent_info: 

121 agents.append(agent_info) 

122 

123 # Buscar Especialistas (outros .md) 

124 specialist_pattern = "**/*.md" 

125 for specialist_file in source_dir.glob(specialist_pattern): 

126 # Excluir chatmodes e templates já processados 

127 if ( 

128 ".chatmode." not in specialist_file.name 

129 and "/templates/" not in str(specialist_file) 

130 ): 

131 agent_info = self._parse_agent_file( 

132 specialist_file, AgentType.SPECIALIST 

133 ) 

134 if agent_info: 

135 agents.append(agent_info) 

136 

137 self.discovered_agents = agents 

138 msg = f"Descobertos {len(agents)} agentes para instalação" 

139 self.installation_log.append(msg) 

140 return agents 

141 

142 def _parse_agent_file( 

143 self, file_path: Path, agent_type: AgentType 

144 ) -> Optional[AgentInfo]: 

145 """ 

146 Parseia arquivo de agente e extrai metadados. 

147 

148 Args: 

149 file_path: Caminho do arquivo 

150 agent_type: Tipo do agente 

151 

152 Returns: 

153 AgentInfo ou None se erro 

154 """ 

155 try: 

156 # Calcular checksum 

157 checksum = self._calculate_checksum(file_path) 

158 

159 # Determinar nome e caminho alvo 

160 name = file_path.stem 

161 if agent_type == AgentType.CHATMODE: 

162 name = name.replace(".chatmode", "") 

163 target_subdir = "chatmodes" 

164 else: 

165 target_subdir = self.agent_directories[agent_type] 

166 

167 target_path = self.agents_path / target_subdir / file_path.name 

168 

169 # Extrair metadados do arquivo 

170 metadata = self._extract_metadata(file_path) 

171 description = metadata.get( 

172 "description", f"{agent_type.value} - {name}" 

173 ) 

174 

175 return AgentInfo( 

176 name=name, 

177 type=agent_type, 

178 source_path=file_path, 

179 target_path=target_path, 

180 description=description, 

181 metadata=metadata, 

182 checksum=checksum, 

183 ) 

184 

185 except Exception as e: 

186 self.installation_log.append(f"Erro ao parsear {file_path}: {e}") 

187 return None 

188 

189 def _extract_metadata(self, file_path: Path) -> Dict[str, Any]: 

190 """ 

191 Extrai metadados do arquivo de agente. 

192 

193 Args: 

194 file_path: Caminho do arquivo 

195 

196 Returns: 

197 Dicionário com metadados 

198 """ 

199 metadata = {} 

200 

201 try: 

202 with open(file_path, "r", encoding="utf-8") as f: 

203 content = f.read() 

204 

205 # Buscar por frontmatter YAML 

206 if content.startswith("---"): 

207 end_pos = content.find("---", 3) 

208 if end_pos > 0: 

209 frontmatter = content[3:end_pos].strip() 

210 try: 

211 metadata = yaml.safe_load(frontmatter) or {} 

212 except yaml.YAMLError: 

213 pass 

214 

215 # Extrair informações básicas do conteúdo 

216 lines = content.split("\n") 

217 for line in lines[:10]: # Primeiras 10 linhas 

218 line = line.strip() 

219 if line.startswith("# "): 

220 metadata.setdefault("title", line[2:].strip()) 

221 elif "description:" in line.lower(): 

222 desc = line.split(":", 1)[1].strip() 

223 metadata.setdefault("description", desc) 

224 elif "version:" in line.lower(): 

225 ver = line.split(":", 1)[1].strip() 

226 metadata.setdefault("version", ver) 

227 

228 return metadata 

229 

230 except Exception as e: 

231 msg = f"Erro ao extrair metadados de {file_path}: {e}" 

232 self.installation_log.append(msg) 

233 return {} 

234 

235 def _calculate_checksum(self, file_path: Path) -> str: 

236 """ 

237 Calcula checksum SHA256 do arquivo. 

238 

239 Args: 

240 file_path: Caminho do arquivo 

241 

242 Returns: 

243 Checksum hexadecimal 

244 """ 

245 sha256_hash = hashlib.sha256() 

246 

247 try: 

248 with open(file_path, "rb") as f: 

249 for chunk in iter(lambda: f.read(4096), b""): 

250 sha256_hash.update(chunk) 

251 return sha256_hash.hexdigest() 

252 except Exception: 

253 return "" 

254 

255 def install_agents( 

256 self, 

257 agents: Optional[List[AgentInfo]] = None, 

258 verify_integrity: bool = True, 

259 ) -> Dict[str, Any]: 

260 """ 

261 Instala agentes especializados. 

262 

263 Args: 

264 agents: Lista específica de agentes (opcional) 

265 verify_integrity: Verificar integridade após instalação 

266 

267 Returns: 

268 Resultado da instalação 

269 """ 

270 if agents is None: 

271 agents = self.discovered_agents 

272 

273 if not agents: 

274 return { 

275 "success": False, 

276 "error": "Nenhum agente descoberto para instalação", 

277 "installed_count": 0, 

278 "failed_count": 0, 

279 } 

280 

281 # Criar diretórios necessários 

282 self._ensure_agent_directories() 

283 

284 installed_agents = [] 

285 failed_agents = [] 

286 

287 for agent in agents: 

288 try: 

289 # Copiar arquivo 

290 target_dir = agent.target_path.parent 

291 target_dir.mkdir(parents=True, exist_ok=True) 

292 

293 shutil.copy2(agent.source_path, agent.target_path) 

294 

295 # Verificar integridade se solicitado 

296 if verify_integrity: 

297 if not self._verify_agent_integrity(agent): 

298 failed_agents.append(agent) 

299 continue 

300 

301 agent.installed = True 

302 installed_agents.append(agent) 

303 msg = f"Instalado: {agent.name} ({agent.type.value})" 

304 self.installation_log.append(msg) 

305 

306 except Exception as e: 

307 failed_agents.append(agent) 

308 msg = f"Falha na instalação de {agent.name}: {e}" 

309 self.installation_log.append(msg) 

310 

311 # Atualizar registro de agentes 

312 if installed_agents: 

313 self._update_agents_registry(installed_agents) 

314 

315 return { 

316 "success": len(failed_agents) == 0, 

317 "installed_count": len(installed_agents), 

318 "failed_count": len(failed_agents), 

319 "installed_agents": [a.name for a in installed_agents], 

320 "failed_agents": [a.name for a in failed_agents], 

321 "log": self.installation_log.copy(), 

322 } 

323 

324 def _ensure_agent_directories(self): 

325 """Garante que diretórios de agentes existem.""" 

326 for agent_type, subdir in self.agent_directories.items(): 

327 dir_path = self.agents_path / subdir 

328 dir_path.mkdir(parents=True, exist_ok=True) 

329 

330 def _verify_agent_integrity(self, agent: AgentInfo) -> bool: 

331 """ 

332 Verifica integridade do agente instalado. 

333 

334 Args: 

335 agent: Informações do agente 

336 

337 Returns: 

338 True se íntegro 

339 """ 

340 if not agent.target_path.exists(): 

341 return False 

342 

343 # Verificar checksum 

344 installed_checksum = self._calculate_checksum(agent.target_path) 

345 return installed_checksum == agent.checksum 

346 

347 def _update_agents_registry(self, installed_agents: List[AgentInfo]): 

348 """ 

349 Atualiza registro de agentes instalados. 

350 

351 Args: 

352 installed_agents: Lista de agentes instalados 

353 """ 

354 try: 

355 # Criar diretório do registro 

356 registry_dir = self.agents_registry.parent 

357 registry_dir.mkdir(parents=True, exist_ok=True) 

358 

359 # Ler registro existente ou criar novo 

360 registry_data = {} 

361 if self.agents_registry.exists(): 

362 with open(self.agents_registry, "r", encoding="utf-8") as f: 

363 registry_data = yaml.safe_load(f) or {} 

364 

365 # Adicionar agentes instalados 

366 if "agents" not in registry_data: 

367 registry_data["agents"] = {} 

368 

369 for agent in installed_agents: 

370 relative_path = agent.target_path.relative_to( 

371 self.project_path 

372 ) 

373 registry_data["agents"][agent.name] = { 

374 "type": agent.type.value, 

375 "installed_path": str(relative_path), 

376 "checksum": agent.checksum, 

377 "metadata": agent.metadata, 

378 # Será preenchido por outro sistema se necessário 

379 "installed_at": None, 

380 } 

381 

382 # Salvar registro atualizado 

383 with open(self.agents_registry, "w", encoding="utf-8") as f: 

384 yaml.dump( 

385 registry_data, 

386 f, 

387 default_flow_style=False, 

388 allow_unicode=True, 

389 ) 

390 

391 msg = ( 

392 f"Registro de agentes atualizado: {len(installed_agents)} " 

393 "entradas" 

394 ) 

395 self.installation_log.append(msg) 

396 

397 except Exception as e: 

398 msg = f"Erro ao atualizar registro de agentes: {e}" 

399 self.installation_log.append(msg) 

400 

401 def list_installed_agents(self) -> List[Dict[str, Any]]: 

402 """ 

403 Lista agentes instalados. 

404 

405 Returns: 

406 Lista de agentes instalados 

407 """ 

408 installed = [] 

409 

410 if not self.agents_registry.exists(): 

411 return installed 

412 

413 try: 

414 with open(self.agents_registry, "r", encoding="utf-8") as f: 

415 registry_data = yaml.safe_load(f) or {} 

416 

417 agents_data = registry_data.get("agents", {}) 

418 for name, info in agents_data.items(): 

419 installed.append( 

420 { 

421 "name": name, 

422 "type": info.get("type"), 

423 "path": info.get("installed_path"), 

424 "checksum": info.get("checksum"), 

425 "metadata": info.get("metadata", {}), 

426 } 

427 ) 

428 

429 return installed 

430 

431 except Exception as e: 

432 msg = f"Erro ao listar agentes instalados: {e}" 

433 self.installation_log.append(msg) 

434 return [] 

435 

436 def get_installation_report(self) -> Dict[str, Any]: 

437 """ 

438 Gera relatório detalhado da instalação. 

439 

440 Returns: 

441 Relatório de instalação 

442 """ 

443 return { 

444 "discovered_agents_count": len(self.discovered_agents), 

445 "agents_by_type": { 

446 agent_type.value: len( 

447 [a for a in self.discovered_agents if a.type == agent_type] 

448 ) 

449 for agent_type in AgentType 

450 }, 

451 "installation_log": self.installation_log, 

452 "installed_agents": self.list_installed_agents(), 

453 "target_directories": { 

454 agent_type.value: str(self.agents_path / subdir) 

455 for agent_type, subdir in self.agent_directories.items() 

456 }, 

457 }