Coverage for src/dataknobs_data/migration_old_backup/migrator.py: 0%
166 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-15 12:32 -0500
1"""Backend-to-backend data migration utilities."""
3import asyncio
4import logging
5from dataclasses import dataclass, field
6from datetime import datetime
7from typing import Any, Callable, Dict, List, Optional, Union
8from concurrent.futures import ThreadPoolExecutor
10from dataknobs_data.database import Database, SyncDatabase
11from dataknobs_data.query import Query
12from dataknobs_data.records import Record
14logger = logging.getLogger(__name__)
@dataclass
class MigrationProgress:
    """Mutable counters and timing data for a single migration run."""
    total_records: int = 0
    processed_records: int = 0
    successful_records: int = 0
    failed_records: int = 0
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    errors: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def progress_percentage(self) -> float:
        """Percentage of records processed so far (0.0 when no records are expected)."""
        if not self.total_records:
            return 0.0
        return (self.processed_records / self.total_records) * 100

    @property
    def duration(self) -> Optional[float]:
        """Elapsed seconds between start and end, or None while still running."""
        if self.end_time is None:
            return None
        delta = self.end_time - self.start_time
        return delta.total_seconds()

    @property
    def records_per_second(self) -> float:
        """Average throughput; 0.0 when the duration is unknown or non-positive."""
        elapsed = self.duration
        if not elapsed or elapsed <= 0:
            return 0.0
        return self.processed_records / elapsed
@dataclass
class MigrationResult:
    """Migration operation result."""
    # True when every record migrated without error (and no fatal exception).
    success: bool
    # Counters and timing collected during the run.
    progress: MigrationProgress
    # Human-readable summary, or the failure reason on error.
    message: str = ""
