Coverage for src/dataknobs_data/backends/sql_base.py: 11%

367 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""Base SQL functionality shared between SQL database backends.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import datetime 

8from typing import Any, TYPE_CHECKING 

9 

10from ..query import Operator, Query, SortOrder 

11from ..records import Record 

12 

13if TYPE_CHECKING: 

14 from ..query_logic import ComplexQuery 

15 

16 

class SQLRecordSerializer:
    """Mixin for SQL record serialization/deserialization with vector support."""

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Convert a Record to a JSON string for storage.

        VectorFields are stored via ``to_dict()`` so their metadata survives a
        round trip; other fields exposing a callable ``to_list`` are stored as
        that list; every other field is stored as its plain ``value``.

        Args:
            record: The record whose fields should be serialized

        Returns:
            JSON object text mapping field names to serialized values
        """
        from ..fields import VectorField

        data = {}
        for field_name, field_obj in record.fields.items():
            if isinstance(field_obj, VectorField):
                # Preserve full vector metadata (dimensions, source, ...).
                data[field_name] = field_obj.to_dict()
            elif callable(getattr(field_obj, "to_list", None)):
                # Other special fields that know how to render themselves as a list.
                data[field_name] = field_obj.to_list()
            else:
                data[field_name] = field_obj.value
        return json.dumps(data)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Get an SQL expression that extracts a vector from the JSON ``data`` column.

        Handles both storage formats: a raw array, or a serialized VectorField
        object whose vector lives under its ``value`` key.

        Args:
            field_name: Name of the vector field. Single quotes are escaped so
                the name cannot terminate the surrounding SQL string literal.
            dialect: SQL dialect ("postgres", "sqlite"; anything else uses a
                generic fallback)

        Returns:
            SQL expression yielding the vector value (``::vector`` on
            PostgreSQL, JSON text on SQLite)
        """
        # Double single quotes so the name is safe inside '...' literals.
        key = str(field_name).replace("'", "''")

        if dialect == "postgres":
            # PostgreSQL: object -> take its 'value' member, otherwise the raw array.
            return f"""CASE
                WHEN jsonb_typeof(data->'{key}') = 'object'
                THEN (data->'{key}'->>'value')::vector
                ELSE (data->>'{key}')::vector
            END"""
        if dialect == "sqlite":
            # SQLite has no native vector type, so the JSON text is returned.
            return f"""CASE
                WHEN json_type(json_extract(data, '$.{key}')) = 'object'
                THEN json_extract(data, '$.{key}.value')
                ELSE json_extract(data, '$.{key}')
            END"""
        # Generic fallback
        return f"data->'{key}'"

    @staticmethod
    def json_to_record(data_json: str | bytes | None, metadata_json: str | bytes | None = None) -> Record:
        """Convert stored JSON text into a Record.

        Serialized VectorFields (objects with ``"type": "vector"``) are rebuilt
        with ``VectorField.from_dict``; every other value becomes a plain Field.
        Empty input and JSON ``null`` are treated as "no data"/"no metadata".

        Args:
            data_json: JSON object text holding the record's fields
            metadata_json: Optional JSON text holding the record's metadata

        Returns:
            A Record with reconstructed fields and metadata
        """
        from ..fields import Field, VectorField

        data = json.loads(data_json) if data_json else None
        if data is None:
            # Covers both an empty column and a stored JSON null.
            data = {}

        metadata = json.loads(metadata_json) if metadata_json else None
        if metadata is None:
            metadata = {}

        fields = {}
        for field_name, field_value in data.items():
            if isinstance(field_value, dict) and field_value.get("type") == "vector":
                # from_dict needs a 'name'; fill it from the key without mutating
                # the parsed payload.
                vector_spec = field_value if "name" in field_value else {**field_value, "name": field_name}
                fields[field_name] = VectorField.from_dict(vector_spec)
            else:
                fields[field_name] = Field(name=field_name, value=field_value)

        record = Record(metadata=metadata)
        record.fields.update(fields)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Drivers may hand back the JSON columns either as text/bytes or as
        already-decoded Python objects; both are accepted.

        Args:
            row: Database row as dictionary with 'id', 'data' and optional 'metadata' fields

        Returns:
            Reconstructed Record object with ID set when the row has an 'id'
        """
        text_types = (str, bytes, bytearray)

        data_json = row.get("data", {})
        if not isinstance(data_json, text_types):
            # Decoded JSON (dict) or NULL -> re-encode; NULL becomes "null",
            # which json_to_record treats as an empty record.
            data_json = json.dumps(data_json)

        metadata_json = row.get("metadata")
        if metadata_json and not isinstance(metadata_json, text_types):
            metadata_json = json.dumps(metadata_json)

        record = SQLRecordSerializer.json_to_record(data_json, metadata_json)

        # Ensure the record has its ID set from the row
        from ..database_utils import ensure_record_id
        if "id" in row:
            record = ensure_record_id(record, row["id"])

        return record

127 

128 

class SQLQueryBuilder:
    """Builds SQL queries from Query objects.

    Records are stored one per row with ``id``, ``data`` (a JSON document),
    ``metadata`` (JSON), ``created_at`` and ``updated_at`` columns. Filter and
    sort fields address keys inside ``data`` using the dialect's JSON
    operators. Filter values are always bound as parameters; field names are
    embedded in single-quoted SQL literals with quotes escaped.
    """

    def __init__(
        self,
        table_name: str,
        schema_name: str | None = None,
        dialect: str = "standard",
        param_style: str = "numeric",
    ):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?,
                'pyformat' for %(name)s)
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name (``schema.table`` when a schema is set)."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the appropriate parameter placeholder based on param_style.

        Args:
            param_num: Parameter number (1-based)
            param_name: Optional parameter name for pyformat style

        Returns:
            Parameter placeholder string
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        elif self.param_style == "qmark":
            return "?"
        elif self.param_style == "pyformat":
            name = param_name or f"p{param_num - 1}"  # 0-based for pyformat
            return f"%({name})s"
        # Unknown style: numeric for the postgres dialect, qmark for others.
        if self.dialect == "postgres":
            return f"${param_num}"
        return "?"

    def _uses_positional_params(self) -> bool:
        """Return True when placeholders are bare ``?`` markers bound by position.

        Such markers carry no index, so parameters must be supplied in the
        exact order the markers appear in the SQL text. This holds for
        ``param_style='qmark'`` and for the non-postgres fallback of unknown
        styles.
        """
        return self._get_param_placeholder(1) == "?"

    @staticmethod
    def _escape_literal(text: Any) -> str:
        """Escape *text* for use inside a single-quoted SQL string literal.

        Doubling single quotes stops a field name such as ``o'brien`` from
        terminating the literal (and from injecting SQL).
        """
        return str(text).replace("'", "''")

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert
            record_id: Optional ID (a UUID4 string is generated if not provided)

        Returns:
            Tuple of (SQL query, parameters) with parameters [id, data, metadata]
        """
        record_id = record_id or str(uuid.uuid4())
        data = SQLRecordSerializer.record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)

        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, [record_id, data, metadata]

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"SELECT * FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query for updating a record.

        Args:
            record_id: The record ID
            record: The updated record

        Returns:
            Tuple of (SQL query, parameters). With positional ``?`` markers the
            parameters are [data, metadata, id]; with numbered/named markers
            they are [id, data, metadata].
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self._uses_positional_params():
            # Positional markers bind in textual order: data, metadata, then id.
            query = f"""
                UPDATE {self.qualified_table}
                SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """
            return query, [data, metadata, record_id]

        # Numbered/named markers: id is parameter 1, data 2, metadata 3.
        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)
        query = f"""
            UPDATE {self.qualified_table}
            SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
            WHERE id = {p1}
        """
        return query, [record_id, data, metadata]

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"DELETE FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query to check if a record exists.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        return f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1", [record_id]

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery object with boolean logic.

        Args:
            query: The ComplexQuery object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []

        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)
        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))

        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build WHERE clause for complex boolean logic conditions.

        Args:
            condition: The Condition object (LogicCondition or FilterCondition)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters); ("", []) when the condition
            produces no SQL (unknown type, empty AND/OR/NOT).
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        # Leaf node: a single filter.
        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        if not isinstance(condition, LogicCondition):
            return "", []

        if condition.operator == LogicOperator.AND or condition.operator == LogicOperator.OR:
            joiner = " AND " if condition.operator == LogicOperator.AND else " OR "
            clauses: list[str] = []
            params: list[Any] = []
            next_param = param_start
            for sub_condition in condition.conditions:
                sub_clause, sub_params = self._build_complex_condition(sub_condition, next_param)
                if sub_clause:
                    clauses.append(sub_clause)
                    params.extend(sub_params)
                    # Later siblings continue numbering after this one's params.
                    next_param += len(sub_params)
            return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

        if condition.operator == LogicOperator.NOT:
            if not condition.conditions:
                return "", []
            sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
            return (f"NOT ({sub_clause})", list(sub_params)) if sub_clause else ("", [])

        return "", []

    def _build_filters(self, filters: Any, param_start: int) -> tuple[list[str], list[Any]]:
        """Build one SQL condition per filter, numbering placeholders from *param_start*.

        Each filter's placeholders start right after the previous filter's,
        so filters that bind no parameters (EXISTS, empty IN) do not consume
        a number.
        """
        clauses: list[str] = []
        params: list[Any] = []
        next_param = param_start
        for filter_spec in filters or ():
            clause, new_params = self._build_filter_clause(filter_spec, next_param)
            clauses.append(clause)
            params.extend(new_params)
            next_param += len(new_params)
        return clauses, params

    def _build_order_by(self, sort_specs: Any) -> str:
        """Return an ``ORDER BY ...`` clause for *sort_specs*, or "" when there are none."""
        if not sort_specs:
            return ""
        order_parts = []
        for sort_spec in sort_specs:
            direction = "DESC" if sort_spec.order == SortOrder.DESC else "ASC"
            field = self._escape_literal(sort_spec.field)
            # Use JSON extraction based on dialect
            if self.dialect == "postgres":
                order_parts.append(f"data->'{field}' {direction}")
            elif self.dialect == "sqlite":
                order_parts.append(f"json_extract(data, '$.{field}') {direction}")
            else:
                order_parts.append(f"data {direction}")  # Fallback
        return "ORDER BY " + ", ".join(order_parts)

    def _build_limit_offset(self, limit_value: Any, offset_value: Any) -> list[str]:
        """Return the LIMIT/OFFSET clause fragments (possibly empty).

        Values are coerced with ``int()`` because they are interpolated into
        the SQL text rather than bound. SQLite only accepts OFFSET as part of
        a LIMIT clause, so an offset without a limit becomes ``LIMIT -1``
        (a negative limit means "no upper bound" in SQLite).
        """
        parts = []
        if limit_value:
            parts.append(f"LIMIT {int(limit_value)}")
        if offset_value:
            if not limit_value and self.dialect == "sqlite":
                parts.append("LIMIT -1")
            parts.append(f"OFFSET {int(offset_value)}")
        return parts

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build just the filter conditions from a Query object.

        The returned SQL is prefixed with " AND " so it can be appended to an
        existing WHERE condition.

        Args:
            query: The Query object (can be None)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (" AND cond1 AND cond2 ...", parameters), or ("", [])
            if there are no filters
        """
        if not query or not query.filters:
            return "", []

        where_clauses, params = self._build_filters(query.filters, param_start)
        if where_clauses:
            return " AND " + " AND ".join(where_clauses), params
        return "", []

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query from a Query object.

        Args:
            query: The Query object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]

        where_clauses, params = self._build_filters(query.filters, 1)
        if where_clauses:
            sql_parts.append("WHERE " + " AND ".join(where_clauses))

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)
        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))

        return " ".join(sql_parts), params

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a batch UPDATE query using CASE expressions.

        This provides efficient batch updates for both PostgreSQL and SQLite.
        Rows not matched by a CASE keep their current value (``ELSE`` branch).

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            Tuple of (SQL query, parameters); ("", []) for no updates
        """
        if not updates:
            return "", []

        positional = self._uses_positional_params()
        update_ids: list[Any] = []
        data_cases: list[str] = []
        metadata_cases: list[str] = []
        data_params: list[Any] = []
        metadata_params: list[Any] = []
        numbered_params: list[Any] = []

        for i, (record_id, record) in enumerate(updates):
            update_ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            if positional:
                # "?" markers bind in textual order: every data-CASE marker
                # precedes every metadata-CASE marker, so each CASE collects its
                # own parameter list and the lists are concatenated below.
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                data_params.extend([record_id, data_json])
                metadata_params.extend([record_id, metadata_json])
            else:
                # Numbered/named markers: id, data, metadata occupy slots
                # 3i+1..3i+3; the id slot is referenced by both CASEs.
                param_idx = i * 3 + 1
                p1 = self._get_param_placeholder(param_idx)
                p2 = self._get_param_placeholder(param_idx + 1)
                p3 = self._get_param_placeholder(param_idx + 2)
                data_cases.append(f"WHEN id = {p1} THEN {p2}")
                metadata_cases.append(f"WHEN id = {p1} THEN {p3}")
                numbered_params.extend([record_id, data_json, metadata_json])

        if positional:
            id_placeholders = ["?"] * len(update_ids)
            params = data_params + metadata_params + update_ids
        else:
            id_param_start = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(n)
                for n in range(id_param_start, id_param_start + len(update_ids))
            ]
            params = numbered_params + update_ids

        query = f"""
            UPDATE {self.qualified_table}
            SET
                data = CASE {' '.join(data_cases)} ELSE data END,
                metadata = CASE {' '.join(metadata_cases)} ELSE metadata END,
                updated_at = CURRENT_TIMESTAMP
            WHERE id IN ({', '.join(id_placeholders)})
        """

        # PostgreSQL can use RETURNING
        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a batch INSERT query for multiple records.

        Generates a single multi-row INSERT statement; a UUID4 string ID is
        generated for every record.

        Args:
            records: List of records to insert

        Returns:
            Tuple of (SQL query, parameters, generated IDs); ("", [], []) for
            no records
        """
        if not records:
            return "", [], []

        ids: list[str] = []
        values_clauses: list[str] = []
        params: list[Any] = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            # Row i uses slots 3i+1..3i+3; this is also the textual order, so
            # positional "?" markers line up with the parameter list.
            param_idx = i * 3 + 1
            p1 = self._get_param_placeholder(param_idx)
            p2 = self._get_param_placeholder(param_idx + 1)
            p3 = self._get_param_placeholder(param_idx + 2)
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES {', '.join(values_clauses)}
        """

        # PostgreSQL can use RETURNING
        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a batch DELETE query for multiple records.

        Deletes multiple records in a single query.

        Args:
            ids: List of record IDs to delete

        Returns:
            Tuple of (SQL query, parameters); ("", []) for no IDs
        """
        if not ids:
            return "", []

        placeholders = [self._get_param_placeholder(n) for n in range(1, len(ids) + 1)]

        query = f"""
            DELETE FROM {self.qualified_table}
            WHERE id IN ({', '.join(placeholders)})
        """

        # PostgreSQL can use RETURNING
        if self.dialect == "postgres":
            query += " RETURNING id"

        # Copy so callers mutating the params cannot affect the caller's list.
        return query, list(ids)

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query.

        Only the query's filters are applied; sorting, LIMIT and OFFSET are
        ignored because they do not affect the number of matching rows.

        Args:
            query: Optional Query object for filtering

        Returns:
            Tuple of (SQL query, parameters)
        """
        base = f"SELECT COUNT(*) FROM {self.qualified_table}"
        if query and query.filters:
            where_clauses, params = self._build_filters(query.filters, 1)
            if where_clauses:
                return f"{base} WHERE " + " AND ".join(where_clauses), params
        return base, []

    @staticmethod
    def _postgres_field_expr(safe_field: str, op: Any, value: Any) -> str:
        """Return the PostgreSQL text extraction of *safe_field*, cast to suit *value*.

        ``data->>'field'`` yields text, so comparisons against numbers,
        booleans and timestamps need an explicit cast. For list/tuple values
        (IN, BETWEEN) the first element decides the cast.
        """
        base_field_expr = f"data->>'{safe_field}'"
        sample = value[0] if isinstance(value, (list, tuple)) and value else value

        if op in (Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN):
            if isinstance(sample, (int, float)):
                return f"({base_field_expr})::numeric"
            if isinstance(sample, datetime):
                return f"({base_field_expr})::timestamp"
            return base_field_expr

        if op in (Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN):
            # bool must be checked before int: bool is a subclass of int, and
            # 'true'::numeric would fail at execution time.
            if isinstance(sample, bool):
                return f"({base_field_expr})::boolean"
            if isinstance(sample, (int, float)):
                return f"({base_field_expr})::numeric"
            return base_field_expr

        return base_field_expr

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE condition for a single filter.

        Args:
            filter_spec: The filter specification (``field``, ``operator``, ``value``)
            param_start: Number of the first placeholder this filter may use

        Returns:
            Tuple of (SQL condition, parameters)

        Raises:
            ValueError: If the operator is not supported
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value
        safe_field = self._escape_literal(field)

        if self.dialect == "postgres":
            field_expr = self._postgres_field_expr(safe_field, op, value)
        elif self.dialect == "sqlite":
            field_expr = f"json_extract(data, '$.{safe_field}')"
        else:
            field_expr = field
        param_placeholder = self._get_param_placeholder(param_start)

        if op == Operator.EQ:
            return f"{field_expr} = {param_placeholder}", [value]
        elif op == Operator.NEQ:
            return f"{field_expr} != {param_placeholder}", [value]
        elif op == Operator.GT:
            return f"{field_expr} > {param_placeholder}", [value]
        elif op == Operator.GTE:
            return f"{field_expr} >= {param_placeholder}", [value]
        elif op == Operator.LT:
            return f"{field_expr} < {param_placeholder}", [value]
        elif op == Operator.LTE:
            return f"{field_expr} <= {param_placeholder}", [value]
        elif op == Operator.LIKE:
            return f"{field_expr} LIKE {param_placeholder}", [value]
        elif op == Operator.IN or op == Operator.NOT_IN:
            values = list(value)
            if not values:
                # "IN ()" is invalid in PostgreSQL: nothing is in an empty set,
                # everything is outside it.
                return ("1 = 0", []) if op == Operator.IN else ("1 = 1", [])
            placeholders = ", ".join(
                self._get_param_placeholder(n) for n in range(param_start, param_start + len(values))
            )
            keyword = "IN" if op == Operator.IN else "NOT IN"
            return f"{field_expr} {keyword} ({placeholders})", values
        elif op == Operator.BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.NOT_BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} NOT BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.EXISTS:
            return f"{field_expr} IS NOT NULL", []
        elif op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []
        elif op == Operator.REGEX:
            # PostgreSQL uses ~ for regex; SQLite uses REGEXP
            if self.dialect == "postgres":
                return f"{field_expr} ~ {param_placeholder}", [value]
            return f"{field_expr} REGEXP {param_placeholder}", [value]
        raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to JSON string for storage."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as dictionary

        Returns:
            Record object
        """
        return SQLRecordSerializer.row_to_record(row)

720 

721 

class SQLTableManager:
    """Produce the DDL statements that create and drop the records table."""

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Remember where the table lives and which SQL dialect to emit.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name; when set, statements target ``schema.table``
            dialect: SQL dialect ('postgres', 'sqlite', 'standard'); any other
                value is treated like 'standard'
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Return ``schema.table`` when a schema is configured, otherwise the bare table name."""
        return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name

    def get_create_table_sql(self) -> str:
        """Build the CREATE TABLE statement (plus any indexes) for the configured dialect.

        * postgres: ``data``/``metadata`` are JSONB, each with a GIN index.
        * sqlite: ``data``/``metadata`` are TEXT guarded by ``json_valid`` checks,
          with indexes on ``created_at`` and ``updated_at``.
        * anything else: plain TEXT columns and no indexes.

        Every statement uses ``IF NOT EXISTS``.

        Returns:
            SQL text, possibly holding several ``;``-terminated statements
        """
        table = self.qualified_table
        index_prefix = f"idx_{self.table_name}"

        if self.dialect == "postgres":
            return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data JSONB NOT NULL,
                metadata JSONB,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS {index_prefix}_data
            ON {table} USING GIN (data);

            CREATE INDEX IF NOT EXISTS {index_prefix}_metadata
            ON {table} USING GIN (metadata);
            """

        if self.dialect == "sqlite":
            # No JSONB in SQLite: JSON is kept as TEXT and validated on write.
            return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL CHECK (json_valid(data)),
                metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS {index_prefix}_created
            ON {table} (created_at);

            CREATE INDEX IF NOT EXISTS {index_prefix}_updated
            ON {table} (updated_at);
            """

        # Generic SQL for every other dialect.
        return f"""
            CREATE TABLE IF NOT EXISTS {table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """

    def get_drop_table_sql(self) -> str:
        """Return an idempotent DROP TABLE statement for the managed table.

        Returns:
            SQL statement for dropping the table
        """
        return "DROP TABLE IF EXISTS " + self.qualified_table