Coverage for src/dataknobs_data/backends/sql_base.py: 11%
367 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
1"""Base SQL functionality shared between SQL database backends."""
3from __future__ import annotations
5import json
6import uuid
7from datetime import datetime
8from typing import Any, TYPE_CHECKING
10from ..query import Operator, Query, SortOrder
11from ..records import Record
13if TYPE_CHECKING:
14 from ..query_logic import ComplexQuery
class SQLRecordSerializer:
    """Mixin providing SQL record serialization/deserialization with vector support."""

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Serialize a Record's fields to a JSON string for storage.

        VectorField instances are stored via ``to_dict`` so their metadata
        survives the round trip.
        """
        from ..fields import VectorField

        def _serialize(field_obj: Any) -> Any:
            # VectorField keeps its full metadata in dict form.
            if isinstance(field_obj, VectorField):
                return field_obj.to_dict()
            # Other special fields expose a callable ``to_list``.
            to_list = getattr(field_obj, "to_list", None)
            if callable(to_list):
                return to_list()
            return field_obj.value

        payload = {name: _serialize(fobj) for name, fobj in record.fields.items()}
        return json.dumps(payload)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Get a SQL expression that extracts a vector value from the JSON column.

        Handles both raw arrays and the VectorField dict format (where the
        actual array lives under the ``value`` key).

        Args:
            field_name: Name of the vector field
            dialect: SQL dialect (postgres, sqlite, etc.)

        Returns:
            SQL expression yielding the vector value
        """
        if dialect == "postgres":
            # Postgres can cast the extracted text straight to ``vector``.
            return f"""CASE
                WHEN jsonb_typeof(data->'{field_name}') = 'object'
                THEN (data->'{field_name}'->>'value')::vector
                ELSE (data->>'{field_name}')::vector
            END"""
        if dialect == "sqlite":
            # SQLite has no native vector type; return the JSON text as-is.
            return f"""CASE
                WHEN json_type(json_extract(data, '$.{field_name}')) = 'object'
                THEN json_extract(data, '$.{field_name}.value')
                ELSE json_extract(data, '$.{field_name}')
            END"""
        # Generic fallback for unknown dialects.
        return f"data->'{field_name}'"

    @staticmethod
    def json_to_record(data_json: str, metadata_json: str | None = None) -> Record:
        """Rebuild a Record from stored JSON strings.

        Serialized VectorField dicts (``{"type": "vector", ...}``) are
        reconstructed as VectorField objects; everything else becomes a
        plain Field.
        """
        from ..fields import Field, VectorField

        payload = json.loads(data_json) if data_json else {}
        # Some drivers hand back the literal string 'null' for NULL metadata.
        has_meta = metadata_json and metadata_json != 'null'
        meta = json.loads(metadata_json) if has_meta else {}

        rebuilt = {}
        for name, raw in payload.items():
            if isinstance(raw, dict) and raw.get("type") == "vector":
                # from_dict requires a 'name' key; older payloads may lack it.
                raw.setdefault("name", name)
                rebuilt[name] = VectorField.from_dict(raw)
            else:
                rebuilt[name] = Field(name=name, value=raw)

        record = Record(metadata=meta)
        record.fields.update(rebuilt)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Row mapping with 'data', optional 'metadata', optional 'id'

        Returns:
            Reconstructed Record, with its ID applied when the row has one
        """
        from ..database_utils import ensure_record_id

        raw_data = row.get("data", {})
        # Drivers may already have decoded the JSON column to a dict.
        data_json = raw_data if isinstance(raw_data, str) else json.dumps(raw_data)

        raw_meta = row.get("metadata")
        meta_json = raw_meta
        if raw_meta and not isinstance(raw_meta, str):
            meta_json = json.dumps(raw_meta)

        record = SQLRecordSerializer.json_to_record(data_json, meta_json)
        if "id" in row:
            record = ensure_record_id(record, row["id"])
        return record
class SQLQueryBuilder:
    """Builds parameterized SQL statements from Query objects.

    Records live in a single table with ``id``, JSON ``data``, JSON
    ``metadata`` and timestamp columns; filtering and sorting use the
    dialect's JSON accessors (``->``/``->>`` for PostgreSQL,
    ``json_extract`` for SQLite).
    """

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard", param_style: str = "numeric"):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name used to qualify the table
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?, 'pyformat' for %(name)s)
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name (``schema.table`` when a schema is set)."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the appropriate parameter placeholder based on param_style.

        Args:
            param_num: Parameter number (1-based)
            param_name: Optional parameter name for pyformat style

        Returns:
            Parameter placeholder string (e.g. ``$3``, ``?``, ``%(p2)s``)
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        elif self.param_style == "qmark":
            return "?"
        elif self.param_style == "pyformat":
            name = param_name or f"p{param_num - 1}"  # 0-based for pyformat
            return f"%({name})s"
        else:
            # Unknown style: default to numeric for postgres, qmark for others.
            if self.dialect == "postgres":
                return f"${param_num}"
            else:
                return "?"

    def _build_order_by(self, sort_specs: Any) -> str:
        """Render an ORDER BY clause for the given sort specs.

        Returns an empty string when there is nothing to sort by. Shared by
        the simple and complex search builders so the dialect handling stays
        in one place.
        """
        if not sort_specs:
            return ""
        order_parts = []
        for sort_spec in sort_specs:
            direction = "DESC" if sort_spec.order == SortOrder.DESC else "ASC"
            if self.dialect == "postgres":
                order_parts.append(f"data->'{sort_spec.field}' {direction}")
            elif self.dialect == "sqlite":
                order_parts.append(f"json_extract(data, '$.{sort_spec.field}') {direction}")
            else:
                # Generic dialects can't reach into JSON; fall back to the column.
                order_parts.append(f"data {direction}")
        return "ORDER BY " + ", ".join(order_parts)

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert
            record_id: Optional ID (a UUID4 is generated if not provided)

        Returns:
            Tuple of (SQL query, parameters)
        """
        record_id = record_id or str(uuid.uuid4())
        # Use the shared serialization path so vector fields are preserved.
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"

        params = [record_id, data, metadata]
        return query, params

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT * FROM {self.qualified_table} WHERE id = {p1}"
        return query, [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query for updating a record.

        Args:
            record_id: The record ID
            record: The updated record

        Returns:
            Tuple of (SQL query, parameters)
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self.param_style == "qmark":
            # qmark placeholders are positional, so parameters must follow
            # textual order: data, metadata, then id.
            query = f"""
            UPDATE {self.qualified_table}
            SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """
            params = [data, metadata, record_id]
        else:
            # Numbered placeholders let us keep id first in the param list.
            p1 = self._get_param_placeholder(1)
            p2 = self._get_param_placeholder(2)
            p3 = self._get_param_placeholder(3)
            query = f"""
            UPDATE {self.qualified_table}
            SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
            WHERE id = {p1}
            """
            params = [record_id, data, metadata]

        return query, params

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"DELETE FROM {self.qualified_table} WHERE id = {p1}"
        return query, [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query to check if a record exists.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1"
        return query, [record_id]

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery object with boolean logic.

        Args:
            query: The ComplexQuery object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []

        # WHERE clause from the (possibly nested) condition tree.
        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)

        # LIMIT/OFFSET are inlined (integers), not bound parameters.
        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE fragment for a complex boolean condition tree.

        Args:
            condition: A FilterCondition (leaf) or LogicCondition (branch)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (SQL clause, parameters); ("", []) when nothing applies
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        # Leaf node: a single field filter.
        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        if isinstance(condition, LogicCondition):
            if condition.operator in (LogicOperator.AND, LogicOperator.OR):
                joiner = " AND " if condition.operator == LogicOperator.AND else " OR "
                clauses: list[str] = []
                params: list[Any] = []
                current_param = param_start
                for sub_condition in condition.conditions:
                    sub_clause, sub_params = self._build_complex_condition(sub_condition, current_param)
                    if sub_clause:
                        clauses.append(sub_clause)
                        params.extend(sub_params)
                        # Advance numbering past the placeholders just consumed.
                        current_param += len(sub_params)
                return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

            if condition.operator == LogicOperator.NOT:
                sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
                return (f"NOT ({sub_clause})", sub_params) if sub_clause else ("", [])

        return ("", [])

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build just the WHERE fragment from a Query object.

        Note: the returned fragment starts with `` AND `` so it can be
        appended to an existing WHERE clause.

        Args:
            query: The Query object (can be None)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (WHERE fragment, parameters); ("", []) when no filters
        """
        if not query or not query.filters:
            return "", []

        where_clauses = []
        params: list[Any] = []
        param_count = param_start - 1

        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            # A filter may bind 0..n placeholders (EXISTS binds none, IN many).
            param_count += len(new_params) - 1

        if where_clauses:
            return " AND " + " AND ".join(where_clauses), params
        return "", []

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query from a Query object.

        Args:
            query: The Query object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []
        param_count = 0

        where_clauses = []
        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            # A filter may bind 0..n placeholders (EXISTS binds none, IN many).
            param_count += len(new_params) - 1

        if where_clauses:
            sql_parts.append("WHERE " + " AND ".join(where_clauses))

        order_by = self._build_order_by(query.sort_specs)
        if order_by:
            sql_parts.append(order_by)

        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a batch UPDATE query using CASE expressions.

        Provides efficient batch updates for both PostgreSQL and SQLite.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            Tuple of (SQL query, parameters)
        """
        if not updates:
            return "", []

        update_ids = [record_id for record_id, _ in updates]
        data_cases: list[str] = []
        metadata_cases: list[str] = []
        params: list[Any] = []

        if self.param_style == "qmark":
            # BUGFIX: qmark placeholders are purely positional, and the SQL
            # consumes all data-CASE placeholders first, then all
            # metadata-CASE placeholders, then the WHERE IN ids. Parameters
            # must therefore be grouped by clause, not interleaved per record.
            data_params: list[Any] = []
            metadata_params: list[Any] = []
            for record_id, record in updates:
                data_json = self._record_to_json(record)
                metadata_json = json.dumps(record.metadata) if record.metadata else None
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                data_params.extend([record_id, data_json])
                metadata_params.extend([record_id, metadata_json])
            params = data_params + metadata_params
            id_placeholders = ["?"] * len(update_ids)
        else:
            # Numbered/named placeholders: 3 per record (id, data, metadata),
            # reusing the id placeholder in both CASE lists.
            for i, (record_id, record) in enumerate(updates):
                data_json = self._record_to_json(record)
                metadata_json = json.dumps(record.metadata) if record.metadata else None
                param_idx = i * 3 + 1
                p_id = self._get_param_placeholder(param_idx)
                p_data = self._get_param_placeholder(param_idx + 1)
                p_meta = self._get_param_placeholder(param_idx + 2)
                data_cases.append(f"WHEN id = {p_id} THEN {p_data}")
                metadata_cases.append(f"WHEN id = {p_id} THEN {p_meta}")
                params.extend([record_id, data_json, metadata_json])
            id_param_start = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(i)
                for i in range(id_param_start, id_param_start + len(update_ids))
            ]

        # The WHERE IN ids come last in both placeholder styles.
        params.extend(update_ids)

        # ELSE preserves the original value when no CASE matches.
        query = f"""
        UPDATE {self.qualified_table}
        SET
            data = CASE {' '.join(data_cases)} ELSE data END,
            metadata = CASE {' '.join(metadata_cases)} ELSE metadata END,
            updated_at = CURRENT_TIMESTAMP
        WHERE id IN ({', '.join(id_placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a batch INSERT query for multiple records.

        Generates an efficient multi-value INSERT statement.

        Args:
            records: List of records to insert

        Returns:
            Tuple of (SQL query, parameters, generated IDs)
        """
        if not records:
            return "", [], []

        ids: list[str] = []
        values_clauses: list[str] = []
        params: list[Any] = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            # Three placeholders per row: id, data, metadata.
            param_idx = i * 3 + 1
            p1 = self._get_param_placeholder(param_idx)
            p2 = self._get_param_placeholder(param_idx + 1)
            p3 = self._get_param_placeholder(param_idx + 2)
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES {', '.join(values_clauses)}
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a batch DELETE query for multiple records.

        Args:
            ids: List of record IDs to delete

        Returns:
            Tuple of (SQL query, parameters)
        """
        if not ids:
            return "", []

        placeholders = [self._get_param_placeholder(i) for i in range(1, len(ids) + 1)]

        query = f"""
        DELETE FROM {self.qualified_table}
        WHERE id IN ({', '.join(placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, ids

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query.

        Args:
            query: Optional Query object for filtering

        Returns:
            Tuple of (SQL query, parameters)
        """
        if query and query.filters:
            search_query, params = self.build_search_query(query)
            # Reuse the search builder, swapping the projection for COUNT(*).
            count_query = search_query.replace("SELECT *", "SELECT COUNT(*)", 1)
            # ORDER BY/LIMIT/OFFSET are meaningless for a count; truncate them.
            for clause in ["ORDER BY", "LIMIT", "OFFSET"]:
                if clause in count_query:
                    count_query = count_query[:count_query.index(clause)]
            return count_query.strip(), params
        else:
            return f"SELECT COUNT(*) FROM {self.qualified_table}", []

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE fragment for a single filter.

        Args:
            filter_spec: The filter specification (field, operator, value)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters)

        Raises:
            ValueError: If the operator is not supported
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value

        if self.dialect == "postgres":
            # JSONB ->> yields text, so comparisons need explicit casts.
            base_field_expr = f"data->>'{field}'"

            if op in [Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN]:
                # Range operators need numeric or timestamp comparison.
                if isinstance(value, (int, float)) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], (int, float))):
                    field_expr = f"({base_field_expr})::numeric"
                elif isinstance(value, datetime) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], datetime)):
                    field_expr = f"({base_field_expr})::timestamp"
                else:
                    field_expr = base_field_expr
            elif op in [Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN]:
                # Equality/membership: cast based on the value's type.
                # NOTE: bool check must come before int/float (bool is an int subclass).
                if isinstance(value, bool):
                    field_expr = f"({base_field_expr})::boolean"
                elif isinstance(value, (int, float)):
                    field_expr = f"({base_field_expr})::numeric"
                elif isinstance(value, list) and value and isinstance(value[0], (int, float)):
                    field_expr = f"({base_field_expr})::numeric"
                else:
                    field_expr = base_field_expr
            else:
                field_expr = base_field_expr

            param_placeholder = self._get_param_placeholder(param_start)
        elif self.dialect == "sqlite":
            field_expr = f"json_extract(data, '$.{field}')"
            param_placeholder = self._get_param_placeholder(param_start)
        else:
            # Generic dialect: treat the field as a plain column.
            field_expr = field
            param_placeholder = self._get_param_placeholder(param_start)

        if op == Operator.EQ:
            return f"{field_expr} = {param_placeholder}", [value]
        elif op == Operator.NEQ:
            return f"{field_expr} != {param_placeholder}", [value]
        elif op == Operator.GT:
            return f"{field_expr} > {param_placeholder}", [value]
        elif op == Operator.GTE:
            return f"{field_expr} >= {param_placeholder}", [value]
        elif op == Operator.LT:
            return f"{field_expr} < {param_placeholder}", [value]
        elif op == Operator.LTE:
            return f"{field_expr} <= {param_placeholder}", [value]
        elif op == Operator.LIKE:
            return f"{field_expr} LIKE {param_placeholder}", [value]
        elif op == Operator.IN:
            placeholders = ", ".join([self._get_param_placeholder(i) for i in range(param_start, param_start + len(value))])
            return f"{field_expr} IN ({placeholders})", list(value)
        elif op == Operator.NOT_IN:
            placeholders = ", ".join([self._get_param_placeholder(i) for i in range(param_start, param_start + len(value))])
            return f"{field_expr} NOT IN ({placeholders})", list(value)
        elif op == Operator.BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.NOT_BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} NOT BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.EXISTS:
            # Presence checks bind no parameters.
            return f"{field_expr} IS NOT NULL", []
        elif op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []
        elif op == Operator.REGEX:
            # PostgreSQL uses ~ for regex; SQLite uses REGEXP.
            if self.dialect == "postgres":
                return f"{field_expr} ~ {param_placeholder}", [value]
            else:
                return f"{field_expr} REGEXP {param_placeholder}", [value]
        else:
            raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to a JSON string for storage (delegates to the serializer)."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record (delegates to the serializer).

        Args:
            row: Database row as dictionary

        Returns:
            Record object
        """
        return SQLRecordSerializer.row_to_record(row)
class SQLTableManager:
    """Manages SQL table creation and schema."""

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Initialize the table manager.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Return ``schema.table`` when a schema is configured, else the bare name."""
        return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name

    def get_create_table_sql(self) -> str:
        """Get the CREATE TABLE SQL statement for the configured dialect.

        Returns:
            SQL statement(s) creating the table (and indexes where supported)
        """
        if self.dialect == "postgres":
            # Postgres stores JSON natively and gets GIN indexes on both columns.
            return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data JSONB NOT NULL,
                metadata JSONB,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data
                ON {self.qualified_table} USING GIN (data);
            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_metadata
                ON {self.qualified_table} USING GIN (metadata);
            """
        if self.dialect == "sqlite":
            # SQLite lacks JSONB; JSON goes into TEXT with validity checks.
            return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL CHECK (json_valid(data)),
                metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_created
                ON {self.qualified_table} (created_at);
            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_updated
                ON {self.qualified_table} (updated_at);
            """
        # Generic SQL fallback: plain TEXT storage, no indexes.
        return f"""
        CREATE TABLE IF NOT EXISTS {self.qualified_table} (
            id VARCHAR(255) PRIMARY KEY,
            data TEXT NOT NULL,
            metadata TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

    def get_drop_table_sql(self) -> str:
        """Get the DROP TABLE SQL statement.

        Returns:
            SQL statement for dropping the table
        """
        return f"DROP TABLE IF EXISTS {self.qualified_table}"