class DataMigrator:
    """Migrate data between database backends.

    Supports both async (``Database``) and sync (``SyncDatabase``) backends;
    ``migrate()`` auto-dispatches based on the source database type detected
    at construction time.
    """

    def __init__(
        self,
        source: Union[Database, SyncDatabase],
        target: Union[Database, SyncDatabase],
        max_workers: int = 4
    ):
        """Initialize data migrator.

        Args:
            source: Source database to migrate from
            target: Target database to migrate to
            max_workers: Maximum concurrent workers for batch operations
        """
        self.source = source
        self.target = target
        self.max_workers = max_workers
        # Async mode is decided by the *source* type; the target is validated
        # against the same mode when a migration is actually run.
        self._is_async = isinstance(source, Database)

    async def migrate_async(
        self,
        query: Optional[Query] = None,
        batch_size: int = 1000,
        transform: Optional[Callable[[Record], Optional[Record]]] = None,
        progress_callback: Optional[Callable[[MigrationProgress], None]] = None,
        error_handler: Optional[Callable[[Exception, Record], bool]] = None,
        preserve_ids: bool = False
    ) -> MigrationResult:
        """Migrate data asynchronously.

        Args:
            query: Optional query to filter source records
            batch_size: Number of records to process in each batch
            transform: Optional transformation function for records; returning
                ``None`` skips the record
            progress_callback: Optional callback for progress updates,
                invoked once per batch
            error_handler: Optional error handler (return True to continue,
                False to stop)
            preserve_ids: Whether to preserve record IDs from source

        Returns:
            MigrationResult with success status and statistics

        Raises:
            TypeError: If source or target is not an async Database.
        """
        if not isinstance(self.source, Database):
            raise TypeError("Source must be an async Database for async migration")
        if not isinstance(self.target, Database):
            raise TypeError("Target must be an async Database for async migration")

        progress = MigrationProgress()
        progress.start_time = datetime.now()

        try:
            # Load the full record set up front so the total is known.
            # NOTE(review): this materializes the whole source in memory;
            # acceptable for the dataset sizes this utility targets.
            all_records = await self.source.search(query if query else Query())
            progress.total_records = len(all_records)

            logger.info(f"Starting migration of {progress.total_records} records")

            # Process in batches
            for i in range(0, progress.total_records, batch_size):
                batch = all_records[i:i + batch_size]
                batch_tasks = []

                for record in batch:
                    try:
                        # Apply transformation if provided
                        if transform:
                            transformed = transform(record)
                            if transformed is None:
                                progress.processed_records += 1
                                continue  # Skip this record
                            record = transformed

                        # Create or upsert in target
                        if preserve_ids and hasattr(record, '_id'):
                            task = self.target.upsert(record._id, record)
                        else:
                            task = self.target.create(record)

                        batch_tasks.append(task)

                    except Exception as e:
                        # Count the record as processed too, so that
                        # progress_percentage can still reach 100%.
                        progress.processed_records += 1
                        progress.failed_records += 1
                        progress.errors.append({
                            'record': str(record),
                            'error': str(e)
                        })

                        if error_handler and not error_handler(e, record):
                            raise

                # Execute batch operations concurrently; exceptions are
                # collected rather than raised so one failure does not
                # abort the whole batch.
                if batch_tasks:
                    results = await asyncio.gather(*batch_tasks, return_exceptions=True)

                    for result in results:
                        progress.processed_records += 1
                        if isinstance(result, Exception):
                            progress.failed_records += 1
                            progress.errors.append({'error': str(result)})
                        else:
                            progress.successful_records += 1

                # Report progress once per batch
                if progress_callback:
                    progress_callback(progress)

                logger.debug(f"Processed {progress.processed_records}/{progress.total_records} records")

            progress.end_time = datetime.now()

            success = progress.failed_records == 0
            message = f"Migration completed: {progress.successful_records} successful, {progress.failed_records} failed"
            logger.info(message)

            return MigrationResult(success=success, progress=progress, message=message)

        except Exception as e:
            # Fatal error (source read failure, or error_handler requested stop).
            progress.end_time = datetime.now()
            message = f"Migration failed: {str(e)}"
            logger.error(message)
            return MigrationResult(success=False, progress=progress, message=message)

    def migrate_sync(
        self,
        query: Optional[Query] = None,
        batch_size: int = 1000,
        transform: Optional[Callable[[Record], Optional[Record]]] = None,
        progress_callback: Optional[Callable[[MigrationProgress], None]] = None,
        error_handler: Optional[Callable[[Exception, Record], bool]] = None,
        preserve_ids: bool = False
    ) -> MigrationResult:
        """Migrate data synchronously.

        Args:
            query: Optional query to filter source records
            batch_size: Number of records to process in each batch
            transform: Optional transformation function for records; returning
                ``None`` skips the record
            progress_callback: Optional callback for progress updates,
                invoked once per batch
            error_handler: Optional error handler (return True to continue,
                False to stop)
            preserve_ids: Whether to preserve record IDs from source

        Returns:
            MigrationResult with success status and statistics

        Raises:
            TypeError: If source or target is not a SyncDatabase.
        """
        if not isinstance(self.source, SyncDatabase):
            raise TypeError("Source must be a SyncDatabase for sync migration")
        if not isinstance(self.target, SyncDatabase):
            raise TypeError("Target must be a SyncDatabase for sync migration")

        progress = MigrationProgress()
        progress.start_time = datetime.now()

        try:
            # Load the full record set up front so the total is known.
            all_records = self.source.search(query if query else Query())
            progress.total_records = len(all_records)

            logger.info(f"Starting migration of {progress.total_records} records")

            # Process in batches with a thread pool for concurrent writes
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                for i in range(0, progress.total_records, batch_size):
                    batch = all_records[i:i + batch_size]
                    futures = []

                    for record in batch:
                        try:
                            # Apply transformation if provided
                            if transform:
                                transformed = transform(record)
                                if transformed is None:
                                    progress.processed_records += 1
                                    continue  # Skip this record
                                record = transformed

                            # Submit to thread pool
                            if preserve_ids and hasattr(record, '_id'):
                                future = executor.submit(self.target.upsert, record._id, record)
                            else:
                                future = executor.submit(self.target.create, record)

                            futures.append(future)

                        except Exception as e:
                            # Count the record as processed too, so that
                            # progress_percentage can still reach 100%.
                            progress.processed_records += 1
                            progress.failed_records += 1
                            progress.errors.append({
                                'record': str(record),
                                'error': str(e)
                            })

                            if error_handler and not error_handler(e, record):
                                raise

                    # Wait for batch completion
                    for future in futures:
                        try:
                            future.result()
                            progress.successful_records += 1
                        except Exception as e:
                            progress.failed_records += 1
                            progress.errors.append({'error': str(e)})
                        finally:
                            progress.processed_records += 1

                    # Report progress once per batch
                    if progress_callback:
                        progress_callback(progress)

                    logger.debug(f"Processed {progress.processed_records}/{progress.total_records} records")

            progress.end_time = datetime.now()

            success = progress.failed_records == 0
            message = f"Migration completed: {progress.successful_records} successful, {progress.failed_records} failed"
            logger.info(message)

            return MigrationResult(success=success, progress=progress, message=message)

        except Exception as e:
            # Fatal error (source read failure, or error_handler requested stop).
            progress.end_time = datetime.now()
            message = f"Migration failed: {str(e)}"
            logger.error(message)
            return MigrationResult(success=False, progress=progress, message=message)

    def migrate(self, **kwargs) -> Union[MigrationResult, "asyncio.Task"]:
        """Migrate data (auto-detects sync/async).

        Automatically uses the appropriate migration method based on
        the database types. For async backends: when no event loop is
        running, the migration is driven to completion and a
        ``MigrationResult`` is returned; when called from inside a running
        loop, the migration is scheduled and the ``asyncio.Task`` is
        returned for the caller to await.

        Args:
            **kwargs: Arguments passed to migrate_async or migrate_sync

        Returns:
            MigrationResult with success status and statistics, or an
            ``asyncio.Task`` resolving to one when already inside a
            running event loop.
        """
        if not self._is_async:
            return self.migrate_sync(**kwargs)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: run the coroutine to completion ourselves.
            # asyncio.run() replaces the deprecated get_event_loop() +
            # run_until_complete() pattern (removed behavior in 3.12+).
            return asyncio.run(self.migrate_async(**kwargs))

        # Already inside an event loop: we cannot block here, so schedule
        # the migration and hand the task back to the caller to await.
        return asyncio.create_task(self.migrate_async(**kwargs))