Coverage for src/jtech_installer/rollback/manager.py: 88%
331 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-20 15:10 -0300
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-20 15:10 -0300
1"""Sistema de rollback para JTECH™ Core Installer."""
3import json
4import shutil
5import time
6from dataclasses import asdict, dataclass
7from datetime import datetime
8from enum import Enum
9from pathlib import Path
10from typing import Any, Dict, List, Optional
12from ..core.exceptions import JTechInstallerException
13from ..core.models import InstallationConfig
class RollbackType(Enum):
    """How a rollback was triggered."""

    # Triggered by the installer itself.
    AUTOMATIC = "automatic"
    # Explicitly requested by a user.
    MANUAL = "manual"
    # Emergency rollback; RollbackManager skips the pre-rollback safety point.
    EMERGENCY = "emergency"
class BackupType(Enum):
    """Scope of the files captured by a backup."""

    # Every project file that is not explicitly ignored.
    FULL = "full"
    # Files changed since the previous backup.
    INCREMENTAL = "incremental"
    # Configuration files and configuration directories only.
    CONFIG_ONLY = "config_only"
@dataclass
class BackupEntry:
    """A single file or directory stored in a backup."""

    # Path of the original item (RollbackManager stores it relative to the
    # project root).
    source_path: str
    # Location of the copy inside the backup storage.
    backup_path: str
    # Kind of item; RollbackManager writes "file" or "directory".
    file_type: str
    # ISO-8601 timestamp of when the copy was taken.
    timestamp: str
    # Content checksum; RollbackManager fills it for regular files only.
    checksum: Optional[str] = None
@dataclass
class RollbackPoint:
    """Everything recorded about one rollback point."""

    # Unique identifier of the rollback point.
    id: str
    # ISO-8601 creation timestamp.
    timestamp: str
    # JSON-friendly snapshot of the installation configuration.
    config: Dict[str, Any]
    # Scope of the backup taken for this point.
    backup_type: BackupType
    # One entry per file or directory that was copied.
    backup_entries: List[BackupEntry]
    # Installation state captured when the point was created.
    installation_state: Dict[str, Any]
    # Free-form extra information (description, size, file count, ...).
    metadata: Dict[str, Any]
@dataclass
class RollbackResult:
    """Outcome of a rollback operation."""

    # True when the rollback finished without any failed file.
    success: bool
    # Identifier of the rollback point that was applied.
    rollback_point_id: str
    # Source paths that were restored.
    restored_files: List[str]
    # Source paths that could not be restored.
    failed_files: List[str]
    # Error messages collected during the rollback.
    errors: List[str]
    # Non-fatal problems collected during the rollback.
    warnings: List[str]
    # Elapsed wall-clock time in seconds.
    duration: float
