Coverage for src/dataknobs_data/backends/sql_base.py: 11%

374 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Base SQL functionality shared between SQL database backends.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import datetime 

8from typing import Any, TYPE_CHECKING 

9 

10from ..query import Operator, Query, SortOrder 

11from ..records import Record 

12 

13if TYPE_CHECKING: 

14 from ..query_logic import ComplexQuery 

15 

16 

class SQLRecordSerializer:
    """Mixin for SQL record serialization/deserialization with vector support.

    All helpers are static: they convert between ``Record`` objects and the
    JSON text stored in the ``data`` / ``metadata`` columns of the SQL table.
    """

    @staticmethod
    def _escape_sql_literal(text: str) -> str:
        """Double single quotes so ``text`` is safe inside a SQL string literal."""
        return str(text).replace("'", "''")

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Convert a Record to a JSON string for storage.

        VectorField instances are stored via ``to_dict()`` so their metadata
        survives the round trip; any other field exposing a callable
        ``to_list`` is stored as that list; everything else stores ``value``.

        Args:
            record: The record whose fields are serialized.

        Returns:
            JSON object text mapping field name to serialized value.
        """
        from ..fields import VectorField

        data = {}
        for field_name, field_obj in record.fields.items():
            if isinstance(field_obj, VectorField):
                # Preserve full vector metadata, not just the raw values.
                data[field_name] = field_obj.to_dict()
            elif hasattr(field_obj, 'to_list') and callable(field_obj.to_list):
                data[field_name] = field_obj.to_list()
            else:
                data[field_name] = field_obj.value
        return json.dumps(data)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Get a SQL expression that extracts a vector from the JSON ``data`` column.

        Handles both storage shapes: a raw array, or a VectorField dict whose
        ``value`` key holds the array.

        Args:
            field_name: Name of the vector field. Single quotes are doubled
                so the name cannot terminate the surrounding SQL literal.
            dialect: SQL dialect (``postgres``, ``sqlite``, anything else
                falls back to a generic ``data->'name'`` expression).

        Returns:
            SQL expression text.
        """
        name = SQLRecordSerializer._escape_sql_literal(field_name)

        if dialect == "postgres":
            # Object -> read the nested 'value'; otherwise the field itself.
            return f"""CASE
                WHEN jsonb_typeof(data->'{name}') = 'object'
                THEN (data->'{name}'->>'value')::vector
                ELSE (data->>'{name}')::vector
            END"""
        if dialect == "sqlite":
            # SQLite has no native vector type; the expression yields JSON.
            return f"""CASE
                WHEN json_type(json_extract(data, '$.{name}')) = 'object'
                THEN json_extract(data, '$.{name}.value')
                ELSE json_extract(data, '$.{name}')
            END"""
        # Generic fallback
        return f"data->'{name}'"

    @staticmethod
    def json_to_record(data_json: str, metadata_json: str | None = None) -> Record:
        """Convert stored JSON strings back into a Record.

        Dict values tagged with ``"type": "vector"`` are rebuilt through
        ``VectorField.from_dict``; every other value becomes a plain ``Field``.

        Args:
            data_json: JSON text of the field mapping. Empty text or JSON
                ``null`` yields a record with no fields.
            metadata_json: Optional JSON text of the record metadata. Missing,
                empty or JSON ``null`` yields empty metadata.

        Returns:
            The reconstructed Record.
        """
        from ..fields import Field, VectorField

        # `or {}` covers a payload that decodes to None (JSON null), which
        # row_to_record produces when the data column itself is NULL.
        data = (json.loads(data_json) if data_json else None) or {}
        metadata = (json.loads(metadata_json) if metadata_json else None) or {}

        fields = {}
        for field_name, field_value in data.items():
            if isinstance(field_value, dict) and field_value.get("type") == "vector":
                # from_dict is given a 'name'; older payloads may lack it.
                if "name" not in field_value:
                    field_value["name"] = field_name
                fields[field_name] = VectorField.from_dict(field_value)
            else:
                fields[field_name] = Field(name=field_name, value=field_value)

        # Build an empty Record first, then install the typed fields directly.
        record = Record(metadata=metadata)
        record.fields.update(fields)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as a dictionary with 'id', 'data' and optional
                'metadata' entries. 'data' / 'metadata' may be JSON text or
                already-decoded objects.

        Returns:
            Reconstructed Record, with its ID taken from ``row["id"]`` when
            that key is present.
        """
        data_json = row.get("data", {})
        if not isinstance(data_json, str):
            # Drivers may hand back decoded JSON; normalize to text.
            data_json = json.dumps(data_json)

        metadata_json = row.get("metadata")
        if metadata_json and not isinstance(metadata_json, str):
            metadata_json = json.dumps(metadata_json)

        record = SQLRecordSerializer.json_to_record(data_json, metadata_json)

        from ..database_utils import ensure_record_id
        if "id" in row:
            record = ensure_record_id(record, row["id"])

        return record

127 

128 

class SQLQueryBuilder:
    """Builds parameterized SQL statements for the record table.

    The table stores each record as an ``id`` column plus JSON ``data`` and
    ``metadata`` columns. Every ``build_*`` method returns the SQL text and
    the list of parameters to bind, in placeholder order.
    """

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard", param_style: str = "numeric"):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?, 'pyformat' for %(name)s)
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the table name, prefixed with ``schema.`` when a schema is set."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the parameter placeholder for the configured param_style.

        Args:
            param_num: Parameter number (1-based)
            param_name: Optional parameter name for pyformat style

        Returns:
            Parameter placeholder string
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        if self.param_style == "qmark":
            return "?"
        if self.param_style == "pyformat":
            name = param_name or f"p{param_num - 1}"  # 0-based for pyformat
            return f"%({name})s"
        # Unknown style: numeric for postgres, qmark for everything else.
        if self.dialect == "postgres":
            return f"${param_num}"
        return "?"

    def _uses_positional_placeholders(self) -> bool:
        """Return True when placeholders are anonymous ``?`` marks.

        Checked via the placeholder actually emitted rather than
        ``param_style`` so the unknown-style fallback to ``?`` is covered:
        with ``?`` the parameter list must follow textual placeholder order.
        """
        return self._get_param_placeholder(1) == "?"

    @staticmethod
    def _escape_literal(text: Any) -> str:
        """Double single quotes so ``text`` is safe inside a SQL string literal."""
        return str(text).replace("'", "''")

    def _build_order_by(self, sort_specs: Any) -> str:
        """Build the ``ORDER BY ...`` clause, or ``""`` when nothing to sort by.

        ``id`` lives in its own column; other fields are read from the JSON
        ``data`` column using the dialect's extraction syntax.
        """
        if not sort_specs:
            return ""
        order_parts = []
        for sort_spec in sort_specs:
            direction = "DESC" if sort_spec.order == SortOrder.DESC else "ASC"
            if sort_spec.field == 'id':
                order_parts.append(f"id {direction}")
                continue
            name = self._escape_literal(sort_spec.field)
            if self.dialect == "postgres":
                order_parts.append(f"data->'{name}' {direction}")
            elif self.dialect == "sqlite":
                order_parts.append(f"json_extract(data, '$.{name}') {direction}")
            else:
                order_parts.append(f"data {direction}")  # Fallback
        return "ORDER BY " + ", ".join(order_parts)

    def _build_limit_offset(self, limit_value: Any, offset_value: Any) -> list[str]:
        """Build the LIMIT / OFFSET fragments (falsy values emit nothing).

        Values are coerced with ``int()`` because they are written into the
        SQL text rather than bound as parameters.
        """
        parts = []
        if limit_value:
            parts.append(f"LIMIT {int(limit_value)}")
        if offset_value:
            if not limit_value and self.dialect == "sqlite":
                # SQLite only accepts OFFSET after LIMIT; -1 means unlimited.
                parts.append("LIMIT -1")
            parts.append(f"OFFSET {int(offset_value)}")
        return parts

    def _build_filter_clauses(self, filters: Any, param_start: int) -> tuple[list[str], list[Any]]:
        """Build one SQL clause per filter, numbering placeholders consecutively.

        Args:
            filters: Iterable of filter specifications (may be empty/None)
            param_start: Number of the first placeholder to use

        Returns:
            Tuple of (list of clauses, flat list of parameters)
        """
        clauses = []
        params = []
        next_param = param_start
        for filter_spec in filters or []:
            clause, new_params = self._build_filter_clause(filter_spec, next_param)
            clauses.append(clause)
            params.extend(new_params)
            next_param += len(new_params)
        return clauses, params

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert
            record_id: Optional ID (a UUID4 string is generated if not provided)

        Returns:
            Tuple of (SQL query, parameters) with parameters ordered
            ``[id, data, metadata]``
        """
        record_id = record_id or str(uuid.uuid4())
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)

        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, [record_id, data, metadata]

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"SELECT * FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query for updating a record.

        Args:
            record_id: The record ID
            record: The updated record

        Returns:
            Tuple of (SQL query, parameters). With ``?`` placeholders the
            parameters follow textual order ``[data, metadata, id]``;
            otherwise they are ``[id, data, metadata]`` matching $1/$2/$3.
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self._uses_positional_placeholders():
            query = f"""
                UPDATE {self.qualified_table}
                SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """
            params = [data, metadata, record_id]
        else:
            p1 = self._get_param_placeholder(1)
            p2 = self._get_param_placeholder(2)
            p3 = self._get_param_placeholder(3)
            query = f"""
                UPDATE {self.qualified_table}
                SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
                WHERE id = {p1}
            """
            params = [record_id, data, metadata]

        return query, params

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"DELETE FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query to check if a record exists.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1", [record_id]

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery object with boolean logic.

        Args:
            query: The ComplexQuery object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params = []

        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)

        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))

        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build WHERE clause text for a tree of boolean conditions.

        Args:
            condition: The Condition object (LogicCondition or FilterCondition)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters); ``("", [])`` when the
            condition contributes nothing (unknown node, empty group).
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        # Leaf node: a single filter.
        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        if not isinstance(condition, LogicCondition):
            return "", []

        if condition.operator == LogicOperator.NOT:
            if not condition.conditions:
                # Nothing to negate; avoid indexing an empty list.
                return "", []
            sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
            return (f"NOT ({sub_clause})", list(sub_params)) if sub_clause else ("", [])

        if condition.operator == LogicOperator.AND:
            joiner = " AND "
        elif condition.operator == LogicOperator.OR:
            joiner = " OR "
        else:
            return "", []

        clauses = []
        params = []
        current_param = param_start
        for sub_condition in condition.conditions:
            sub_clause, sub_params = self._build_complex_condition(sub_condition, current_param)
            if sub_clause:
                clauses.append(sub_clause)
                params.extend(sub_params)
                # Keep numbered placeholders unique across siblings.
                current_param += len(sub_params)
        return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build just the filter conditions from a Query object.

        The returned text starts with ``" AND "`` so it can be appended to an
        existing WHERE condition.

        Args:
            query: The Query object (can be None)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (WHERE clause SQL, parameters)
            Returns empty string and empty list if no filters
        """
        if not query or not query.filters:
            return "", []

        where_clauses, params = self._build_filter_clauses(query.filters, param_start)
        if where_clauses:
            return " AND " + " AND ".join(where_clauses), params
        return "", []

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query from a Query object.

        Args:
            query: The Query object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]

        where_clauses, params = self._build_filter_clauses(query.filters, 1)
        if where_clauses:
            sql_parts.append("WHERE " + " AND ".join(where_clauses))

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)

        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))

        return " ".join(sql_parts), params

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a batch UPDATE query using CASE expressions.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            Tuple of (SQL query, parameters); ``("", [])`` for no updates.
            With numbered placeholders the parameters are
            ``[id, data, metadata]`` per record followed by the IDs for the
            ``WHERE id IN`` list. With ``?`` placeholders they follow the SQL
            text: all data CASE arms, then all metadata CASE arms, then IDs.
        """
        if not updates:
            return "", []

        positional = self._uses_positional_placeholders()
        update_ids = []
        data_cases = []
        metadata_cases = []
        data_params = []
        metadata_params = []
        numbered_params = []

        for i, (record_id, record) in enumerate(updates):
            update_ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            if positional:
                # '?' binds by position, so each CASE expression keeps its
                # own parameter run; they are concatenated in SQL order below.
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                data_params.extend([record_id, data_json])
                metadata_params.extend([record_id, metadata_json])
            else:
                # Numbered/named placeholders can reuse the id parameter.
                param_idx = i * 3 + 1
                p1 = self._get_param_placeholder(param_idx)
                p2 = self._get_param_placeholder(param_idx + 1)
                p3 = self._get_param_placeholder(param_idx + 2)
                data_cases.append(f"WHEN id = {p1} THEN {p2}")
                metadata_cases.append(f"WHEN id = {p1} THEN {p3}")
                numbered_params.extend([record_id, data_json, metadata_json])

        if positional:
            id_placeholders = ["?"] * len(update_ids)
            params = data_params + metadata_params + update_ids
        else:
            id_param_start = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(n)
                for n in range(id_param_start, id_param_start + len(update_ids))
            ]
            params = numbered_params + update_ids

        data_case_sql = " ".join(data_cases)
        metadata_case_sql = " ".join(metadata_cases)
        id_list_sql = ", ".join(id_placeholders)

        # ELSE keeps the current value for any row no CASE arm matches.
        query = f"""
            UPDATE {self.qualified_table}
            SET
                data = CASE {data_case_sql} ELSE data END,
                metadata = CASE {metadata_case_sql} ELSE metadata END,
                updated_at = CURRENT_TIMESTAMP
            WHERE id IN ({id_list_sql})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a multi-row INSERT query for several records.

        A fresh UUID4 string is generated as the ID of every record.

        Args:
            records: List of records to insert

        Returns:
            Tuple of (SQL query, parameters, generated IDs);
            ``("", [], [])`` for an empty input list
        """
        if not records:
            return "", [], []

        ids = []
        values_clauses = []
        params = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            param_idx = i * 3 + 1
            p1 = self._get_param_placeholder(param_idx)
            p2 = self._get_param_placeholder(param_idx + 1)
            p3 = self._get_param_placeholder(param_idx + 2)
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        values_sql = ", ".join(values_clauses)
        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES {values_sql}
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a single DELETE query removing several records.

        Args:
            ids: List of record IDs to delete

        Returns:
            Tuple of (SQL query, parameters); ``("", [])`` for no IDs.
            The parameter list is a copy, not the caller's list.
        """
        if not ids:
            return "", []

        placeholders = ", ".join(
            self._get_param_placeholder(n) for n in range(1, len(ids) + 1)
        )

        query = f"""
            DELETE FROM {self.qualified_table}
            WHERE id IN ({placeholders})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, list(ids)

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query.

        Only the query's filters are applied; sorting, limit and offset do
        not affect a count and are never emitted.

        Args:
            query: Optional Query object for filtering

        Returns:
            Tuple of (SQL query, parameters)
        """
        base = f"SELECT COUNT(*) FROM {self.qualified_table}"
        if query and query.filters:
            # Built from the filters directly; slicing the search SQL at
            # "ORDER BY"/"LIMIT"/"OFFSET" would cut names containing them.
            where_clauses, params = self._build_filter_clauses(query.filters, 1)
            if where_clauses:
                return f"{base} WHERE {' AND '.join(where_clauses)}", params
        return base, []

    def _postgres_field_expr(self, field: str, op: Any, value: Any) -> str:
        """Build the PostgreSQL expression for a JSON field, cast for ``op``.

        ``data->>'field'`` yields text, so it is cast to numeric, timestamp
        or boolean according to the Python type of the comparison value (or
        of the first element when the value is a list/tuple).
        """
        base_field_expr = f"data->>'{self._escape_literal(field)}'"
        has_items = isinstance(value, (list, tuple)) and len(value) > 0

        if op in [Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN]:
            if isinstance(value, (int, float)) or (has_items and isinstance(value[0], (int, float))):
                return f"({base_field_expr})::numeric"
            if isinstance(value, datetime) or (has_items and isinstance(value[0], datetime)):
                return f"({base_field_expr})::timestamp"
            return base_field_expr

        if op in [Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN]:
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(value, bool):
                return f"({base_field_expr})::boolean"
            if isinstance(value, (int, float)):
                return f"({base_field_expr})::numeric"
            if has_items and isinstance(value[0], (int, float)):
                # IN/NOT_IN with numeric values
                return f"({base_field_expr})::numeric"
            return base_field_expr

        return base_field_expr

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build the SQL condition for a single filter.

        Args:
            filter_spec: The filter specification (``field``, ``operator``,
                ``value`` attributes are read)
            param_start: Number of the first placeholder this filter may use

        Returns:
            Tuple of (SQL clause, parameters)

        Raises:
            ValueError: If the operator is unsupported, or a BETWEEN /
                NOT_BETWEEN filter does not carry exactly two values.
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value
        param_placeholder = self._get_param_placeholder(param_start)

        # 'id' is its own column; every other field lives in the JSON data.
        if field == 'id':
            field_expr = 'id'
        elif self.dialect == "postgres":
            field_expr = self._postgres_field_expr(field, op, value)
        elif self.dialect == "sqlite":
            field_expr = f"json_extract(data, '$.{self._escape_literal(field)}')"
        else:
            field_expr = field

        # Operators of the form "<expr> <op> <placeholder>".
        binary_ops = (
            (Operator.EQ, "="),
            (Operator.NEQ, "!="),
            (Operator.GT, ">"),
            (Operator.GTE, ">="),
            (Operator.LT, "<"),
            (Operator.LTE, "<="),
            (Operator.LIKE, "LIKE"),
            (Operator.NOT_LIKE, "NOT LIKE"),
        )
        for candidate, sql_op in binary_ops:
            if op == candidate:
                return f"{field_expr} {sql_op} {param_placeholder}", [value]

        if op == Operator.IN or op == Operator.NOT_IN:
            values = list(value)
            if not values:
                # "IN ()" is invalid SQL on PostgreSQL; emit the constant
                # result instead (nothing is IN, everything is NOT IN).
                return ("1 = 0" if op == Operator.IN else "1 = 1"), []
            placeholders = ", ".join(
                self._get_param_placeholder(n)
                for n in range(param_start, param_start + len(values))
            )
            keyword = "IN" if op == Operator.IN else "NOT IN"
            return f"{field_expr} {keyword} ({placeholders})", values

        if op == Operator.BETWEEN or op == Operator.NOT_BETWEEN:
            bounds = list(value)
            if len(bounds) != 2:
                raise ValueError(f"{op} requires exactly two values, got {len(bounds)}")
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            keyword = "BETWEEN" if op == Operator.BETWEEN else "NOT BETWEEN"
            return f"{field_expr} {keyword} {placeholder1} AND {placeholder2}", bounds

        if op == Operator.EXISTS:
            return f"{field_expr} IS NOT NULL", []
        if op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []

        if op == Operator.REGEX:
            # PostgreSQL uses ~ for regex; other dialects get REGEXP.
            if self.dialect == "postgres":
                return f"{field_expr} ~ {param_placeholder}", [value]
            return f"{field_expr} REGEXP {param_placeholder}", [value]

        raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to JSON string for storage."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as dictionary

        Returns:
            Record object
        """
        return SQLRecordSerializer.row_to_record(row)

