Coverage for src / dataknobs_data / backends / sql_base.py: 10%

400 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""Base SQL functionality shared between SQL database backends.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import datetime 

8from typing import Any, TYPE_CHECKING 

9 

10from ..query import Operator, Query, SortOrder 

11from ..records import Record 

12 

13if TYPE_CHECKING: 

14 from ..query_logic import ComplexQuery 

15 

16 

class SQLRecordSerializer:
    """Mixin providing SQL record serialization/deserialization with vector support."""

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Serialize a Record to a JSON string for storage.

        VectorField instances are stored via their ``to_dict`` form so that
        their metadata survives the round trip.
        """
        from ..fields import VectorField

        serialized: dict[str, Any] = {}
        for name, field in record.fields.items():
            if isinstance(field, VectorField):
                # Preserve full vector metadata, not just the raw values.
                serialized[name] = field.to_dict()
            elif callable(getattr(field, 'to_list', None)):
                # Other special field types expose an explicit list form.
                serialized[name] = field.to_list()
            else:
                serialized[name] = field.value
        return json.dumps(serialized)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Get a SQL expression extracting a vector from the JSON data column.

        Handles both storage formats: a raw array or a VectorField dict
        (whose vector lives under the ``value`` key).

        Args:
            field_name: Name of the vector field
            dialect: SQL dialect (postgres, sqlite, etc.)

        Returns:
            SQL expression that yields the vector value
        """
        if dialect == "postgres":
            # jsonb_typeof distinguishes the VectorField dict from a raw array.
            return f"""CASE 
                WHEN jsonb_typeof(data->'{field_name}') = 'object' 
                THEN (data->'{field_name}'->>'value')::vector
                ELSE (data->>'{field_name}')::vector
            END"""
        if dialect == "sqlite":
            # SQLite has no vector type; return the JSON text representation.
            return f"""CASE 
                WHEN json_type(json_extract(data, '$.{field_name}')) = 'object'
                THEN json_extract(data, '$.{field_name}.value')
                ELSE json_extract(data, '$.{field_name}')
            END"""
        # Generic fallback for unknown dialects.
        return f"data->'{field_name}'"

    @staticmethod
    def json_to_record(data_json: str, metadata_json: str | None = None) -> Record:
        """Deserialize JSON strings back into a Record.

        Serialized VectorField dicts are reconstructed as VectorField objects;
        every other value becomes a plain Field.
        """
        from ..fields import Field, VectorField

        data = json.loads(data_json) if data_json else {}
        has_meta = bool(metadata_json) and metadata_json != 'null'
        metadata = json.loads(metadata_json) if has_meta else {}

        rebuilt: dict[str, Any] = {}
        for name, value in data.items():
            if isinstance(value, dict) and value.get("type") == "vector":
                # from_dict requires a 'name' key; older payloads may omit it.
                value.setdefault("name", name)
                rebuilt[name] = VectorField.from_dict(value)
            else:
                rebuilt[name] = Field(name=name, value=value)

        record = Record(metadata=metadata)
        record.fields.update(rebuilt)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Row dict with 'id', 'data' and optional 'metadata' keys;
                'data'/'metadata' may already be decoded dicts or JSON strings.

        Returns:
            Reconstructed Record with its ID set from the row when present.
        """
        raw_data = row.get("data", {})
        if not isinstance(raw_data, str):
            raw_data = json.dumps(raw_data)

        raw_meta = row.get("metadata")
        if raw_meta and not isinstance(raw_meta, str):
            raw_meta = json.dumps(raw_meta)

        record = SQLRecordSerializer.json_to_record(raw_data, raw_meta)

        from ..database_utils import ensure_record_id
        if "id" in row:
            record = ensure_record_id(record, row["id"])

        return record

127 

128 

class SQLQueryBuilder:
    """Builds parameterized SQL statements from Query objects.

    Values are always bound as parameters; table, schema and field names are
    interpolated directly into the SQL text and therefore must come from
    trusted sources.
    """

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard", param_style: str = "numeric"):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'duckdb', 'standard')
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?,
                'pyformat' for %(name)s)
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name (schema-prefixed when set)."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the parameter placeholder for the configured param_style.

        Args:
            param_num: Parameter number (1-based)
            param_name: Optional parameter name for pyformat style

        Returns:
            Placeholder string, e.g. '$1', '?' or '%(p0)s'
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        elif self.param_style == "qmark":
            return "?"
        elif self.param_style == "pyformat":
            # pyformat names are 0-based to line up with positional params.
            name = param_name or f"p{param_num - 1}"
            return f"%({name})s"
        else:
            # Unknown style: pick a sensible default for the dialect.
            if self.dialect == "postgres":
                return f"${param_num}"
            else:
                return "?"

    def _order_by_expr(self, field: str, direction: str) -> str:
        """Build one ORDER BY term, extracting JSON fields per dialect.

        The 'id' field lives in its own column, so it is referenced directly
        rather than being looked up inside the JSON blob.
        """
        if field == 'id':
            return f"id {direction}"
        if self.dialect == "postgres":
            return f"data->'{field}' {direction}"
        if self.dialect == "sqlite":
            return f"json_extract(data, '$.{field}') {direction}"
        if self.dialect == "duckdb":
            return f"json_extract_string(data, '$.{field}') {direction}"
        # Generic fallback for unknown dialects.
        return f"data {direction}"

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert
            record_id: Optional ID (generated via uuid4 if not provided)

        Returns:
            Tuple of (SQL query, parameters)
        """
        record_id = record_id or record.storage_id or str(uuid.uuid4())
        data = SQLRecordSerializer.record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, [record_id, data, metadata]

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT * FROM {self.qualified_table} WHERE id = {p1}"

        return query, [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query for updating a record.

        Args:
            record_id: The record ID
            record: The updated record

        Returns:
            Tuple of (SQL query, parameters)
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self.param_style == "qmark":
            # SQLite binds positionally: data, metadata, then id.
            query = f"""
            UPDATE {self.qualified_table}
            SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """
            params = [data, metadata, record_id]
        else:
            # PostgreSQL convention here: id is $1, then data, metadata.
            p1 = self._get_param_placeholder(1)
            p2 = self._get_param_placeholder(2)
            p3 = self._get_param_placeholder(3)
            query = f"""
            UPDATE {self.qualified_table}
            SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
            WHERE id = {p1}
            """
            params = [record_id, data, metadata]

        return query, params

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"DELETE FROM {self.qualified_table} WHERE id = {p1}"

        return query, [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query to check if a record exists.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1"

        return query, [record_id]

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery object with boolean logic.

        Args:
            query: The ComplexQuery object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []

        # Build WHERE clause from the complex condition tree.
        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        # ORDER BY (shares the dialect-aware expression with build_search_query,
        # including the 'id'-column special case).
        if query.sort_specs:
            order_parts = [
                self._order_by_expr(spec.field, "DESC" if spec.order == SortOrder.DESC else "ASC")
                for spec in query.sort_specs
            ]
            sql_parts.append("ORDER BY " + ", ".join(order_parts))

        # LIMIT / OFFSET.
        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE clause for a complex boolean-logic condition tree.

        Args:
            condition: The Condition object (LogicCondition or FilterCondition)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters)
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        params: list[Any] = []

        # Leaf node: a single filter.
        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        # Branch node: AND / OR / NOT over sub-conditions.
        elif isinstance(condition, LogicCondition):
            if condition.operator in (LogicOperator.AND, LogicOperator.OR):
                joiner = " AND " if condition.operator == LogicOperator.AND else " OR "
                clauses: list[str] = []
                current_param = param_start
                for sub_condition in condition.conditions:
                    sub_clause, sub_params = self._build_complex_condition(sub_condition, current_param)
                    if sub_clause:
                        clauses.append(sub_clause)
                        params.extend(sub_params)
                        # Advance numbering past the parameters consumed so far.
                        current_param += len(sub_params)
                return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

            elif condition.operator == LogicOperator.NOT:
                sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
                params.extend(sub_params)
                return (f"NOT ({sub_clause})", params) if sub_clause else ("", [])

        return ("", [])

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build just the WHERE clause fragment from a Query object.

        NOTE: the fragment is prefixed with " AND " — it is intended to be
        appended to an existing WHERE clause by the caller.

        Args:
            query: The Query object (can be None)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (WHERE clause SQL, parameters);
            empty string and empty list if there are no filters.
        """
        if not query or not query.filters:
            return "", []

        where_clauses: list[str] = []
        params: list[Any] = []
        param_count = param_start - 1

        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            # A filter may bind zero (EXISTS) or several (IN/BETWEEN) params.
            param_count += len(new_params) - 1

        if where_clauses:
            return " AND " + " AND ".join(where_clauses), params
        return "", []

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query from a Query object.

        Args:
            query: The Query object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []
        param_count = 0

        # WHERE clause: each filter may bind zero or more parameters.
        where_clauses: list[str] = []
        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            param_count += len(new_params) - 1

        if where_clauses:
            sql_parts.append("WHERE " + " AND ".join(where_clauses))

        # ORDER BY using dialect-aware JSON extraction ('id' is a real column).
        if query.sort_specs:
            order_parts = [
                self._order_by_expr(spec.field, "DESC" if spec.order == SortOrder.DESC else "ASC")
                for spec in query.sort_specs
            ]
            sql_parts.append("ORDER BY " + ", ".join(order_parts))

        # LIMIT / OFFSET.
        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a batch UPDATE query using CASE expressions.

        This provides efficient batch updates for both PostgreSQL and SQLite.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            Tuple of (SQL query, parameters)
        """
        if not updates:
            return "", []

        update_ids: list[str] = []
        data_cases: list[str] = []
        metadata_cases: list[str] = []
        params: list[Any] = []

        # One CASE arm per updated row for both data and metadata.
        for i, (record_id, record) in enumerate(updates):
            update_ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            if self.param_style == "qmark":
                # SQLite '?' placeholders are purely positional, so the id
                # must be bound again for every CASE arm.
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                params.extend([record_id, data_json, record_id, metadata_json])
            else:
                # Numbered/named placeholders let both CASEs reuse the id param.
                param_idx = i * 3 + 1
                p1 = self._get_param_placeholder(param_idx)
                p2 = self._get_param_placeholder(param_idx + 1)
                p3 = self._get_param_placeholder(param_idx + 2)
                data_cases.append(f"WHEN id = {p1} THEN {p2}")
                metadata_cases.append(f"WHEN id = {p1} THEN {p3}")
                params.extend([record_id, data_json, metadata_json])

        # WHERE id IN (...) placeholders.
        if self.param_style == "qmark":
            id_placeholders = ["?" for _ in update_ids]
        else:
            id_param_start = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(n)
                for n in range(id_param_start, id_param_start + len(update_ids))
            ]
        params.extend(update_ids)

        # ELSE preserves the original value for rows no CASE arm matches.
        query = f"""
        UPDATE {self.qualified_table}
        SET
            data = CASE {' '.join(data_cases)} ELSE data END,
            metadata = CASE {' '.join(metadata_cases)} ELSE metadata END,
            updated_at = CURRENT_TIMESTAMP
        WHERE id IN ({', '.join(id_placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a batch INSERT query for multiple records.

        Generates an efficient multi-value INSERT statement.

        Args:
            records: List of records to insert

        Returns:
            Tuple of (SQL query, parameters, generated IDs)
        """
        if not records:
            return "", [], []

        # Generate IDs and prepare per-row placeholders.
        # (uuid is imported at module level; no local import needed.)
        ids: list[str] = []
        values_clauses: list[str] = []
        params: list[Any] = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            param_idx = i * 3 + 1
            p1 = self._get_param_placeholder(param_idx)
            p2 = self._get_param_placeholder(param_idx + 1)
            p3 = self._get_param_placeholder(param_idx + 2)
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES {', '.join(values_clauses)}
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a batch DELETE query for multiple records.

        Deletes multiple records in a single query.

        Args:
            ids: List of record IDs to delete

        Returns:
            Tuple of (SQL query, parameters)
        """
        if not ids:
            return "", []

        placeholders = [self._get_param_placeholder(n) for n in range(1, len(ids) + 1)]

        query = f"""
        DELETE FROM {self.qualified_table}
        WHERE id IN ({', '.join(placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, ids

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query, reusing the search builder for filtering.

        Args:
            query: Optional Query object for filtering

        Returns:
            Tuple of (SQL query, parameters)
        """
        if query and query.filters:
            search_query, params = self.build_search_query(query)
            count_query = search_query.replace("SELECT *", "SELECT COUNT(*)", 1)
            # ORDER BY / LIMIT / OFFSET are irrelevant for a count; strip them.
            for clause in ["ORDER BY", "LIMIT", "OFFSET"]:
                if clause in count_query:
                    count_query = count_query[:count_query.index(clause)]
            return count_query.strip(), params
        else:
            return f"SELECT COUNT(*) FROM {self.qualified_table}", []

    def _typed_field_expr(self, base_field_expr: str, op: Any, value: Any) -> str:
        """Wrap a JSON text extraction with a type cast when needed.

        JSON extraction yields text; postgres and duckdb require explicit
        casts to compare against numbers, booleans or timestamps. sqlite and
        the generic dialect never call this helper.
        """
        def cast(kind: str) -> str:
            if self.dialect == "postgres":
                return f"({base_field_expr})::{kind}"
            # duckdb spelling of the same casts.
            duck = {"numeric": "DOUBLE", "timestamp": "TIMESTAMP", "boolean": "BOOLEAN"}[kind]
            return f"CAST({base_field_expr} AS {duck})"

        def first(v: Any) -> Any:
            # First element of a non-empty list/tuple, else None.
            return v[0] if isinstance(v, (list, tuple)) and len(v) > 0 else None

        if op in [Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN]:
            # Range comparisons need numeric/timestamp semantics.
            if isinstance(value, (int, float)) or isinstance(first(value), (int, float)):
                return cast("numeric")
            if isinstance(value, datetime) or isinstance(first(value), datetime):
                return cast("timestamp")
            return base_field_expr
        if op in [Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN]:
            # Equality/membership: cast based on the value's type.
            # (bool is checked before int/float since bool is an int subclass.)
            if isinstance(value, bool):
                return cast("boolean")
            if isinstance(value, (int, float)):
                return cast("numeric")
            if isinstance(value, list) and value and isinstance(value[0], (int, float)):
                return cast("numeric")
            return base_field_expr
        return base_field_expr

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE fragment for a single filter.

        Args:
            filter_spec: The filter specification (.field, .operator, .value)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters)

        Raises:
            ValueError: If the operator is not supported.
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value

        # 'id' lives in its own column; everything else is in the JSON blob.
        if field == 'id':
            field_expr = 'id'
        elif self.dialect == "postgres":
            field_expr = self._typed_field_expr(f"data->>'{field}'", op, value)
        elif self.dialect == "duckdb":
            field_expr = self._typed_field_expr(f"json_extract_string(data, '$.{field}')", op, value)
        elif self.dialect == "sqlite":
            # SQLite compares json_extract results loosely; no cast needed.
            field_expr = f"json_extract(data, '$.{field}')"
        else:
            field_expr = field

        param_placeholder = self._get_param_placeholder(param_start)

        # Operator dispatch.
        if op == Operator.EQ:
            return f"{field_expr} = {param_placeholder}", [value]
        elif op == Operator.NEQ:
            return f"{field_expr} != {param_placeholder}", [value]
        elif op == Operator.GT:
            return f"{field_expr} > {param_placeholder}", [value]
        elif op == Operator.GTE:
            return f"{field_expr} >= {param_placeholder}", [value]
        elif op == Operator.LT:
            return f"{field_expr} < {param_placeholder}", [value]
        elif op == Operator.LTE:
            return f"{field_expr} <= {param_placeholder}", [value]
        elif op == Operator.LIKE:
            return f"{field_expr} LIKE {param_placeholder}", [value]
        elif op == Operator.NOT_LIKE:
            return f"{field_expr} NOT LIKE {param_placeholder}", [value]
        elif op == Operator.IN:
            placeholders = ", ".join([self._get_param_placeholder(n) for n in range(param_start, param_start + len(value))])
            return f"{field_expr} IN ({placeholders})", list(value)
        elif op == Operator.NOT_IN:
            placeholders = ", ".join([self._get_param_placeholder(n) for n in range(param_start, param_start + len(value))])
            return f"{field_expr} NOT IN ({placeholders})", list(value)
        elif op == Operator.BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.NOT_BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} NOT BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.EXISTS:
            return f"{field_expr} IS NOT NULL", []
        elif op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []
        elif op == Operator.REGEX:
            # PostgreSQL uses ~; duckdb uses regexp_matches(); others REGEXP.
            if self.dialect == "postgres":
                return f"{field_expr} ~ {param_placeholder}", [value]
            elif self.dialect == "duckdb":
                return f"regexp_matches({field_expr}, {param_placeholder})", [value]
            else:
                return f"{field_expr} REGEXP {param_placeholder}", [value]
        else:
            raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to a JSON string for storage."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as dictionary

        Returns:
            Record object
        """
        return SQLRecordSerializer.row_to_record(row)

764 

765 

class SQLTableManager:
    """Manages SQL table creation and schema."""

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Initialize the table manager.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name (schema-prefixed when set)."""
        return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name

    def get_create_table_sql(self) -> str:
        """Get the CREATE TABLE SQL statement for the configured dialect.

        Returns:
            SQL statement for creating the table (plus supporting indexes)
        """
        table = self.qualified_table
        name = self.table_name

        if self.dialect == "postgres":
            # JSONB columns with GIN indexes for efficient JSON querying.
            return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data JSONB NOT NULL,
                metadata JSONB,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_{name}_data
            ON {table} USING GIN (data);

            CREATE INDEX IF NOT EXISTS idx_{name}_metadata
            ON {table} USING GIN (metadata);
            """

        if self.dialect == "sqlite":
            # No JSONB in SQLite: TEXT storage guarded by json_valid checks.
            return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL CHECK (json_valid(data)),
                metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_{name}_created
            ON {table} (created_at);

            CREATE INDEX IF NOT EXISTS idx_{name}_updated
            ON {table} (updated_at);
            """

        if self.dialect == "duckdb":
            # DuckDB ships a native JSON column type.
            return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data JSON NOT NULL,
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_{name}_created
            ON {table} (created_at);

            CREATE INDEX IF NOT EXISTS idx_{name}_updated
            ON {table} (updated_at);
            """

        # Generic SQL fallback: plain TEXT, no indexes.
        return f"""
        CREATE TABLE IF NOT EXISTS {table} (
            id VARCHAR(255) PRIMARY KEY,
            data TEXT NOT NULL,
            metadata TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

    def get_drop_table_sql(self) -> str:
        """Get the DROP TABLE SQL statement.

        Returns:
            SQL statement for dropping the table
        """
        return f"DROP TABLE IF EXISTS {self.qualified_table}"