class RollbackManager:
    """Create, list, apply and prune rollback points for a JTECH™ Core install.

    Each rollback point is a copy of selected project files stored under
    ``<backup_dir>/<rollback_id>/`` plus a record in
    ``<backup_dir>/rollback_points.json``. Every operation is appended to
    ``<backup_dir>/rollback.log``.

    The backup directory may live inside the project (the default is
    ``<project>/.jtech-core/backups``), so every backup and restore step
    takes care never to copy the backup store into itself or delete it.
    """

    def __init__(
        self, config: InstallationConfig, backup_dir: Optional[Path] = None
    ):
        """
        Initialise the manager and make sure the backup directory exists.

        Args:
            config: Installation configuration; ``config.project_path`` is
                the project root that is backed up and restored.
            backup_dir: Directory that stores backups. Defaults to
                ``<project_path>/.jtech-core/backups``.
        """
        self.config = config
        self.project_path = config.project_path
        self.backup_dir = backup_dir or (
            self.project_path / ".jtech-core" / "backups"
        )
        self.rollback_log_file = self.backup_dir / "rollback.log"
        self.rollback_points_file = self.backup_dir / "rollback_points.json"

        # The log file and the index live here, so it must exist up front.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_rollback_point(
        self,
        backup_type: BackupType = BackupType.FULL,
        description: Optional[str] = None,
    ) -> str:
        """
        Create a rollback point.

        Args:
            backup_type: Scope of the backup to take.
            description: Optional human-readable description; a default one
                is generated when omitted or empty.

        Returns:
            The identifier of the new rollback point.

        Raises:
            JTechInstallerException: If the backup, the state capture or the
                index update fails. The original error is chained as the
                cause.
        """
        rollback_id = self._generate_rollback_id()
        timestamp = datetime.now().isoformat()

        self._log_operation(
            f"Criando ponto de rollback {rollback_id} ({backup_type.value})"
        )

        try:
            backup_entries = self._create_backup(rollback_id, backup_type)
            installation_state = self._capture_installation_state()

            rollback_point = RollbackPoint(
                id=rollback_id,
                timestamp=timestamp,
                config=self._serialize_config(self.config),
                backup_type=backup_type,
                backup_entries=backup_entries,
                installation_state=installation_state,
                metadata={
                    "description": description
                    or f"Rollback point {rollback_id}",
                    "created_by": "JTECH™ Installer",
                    "backup_size": self._calculate_backup_size(backup_entries),
                    "file_count": len(backup_entries),
                },
            )

            self._save_rollback_point(rollback_point)

            self._log_operation(
                f"Ponto de rollback {rollback_id} criado com sucesso "
                f"({len(backup_entries)} arquivos)"
            )

            return rollback_id

        except Exception as e:
            self._log_operation(
                f"Erro ao criar ponto de rollback {rollback_id}: {e}",
                level="ERROR",
            )
            # Chain the cause so the original traceback is not lost.
            raise JTechInstallerException(
                f"Falha ao criar ponto de rollback: {e}"
            ) from e

    def rollback_to_point(
        self,
        rollback_id: str,
        rollback_type: RollbackType = RollbackType.MANUAL,
    ) -> RollbackResult:
        """
        Restore the project to a previously created rollback point.

        Unless the rollback is an EMERGENCY one, a CONFIG_ONLY safety point
        is created first. Failures on individual entries are collected and
        do not abort the rollback.

        Args:
            rollback_id: Identifier of the rollback point to apply.
            rollback_type: How the rollback was triggered.

        Returns:
            A RollbackResult. ``success`` is True only when no entry failed
            to restore. Unexpected errors are reported in ``errors`` instead
            of being raised.
        """
        start_time = time.time()
        restored_files: List[str] = []
        failed_files: List[str] = []
        errors: List[str] = []
        warnings: List[str] = []

        self._log_operation(
            f"Iniciando rollback para ponto {rollback_id} ({rollback_type.value})"
        )

        try:
            rollback_point = self._load_rollback_point(rollback_id)
            if not rollback_point:
                raise JTechInstallerException(
                    f"Ponto de rollback {rollback_id} não encontrado"
                )

            # Safety net: snapshot the current configuration before it is
            # overwritten. Skipped for emergency rollbacks.
            if rollback_type != RollbackType.EMERGENCY:
                safety_point = self.create_rollback_point(
                    BackupType.CONFIG_ONLY,
                    f"Safety point before rollback to {rollback_id}",
                )
                self._log_operation(
                    f"Ponto de segurança criado: {safety_point}"
                )

            # Restore entry by entry; one failure must not stop the others.
            for entry in rollback_point.backup_entries:
                try:
                    self._restore_file(entry)
                    restored_files.append(entry.source_path)
                except Exception as e:
                    failed_files.append(entry.source_path)
                    errors.append(
                        f"Falha ao restaurar {entry.source_path}: {e}"
                    )

            # State restoration is best effort and only yields a warning.
            try:
                self._restore_installation_state(
                    rollback_point.installation_state
                )
            except Exception as e:
                warnings.append(
                    f"Falha ao restaurar estado completo da instalação: {e}"
                )

            integrity_issues = self._verify_rollback_integrity(rollback_point)
            if integrity_issues:
                warnings.extend(integrity_issues)

            duration = time.time() - start_time
            success = not failed_files

            result = RollbackResult(
                success=success,
                rollback_point_id=rollback_id,
                restored_files=restored_files,
                failed_files=failed_files,
                errors=errors,
                warnings=warnings,
                duration=duration,
            )

            status = "sucesso" if success else "parcial"
            self._log_operation(
                f"Rollback para {rollback_id} concluído com {status} "
                f"({len(restored_files)} restaurados, {len(failed_files)} falharam)"
            )

            return result

        except Exception as e:
            duration = time.time() - start_time
            error_msg = f"Erro durante rollback: {e}"
            self._log_operation(error_msg, level="ERROR")

            return RollbackResult(
                success=False,
                rollback_point_id=rollback_id,
                restored_files=restored_files,
                failed_files=failed_files,
                # Keep per-file errors gathered before the fatal one.
                errors=[*errors, error_msg],
                warnings=warnings,
                duration=duration,
            )

    def list_rollback_points(self) -> List[RollbackPoint]:
        """
        List every rollback point recorded in the index file.

        Returns:
            Rollback points sorted by timestamp, newest first. An empty list
            is returned when the index is missing or cannot be read (the
            error is logged).
        """
        try:
            if not self.rollback_points_file.exists():
                return []

            with open(self.rollback_points_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            rollback_points = [
                RollbackPoint(
                    id=point_data["id"],
                    timestamp=point_data["timestamp"],
                    config=point_data["config"],
                    backup_type=BackupType(point_data["backup_type"]),
                    backup_entries=[
                        BackupEntry(**entry)
                        for entry in point_data["backup_entries"]
                    ],
                    installation_state=point_data["installation_state"],
                    metadata=point_data["metadata"],
                )
                for point_data in data.get("rollback_points", [])
            ]

            # ISO-8601 strings sort chronologically; newest first.
            rollback_points.sort(key=lambda x: x.timestamp, reverse=True)

            return rollback_points

        except Exception as e:
            self._log_operation(
                f"Erro ao listar pontos de rollback: {e}", level="ERROR"
            )
            return []

    def delete_rollback_point(self, rollback_id: str) -> bool:
        """
        Delete a rollback point together with its backup copies.

        Args:
            rollback_id: Identifier of the rollback point to delete.

        Returns:
            True when the point existed and was removed; False when it does
            not exist or an error occurred (the error is logged).
        """
        try:
            rollback_points = self.list_rollback_points()
            point_to_delete = next(
                (p for p in rollback_points if p.id == rollback_id), None
            )
            if not point_to_delete:
                return False

            # Entries may be files or whole directories; unlink() alone
            # fails on directories.
            for entry in point_to_delete.backup_entries:
                backup_path = Path(entry.backup_path)
                if backup_path.is_dir() and not backup_path.is_symlink():
                    shutil.rmtree(backup_path)
                elif backup_path.exists():
                    backup_path.unlink()

            # Drop the now-empty per-point directory, but only if it really
            # is a direct child of the backup directory.
            backup_root = self.backup_dir / rollback_id
            if (
                backup_root.is_dir()
                and backup_root.resolve().parent == self.backup_dir.resolve()
            ):
                shutil.rmtree(backup_root, ignore_errors=True)

            self._save_rollback_points(
                [p for p in rollback_points if p.id != rollback_id]
            )

            self._log_operation(f"Ponto de rollback {rollback_id} removido")
            return True

        except Exception as e:
            self._log_operation(
                f"Erro ao remover ponto de rollback {rollback_id}: {e}",
                level="ERROR",
            )
            return False

    def cleanup_old_rollback_points(self, keep_count: int = 5) -> int:
        """
        Delete old rollback points, keeping only the most recent ones.

        Args:
            keep_count: Number of newest points to keep. Negative values
                are treated as 0.

        Returns:
            Number of rollback points actually removed.
        """
        # A negative value would slice from the wrong end of the list.
        keep_count = max(keep_count, 0)
        rollback_points = self.list_rollback_points()

        if len(rollback_points) <= keep_count:
            return 0

        # The list is newest-first, so everything past keep_count is old.
        removed_count = sum(
            1
            for point in rollback_points[keep_count:]
            if self.delete_rollback_point(point.id)
        )

        self._log_operation(
            f"Limpeza concluída: {removed_count} pontos removidos"
        )
        return removed_count

    def get_rollback_statistics(self) -> Dict[str, Any]:
        """
        Summarise the stored rollback points.

        Returns:
            A dict with ``total_points``, ``total_size`` (sum of the
            recorded ``backup_size`` metadata), ``oldest_point`` and
            ``newest_point`` (timestamps or None) and ``backup_types``
            (count per backup type value).
        """
        rollback_points = self.list_rollback_points()

        if not rollback_points:
            return {
                "total_points": 0,
                "total_size": 0,
                "oldest_point": None,
                "newest_point": None,
                "backup_types": {},
            }

        backup_types: Dict[str, int] = {}
        for point in rollback_points:
            type_name = point.backup_type.value
            backup_types[type_name] = backup_types.get(type_name, 0) + 1

        return {
            "total_points": len(rollback_points),
            "total_size": sum(
                point.metadata.get("backup_size", 0)
                for point in rollback_points
            ),
            # The list is sorted newest-first.
            "oldest_point": rollback_points[-1].timestamp,
            "newest_point": rollback_points[0].timestamp,
            "backup_types": backup_types,
        }

    # Private helpers

    def _generate_rollback_id(self) -> str:
        """Return an id of the form ``rollback_YYYYMMDD_HHMMSS_ffffff``."""
        # One clock read keeps the seconds and microseconds consistent.
        return datetime.now().strftime("rollback_%Y%m%d_%H%M%S_%f")

    def _is_in_backup_storage(self, path: Path) -> bool:
        """Return True when *path* is the backup directory or lies inside it."""
        backup_dir = self.backup_dir.resolve()
        resolved = path.resolve()
        return resolved == backup_dir or backup_dir in resolved.parents

    def _ignore_backup_storage(
        self, directory: str, names: List[str]
    ) -> List[str]:
        """``shutil.copytree`` ignore callback that skips the backup directory."""
        backup_dir = self.backup_dir.resolve()
        parent = Path(directory).resolve()
        return [name for name in names if parent / name == backup_dir]

    def _remove_tree_keeping_backups(self, target: Path) -> None:
        """Delete *target* recursively without touching the backup directory."""
        backup_dir = self.backup_dir.resolve()
        resolved = target.resolve()

        if resolved == backup_dir:
            return
        if resolved not in backup_dir.parents:
            # The backup store is not underneath: a plain rmtree is safe.
            shutil.rmtree(target)
            return

        # target is an ancestor of the backup store: clear it selectively
        # and keep the directories leading to the backups.
        for child in target.iterdir():
            if child.is_dir() and not child.is_symlink():
                self._remove_tree_keeping_backups(child)
            else:
                child.unlink()

    def _create_backup(
        self, rollback_id: str, backup_type: BackupType
    ) -> List[BackupEntry]:
        """
        Copy the files selected by *backup_type* into the backup store.

        Items that fail to copy are logged as warnings and skipped.

        Returns:
            One BackupEntry per item that was copied successfully.
        """
        backup_root = self.backup_dir / rollback_id
        backup_root.mkdir(parents=True, exist_ok=True)

        if backup_type == BackupType.FULL:
            files_to_backup = self._get_project_files()
        elif backup_type == BackupType.CONFIG_ONLY:
            files_to_backup = self._get_config_files()
        else:  # INCREMENTAL
            files_to_backup = self._get_modified_files()

        backup_entries: List[BackupEntry] = []
        for source_file in files_to_backup:
            try:
                backup_entries.append(
                    self._backup_file(source_file, backup_root)
                )
            except Exception as e:
                self._log_operation(
                    f"Erro ao fazer backup de {source_file}: {e}",
                    level="WARNING",
                )

        return backup_entries

    def _backup_file(
        self, source_path: Path, backup_root: Path
    ) -> BackupEntry:
        """
        Copy one file or directory into *backup_root*.

        The item keeps its path relative to the project root. Directories
        are copied recursively, excluding the backup store itself.

        Raises:
            JTechInstallerException: If *source_path* is neither a regular
                file nor a directory.
            ValueError: If *source_path* is not inside the project root.
        """
        relative_path = source_path.relative_to(self.project_path)
        backup_path = backup_root / relative_path
        backup_path.parent.mkdir(parents=True, exist_ok=True)

        checksum = None
        if source_path.is_file():
            shutil.copy2(source_path, backup_path)
            file_type = "file"
            # Hash the stored copy: that is what a restore will reproduce.
            checksum = self._calculate_checksum(backup_path)
        elif source_path.is_dir():
            # The default backup directory lives inside .jtech-core, which
            # is itself backed up; without the ignore callback copytree
            # would copy the tree into itself without end.
            shutil.copytree(
                source_path,
                backup_path,
                dirs_exist_ok=True,
                ignore=self._ignore_backup_storage,
            )
            file_type = "directory"
        else:
            raise JTechInstallerException(
                f"Tipo de arquivo não suportado: {source_path}"
            )

        return BackupEntry(
            source_path=str(relative_path),
            backup_path=str(backup_path),
            file_type=file_type,
            timestamp=datetime.now().isoformat(),
            checksum=checksum,
        )

    def _restore_file(self, entry: BackupEntry) -> None:
        """
        Restore one backup entry to its place in the project.

        A checksum mismatch after restoring a file is logged as a warning.

        Raises:
            JTechInstallerException: If the backup copy no longer exists.
        """
        source_path = self.project_path / entry.source_path
        backup_path = Path(entry.backup_path)

        if not backup_path.exists():
            raise JTechInstallerException(
                f"Arquivo de backup não encontrado: {backup_path}"
            )

        source_path.parent.mkdir(parents=True, exist_ok=True)

        if entry.file_type == "file":
            shutil.copy2(backup_path, source_path)
        elif entry.file_type == "directory":
            if source_path.exists():
                # A plain rmtree would wipe the backup store (and the very
                # copy being restored) when it lives inside this directory.
                self._remove_tree_keeping_backups(source_path)
            shutil.copytree(backup_path, source_path, dirs_exist_ok=True)

        if entry.checksum and entry.file_type == "file":
            restored_checksum = self._calculate_checksum(source_path)
            if restored_checksum != entry.checksum:
                self._log_operation(
                    f"Aviso: Checksum não confere para {source_path}",
                    level="WARNING",
                )

    def _get_project_files(self) -> List[Path]:
        """
        Return every regular file of the project that a FULL backup covers.

        Files located anywhere below a dependency, VCS, cache or build
        directory, or inside backup storage, are excluded.
        """
        ignored_dirs = {
            "node_modules",
            ".git",
            "__pycache__",
            "venv",
            ".env",
            "dist",
            "build",
        }

        files: List[Path] = []
        for file_path in self.project_path.rglob("*"):
            if not file_path.is_file():
                continue

            # Only the directory components decide; a *file* called ".env"
            # is still backed up.
            parent_parts = file_path.relative_to(self.project_path).parts[:-1]
            if ignored_dirs.intersection(parent_parts):
                continue

            # Never back up the backups themselves.
            in_default_store = any(
                pair == (".jtech-core", "backups")
                for pair in zip(parent_parts, parent_parts[1:])
            )
            if in_default_store or self._is_in_backup_storage(file_path):
                continue

            files.append(file_path)

        return files

    def _get_config_files(self) -> List[Path]:
        """
        Return the configuration files and directories of the project.

        Patterns are matched in the project root and in its sub-directories
        up to two levels deep (hidden first-level directories are skipped).
        The ``.jtech-core``, ``.vscode`` and ``.github`` directories are
        added as a whole when they exist.
        """
        config_patterns = [
            "*.json",
            "*.yml",
            "*.yaml",
            "*.toml",
            "*.ini",
            "*.cfg",
            ".gitignore",
            ".env*",
            "requirements.txt",
            "package.json",
            "Dockerfile",
            "docker-compose.yml",
            "Makefile",
        ]

        # List the directories once instead of once per pattern. The depth
        # is capped at two levels so the scan stays fast on large trees.
        search_dirs: List[Path] = [self.project_path]
        try:
            for level1 in self.project_path.iterdir():
                if not level1.is_dir() or level1.name.startswith("."):
                    continue
                search_dirs.append(level1)
                try:
                    search_dirs.extend(
                        level2 for level2 in level1.iterdir() if level2.is_dir()
                    )
                except OSError:
                    # Unreadable directory: skip it, keep scanning.
                    continue
        except OSError:
            pass

        files: List[Path] = []
        for directory in search_dirs:
            for pattern in config_patterns:
                try:
                    files.extend(directory.glob(pattern))
                except OSError:
                    continue

        for dir_name in (".jtech-core", ".vscode", ".github"):
            config_dir = self.project_path / dir_name
            if config_dir.exists():
                files.append(config_dir)

        # Deduplicate and never include anything from the backup store.
        return [
            path for path in set(files) if not self._is_in_backup_storage(path)
        ]

    def _get_modified_files(self) -> List[Path]:
        """Return the files for an INCREMENTAL backup.

        Currently the same selection as a CONFIG_ONLY backup; modification
        times are not inspected.
        """
        return self._get_config_files()

    def _capture_installation_state(self) -> Dict[str, Any]:
        """Capture which installation artefacts exist right now.

        The content of ``.jtech-core/core-config.yml`` is included under
        ``core_config_content`` when the file can be read.
        """
        jtech_dir = self.project_path / ".jtech-core"
        core_config_file = jtech_dir / "core-config.yml"

        state: Dict[str, Any] = {
            "jtech_core_exists": jtech_dir.exists(),
            "vscode_config_exists": (self.project_path / ".vscode").exists(),
            "core_config_exists": core_config_file.exists(),
        }

        if core_config_file.exists():
            try:
                state["core_config_content"] = core_config_file.read_text(
                    encoding="utf-8"
                )
            except (OSError, ValueError) as e:
                # Best effort: the state is still useful without the content,
                # but the failure should be visible in the log.
                self._log_operation(
                    f"Não foi possível ler {core_config_file}: {e}",
                    level="WARNING",
                )

        return state

    def _restore_installation_state(self, state: Dict[str, Any]) -> None:
        """Rewrite ``core-config.yml`` from *state* when its content was captured."""
        if "core_config_content" not in state:
            return

        core_config_file = (
            self.project_path / ".jtech-core" / "core-config.yml"
        )
        core_config_file.parent.mkdir(parents=True, exist_ok=True)
        core_config_file.write_text(
            state["core_config_content"], encoding="utf-8"
        )

    def _verify_rollback_integrity(
        self, rollback_point: RollbackPoint
    ) -> List[str]:
        """
        Check that every entry of *rollback_point* is present after a rollback.

        Returns:
            Human-readable issues: missing items and files whose checksum
            differs from the recorded one. Empty when everything matches.
        """
        issues: List[str] = []

        for entry in rollback_point.backup_entries:
            source_path = self.project_path / entry.source_path

            if not source_path.exists():
                issues.append(
                    f"Arquivo não foi restaurado: {entry.source_path}"
                )
                continue

            if entry.checksum and entry.file_type == "file":
                if self._calculate_checksum(source_path) != entry.checksum:
                    issues.append(f"Checksum não confere: {entry.source_path}")

        return issues

    def _calculate_checksum(self, file_path: Path) -> str:
        """Return the hexadecimal MD5 digest of a file (integrity check only)."""
        import hashlib

        digest = hashlib.md5()
        with open(file_path, "rb") as f:
            # Read in chunks so large files are never loaded in one piece.
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def _calculate_backup_size(self, backup_entries: List[BackupEntry]) -> int:
        """Return the total size in bytes of the stored copies of the entries."""
        total_size = 0
        for entry in backup_entries:
            backup_path = Path(entry.backup_path)
            if backup_path.is_file():
                total_size += backup_path.stat().st_size
            elif backup_path.is_dir():
                total_size += sum(
                    item.stat().st_size
                    for item in backup_path.rglob("*")
                    if item.is_file()
                )
        return total_size

    def _save_rollback_point(self, rollback_point: RollbackPoint) -> None:
        """Append *rollback_point* to the index file."""
        rollback_points = self.list_rollback_points()
        rollback_points.append(rollback_point)
        self._save_rollback_points(rollback_points)

    def _save_rollback_points(
        self, rollback_points: List[RollbackPoint]
    ) -> None:
        """Write the full list of rollback points to the index file."""
        data = {
            "version": "1.0",
            "rollback_points": [
                {
                    "id": point.id,
                    "timestamp": point.timestamp,
                    "config": point.config,
                    "backup_type": point.backup_type.value,
                    "backup_entries": [
                        asdict(entry) for entry in point.backup_entries
                    ],
                    "installation_state": point.installation_state,
                    "metadata": point.metadata,
                }
                for point in rollback_points
            ],
        }

        # Path and Enum values are not JSON serialisable.
        data = self._deep_convert_paths(data)

        # Write to a temporary file and swap it in, so a crash mid-write
        # cannot leave a truncated index behind.
        temp_file = self.rollback_points_file.with_suffix(".json.tmp")
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        temp_file.replace(self.rollback_points_file)

    def _deep_convert_paths(self, obj: Any) -> Any:
        """Recursively replace Path objects by strings and Enum members by values."""
        if isinstance(obj, Path):
            return str(obj)
        if isinstance(obj, Enum):
            return obj.value
        if isinstance(obj, dict):
            return {
                key: self._deep_convert_paths(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [self._deep_convert_paths(item) for item in obj]
        return obj

    def _load_rollback_point(
        self, rollback_id: str
    ) -> Optional[RollbackPoint]:
        """Return the rollback point with the given id, or None if unknown."""
        return next(
            (
                point
                for point in self.list_rollback_points()
                if point.id == rollback_id
            ),
            None,
        )

    def _log_operation(self, message: str, level: str = "INFO") -> None:
        """Append one timestamped line to the rollback log file."""
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {level}: {message}\n"

        with open(self.rollback_log_file, "a", encoding="utf-8") as f:
            f.write(log_entry)

    def _serialize_config(self, config: InstallationConfig) -> Dict[str, Any]:
        """Convert the installation config into a JSON-friendly dict."""
        config_dict = asdict(config)

        if "project_path" in config_dict:
            config_dict["project_path"] = str(config_dict["project_path"])
        if config_dict.get("framework_source_path"):
            config_dict["framework_source_path"] = str(
                config_dict["framework_source_path"]
            )

        # Only real Enum members carry a .value; None or an already
        # serialised value is left untouched instead of raising.
        for key in ("install_type", "team_type"):
            value = config_dict.get(key)
            if isinstance(value, Enum):
                config_dict[key] = value.value

        return config_dict