Coverage report for src/dataknobs_data/migration_old_backup/migrator.py: 0% of 166 statements covered (generated by coverage.py v7.10.3 at 2025-08-15 12:32 -0500).

1"""Backend-to-backend data migration utilities.""" 

2 

3import asyncio 

4import logging 

5from dataclasses import dataclass, field 

6from datetime import datetime 

7from typing import Any, Callable, Dict, List, Optional, Union 

8from concurrent.futures import ThreadPoolExecutor 

9 

10from dataknobs_data.database import Database, SyncDatabase 

11from dataknobs_data.query import Query 

12from dataknobs_data.records import Record 

13 

14logger = logging.getLogger(__name__) 

15 

16 

@dataclass
class MigrationProgress:
    """Mutable counters and timing information for a migration run."""

    total_records: int = 0
    processed_records: int = 0
    successful_records: int = 0
    failed_records: int = 0
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    errors: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def progress_percentage(self) -> float:
        """Share of records handled so far, as a percentage (0.0-100.0)."""
        if not self.total_records:
            # Nothing to do yet (or an empty migration) — report 0%.
            return 0.0
        return 100.0 * self.processed_records / self.total_records

    @property
    def duration(self) -> Optional[float]:
        """Elapsed seconds from start to finish, or None while still running."""
        if self.end_time is None:
            return None
        delta = self.end_time - self.start_time
        return delta.total_seconds()

    @property
    def records_per_second(self) -> float:
        """Throughput over the finished run; 0.0 if unfinished or instantaneous."""
        elapsed = self.duration
        if not elapsed or elapsed <= 0:
            return 0.0
        return self.processed_records / elapsed

50 

@dataclass
class MigrationResult:
    """Outcome of a migration: overall status plus the final counters."""

    success: bool  # True only when every record migrated without error
    progress: MigrationProgress  # counters/timings accumulated during the run
    message: str = ""  # human-readable summary or failure description

57 

58 