729 

730 

class SQLTableManager:
    """Manages SQL table creation and schema.

    Produces the DDL text for the record table (``id``, JSON ``data``, JSON
    ``metadata``, ``created_at``, ``updated_at``) in the configured dialect.
    Nothing is executed here; callers run the returned SQL themselves.
    """

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Initialize the table manager.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the table name, prefixed with ``schema.`` when a schema is set."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def get_create_table_sql(self) -> str:
        """Get the CREATE TABLE SQL (plus index statements where applicable).

        * postgres: JSONB columns with GIN indexes on ``data`` and ``metadata``.
        * sqlite: TEXT columns guarded by ``json_valid`` checks, with indexes
          on ``created_at`` and ``updated_at``.
        * anything else: plain TEXT columns, no indexes.

        Returns:
            SQL text; may contain several ``;``-terminated statements
        """
        if self.dialect == "postgres":
            return f"""
                CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                    id VARCHAR(255) PRIMARY KEY,
                    data JSONB NOT NULL,
                    metadata JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data
                ON {self.qualified_table} USING GIN (data);

                CREATE INDEX IF NOT EXISTS idx_{self.table_name}_metadata
                ON {self.qualified_table} USING GIN (metadata);
            """

        if self.dialect == "sqlite":
            # SQLite's CREATE INDEX grammar puts the schema on the index name
            # and takes an unqualified table name after ON.
            index_schema = f"{self.schema_name}." if self.schema_name else ""
            # SQLite doesn't have JSONB, uses TEXT for JSON storage
            return f"""
                CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                    id VARCHAR(255) PRIMARY KEY,
                    data TEXT NOT NULL CHECK (json_valid(data)),
                    metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE INDEX IF NOT EXISTS {index_schema}idx_{self.table_name}_created
                ON {self.table_name} (created_at);

                CREATE INDEX IF NOT EXISTS {index_schema}idx_{self.table_name}_updated
                ON {self.table_name} (updated_at);
            """

        # Generic SQL
        return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """

    def get_drop_table_sql(self) -> str:
        """Get the DROP TABLE SQL statement.

        Returns:
            SQL statement for dropping the table
        """
        return f"DROP TABLE IF EXISTS {self.qualified_table}"