class DataMigrator:
    """Migrate data between database backends.

    Works with both async (``Database``) and sync (``SyncDatabase``)
    backends; ``migrate()`` dispatches to the matching implementation
    based on the source type detected at construction time.
    """

    def __init__(
        self,
        source: Union[Database, SyncDatabase],
        target: Union[Database, SyncDatabase],
        max_workers: int = 4
    ):
        """Initialize data migrator.

        Args:
            source: Source database to migrate from
            target: Target database to migrate to
            max_workers: Maximum concurrent workers for batch operations
        """
        self.source = source
        self.target = target
        self.max_workers = max_workers
        # NOTE(review): assumes SyncDatabase is not a subclass of Database —
        # verify against dataknobs_data.database.
        self._is_async = isinstance(source, Database)

    async def migrate_async(
        self,
        query: Optional[Query] = None,
        batch_size: int = 1000,
        transform: Optional[Callable[[Record], Optional[Record]]] = None,
        progress_callback: Optional[Callable[[MigrationProgress], None]] = None,
        error_handler: Optional[Callable[[Exception, Record], bool]] = None,
        preserve_ids: bool = False
    ) -> MigrationResult:
        """Migrate data asynchronously.

        Args:
            query: Optional query to filter source records
            batch_size: Number of records to process in each batch
            transform: Optional transformation function; return None to skip a record
            progress_callback: Optional callback invoked after each batch
            error_handler: Optional error handler (return True to continue, False to stop)
            preserve_ids: Whether to preserve record IDs from source

        Returns:
            MigrationResult with success status and statistics

        Raises:
            TypeError: If source or target is not an async Database.
        """
        if not isinstance(self.source, Database):
            raise TypeError("Source must be an async Database for async migration")
        if not isinstance(self.target, Database):
            raise TypeError("Target must be an async Database for async migration")

        progress = MigrationProgress()
        progress.start_time = datetime.now()

        try:
            # Fetch everything up front so total_records (and therefore
            # progress_percentage) is known before the first batch runs.
            all_records = await self.source.search(query if query is not None else Query())
            progress.total_records = len(all_records)

            logger.info(f"Starting migration of {progress.total_records} records")

            # Process in batches
            for i in range(0, progress.total_records, batch_size):
                batch = all_records[i:i + batch_size]
                batch_tasks = []

                for record in batch:
                    try:
                        # Apply transformation if provided
                        if transform:
                            transformed = transform(record)
                            if transformed is None:
                                progress.processed_records += 1
                                continue  # Skip this record
                            record = transformed

                        # Create or upsert in target
                        if preserve_ids and hasattr(record, '_id'):
                            task = self.target.upsert(record._id, record)
                        else:
                            task = self.target.create(record)

                        batch_tasks.append(task)

                    except Exception as e:
                        # Bug fix: also count the record as processed so the
                        # progress percentage can reach 100% despite prep failures.
                        progress.processed_records += 1
                        progress.failed_records += 1
                        progress.errors.append({
                            'record': str(record),
                            'error': str(e)
                        })

                        if error_handler and not error_handler(e, record):
                            raise

                # Execute batch operations concurrently; exceptions are
                # collected (not raised) so one bad record doesn't sink the batch.
                if batch_tasks:
                    results = await asyncio.gather(*batch_tasks, return_exceptions=True)

                    for result in results:
                        progress.processed_records += 1
                        if isinstance(result, Exception):
                            progress.failed_records += 1
                            progress.errors.append({'error': str(result)})
                        else:
                            progress.successful_records += 1

                # Report progress
                if progress_callback:
                    progress_callback(progress)

                logger.debug(f"Processed {progress.processed_records}/{progress.total_records} records")

            progress.end_time = datetime.now()

            success = progress.failed_records == 0
            message = f"Migration completed: {progress.successful_records} successful, {progress.failed_records} failed"
            logger.info(message)

            return MigrationResult(success=success, progress=progress, message=message)

        except Exception as e:
            progress.end_time = datetime.now()
            message = f"Migration failed: {str(e)}"
            logger.error(message)
            return MigrationResult(success=False, progress=progress, message=message)

    def migrate_sync(
        self,
        query: Optional[Query] = None,
        batch_size: int = 1000,
        transform: Optional[Callable[[Record], Optional[Record]]] = None,
        progress_callback: Optional[Callable[[MigrationProgress], None]] = None,
        error_handler: Optional[Callable[[Exception, Record], bool]] = None,
        preserve_ids: bool = False
    ) -> MigrationResult:
        """Migrate data synchronously.

        Args:
            query: Optional query to filter source records
            batch_size: Number of records to process in each batch
            transform: Optional transformation function; return None to skip a record
            progress_callback: Optional callback invoked after each batch
            error_handler: Optional error handler (return True to continue, False to stop)
            preserve_ids: Whether to preserve record IDs from source

        Returns:
            MigrationResult with success status and statistics

        Raises:
            TypeError: If source or target is not a SyncDatabase.
        """
        if not isinstance(self.source, SyncDatabase):
            raise TypeError("Source must be a SyncDatabase for sync migration")
        if not isinstance(self.target, SyncDatabase):
            raise TypeError("Target must be a SyncDatabase for sync migration")

        progress = MigrationProgress()
        progress.start_time = datetime.now()

        try:
            # Fetch everything up front so total_records is known before batching.
            all_records = self.source.search(query if query is not None else Query())
            progress.total_records = len(all_records)

            logger.info(f"Starting migration of {progress.total_records} records")

            # Process in batches with thread pool
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                for i in range(0, progress.total_records, batch_size):
                    batch = all_records[i:i + batch_size]
                    futures = []

                    for record in batch:
                        try:
                            # Apply transformation if provided
                            if transform:
                                transformed = transform(record)
                                if transformed is None:
                                    progress.processed_records += 1
                                    continue  # Skip this record
                                record = transformed

                            # Submit to thread pool
                            if preserve_ids and hasattr(record, '_id'):
                                future = executor.submit(self.target.upsert, record._id, record)
                            else:
                                future = executor.submit(self.target.create, record)

                            futures.append(future)

                        except Exception as e:
                            # Bug fix: also count the record as processed so the
                            # progress percentage can reach 100% despite prep failures.
                            progress.processed_records += 1
                            progress.failed_records += 1
                            progress.errors.append({
                                'record': str(record),
                                'error': str(e)
                            })

                            if error_handler and not error_handler(e, record):
                                raise

                    # Wait for batch completion before starting the next batch.
                    for future in futures:
                        try:
                            future.result()
                            progress.processed_records += 1
                            progress.successful_records += 1
                        except Exception as e:
                            progress.processed_records += 1
                            progress.failed_records += 1
                            progress.errors.append({'error': str(e)})

                    # Report progress
                    if progress_callback:
                        progress_callback(progress)

                    logger.debug(f"Processed {progress.processed_records}/{progress.total_records} records")

            progress.end_time = datetime.now()

            success = progress.failed_records == 0
            message = f"Migration completed: {progress.successful_records} successful, {progress.failed_records} failed"
            logger.info(message)

            return MigrationResult(success=success, progress=progress, message=message)

        except Exception as e:
            progress.end_time = datetime.now()
            message = f"Migration failed: {str(e)}"
            logger.error(message)
            return MigrationResult(success=False, progress=progress, message=message)

    def migrate(self, **kwargs) -> MigrationResult:
        """Migrate data (auto-detects sync/async).

        Automatically uses the appropriate migration method based on
        the database types.

        Args:
            **kwargs: Arguments passed to migrate_async or migrate_sync

        Returns:
            MigrationResult with success status and statistics. When called
            from inside an already-running event loop this method cannot
            block, so it returns an ``asyncio.Task`` resolving to the
            MigrationResult instead — prefer ``await migrate_async(...)``
            in async code.
        """
        if not self._is_async:
            return self.migrate_sync(**kwargs)

        # Bug fix: asyncio.get_event_loop() is deprecated when no loop is
        # running; detect a running loop explicitly and use asyncio.run()
        # for the fresh-loop case.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(self.migrate_async(**kwargs))

        # Already inside an active loop: we can't block on it, so schedule
        # the coroutine and hand back the Task (matches prior behavior).
        return asyncio.ensure_future(self.migrate_async(**kwargs))