Coverage for src/dataknobs_data/backends/sql_base.py: 68%
400 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:34 -0700
1"""Base SQL functionality shared between SQL database backends."""
3from __future__ import annotations
5import json
6import uuid
7from datetime import datetime
8from typing import Any, TYPE_CHECKING
10from ..query import Operator, Query, SortOrder
11from ..records import Record
13if TYPE_CHECKING:
14 from ..query_logic import ComplexQuery
class SQLRecordSerializer:
    """Mixin providing JSON (de)serialization of Records for SQL backends.

    Vector fields round-trip through the database as dicts (via
    ``VectorField.to_dict`` / ``from_dict``) so their metadata is preserved.
    """

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Serialize *record*'s fields into a JSON string for storage.

        VectorField instances are stored as dicts to keep their metadata;
        fields exposing a callable ``to_list`` are stored as lists; every
        other field is stored as its raw ``value``.
        """
        from ..fields import VectorField

        def _encode(field_obj: Any) -> Any:
            # VectorField keeps its full metadata via to_dict().
            if isinstance(field_obj, VectorField):
                return field_obj.to_dict()
            # Other special fields serialize through to_list() when present.
            to_list = getattr(field_obj, 'to_list', None)
            if callable(to_list):
                return to_list()
            return field_obj.value

        payload = {name: _encode(obj) for name, obj in record.fields.items()}
        return json.dumps(payload)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Return a SQL expression extracting a vector from the JSON column.

        Handles both storage formats: a raw array, or a VectorField dict
        whose actual vector lives under the ``value`` key.

        Args:
            field_name: Name of the vector field
            dialect: SQL dialect (postgres, sqlite, etc.)

        Returns:
            SQL expression to extract vector value
        """
        if dialect == "postgres":
            # PostgreSQL: branch on jsonb type — dict format vs raw array.
            return f"""CASE
                WHEN jsonb_typeof(data->'{field_name}') = 'object'
                THEN (data->'{field_name}'->>'value')::vector
                ELSE (data->>'{field_name}')::vector
            END"""
        if dialect == "sqlite":
            # SQLite has no vector type; yield the JSON string as-is.
            return f"""CASE
                WHEN json_type(json_extract(data, '$.{field_name}')) = 'object'
                THEN json_extract(data, '$.{field_name}.value')
                ELSE json_extract(data, '$.{field_name}')
            END"""
        # Unknown dialect: fall back to a plain JSON path access.
        return f"data->'{field_name}'"

    @staticmethod
    def json_to_record(data_json: str, metadata_json: str | None = None) -> Record:
        """Rebuild a Record from stored JSON strings.

        Serialized VectorFields (dicts whose ``type`` is ``"vector"``) are
        reconstructed as VectorField objects; everything else becomes a
        plain Field.
        """
        from ..fields import Field, VectorField

        payload = json.loads(data_json) if data_json else {}
        # A literal 'null' column value means "no metadata".
        if metadata_json and metadata_json != 'null':
            meta = json.loads(metadata_json)
        else:
            meta = {}

        rebuilt: dict[str, Any] = {}
        for name, value in payload.items():
            if isinstance(value, dict) and value.get("type") == "vector":
                # from_dict requires a 'name' key; older rows may lack it.
                value.setdefault("name", name)
                rebuilt[name] = VectorField.from_dict(value)
            else:
                rebuilt[name] = Field(name=name, value=value)

        # Attach the properly typed fields to a fresh Record.
        record = Record(metadata=meta)
        record.fields.update(rebuilt)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as dictionary with 'id', 'data' and optional 'metadata' fields

        Returns:
            Reconstructed Record object with ID set
        """
        from ..database_utils import ensure_record_id

        raw_data = row.get("data", {})
        data_str = raw_data if isinstance(raw_data, str) else json.dumps(raw_data)

        raw_meta = row.get("metadata")
        meta_str = raw_meta
        if raw_meta and not isinstance(raw_meta, str):
            meta_str = json.dumps(raw_meta)

        record = SQLRecordSerializer.json_to_record(data_str, meta_str)

        # Carry the row's primary key onto the record.
        if "id" in row:
            record = ensure_record_id(record, row["id"])

        return record
class SQLQueryBuilder:
    """Builds parameterized SQL statements from Query/ComplexQuery objects.

    Records live in a table with columns ``id``, ``data`` (JSON),
    ``metadata`` (JSON), ``created_at`` and ``updated_at``.

    NOTE(review): field names from queries are interpolated directly into
    the generated SQL (JSON path expressions); they must come from trusted
    code, never from end-user input.
    """

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard", param_style: str = "numeric"):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'duckdb', 'standard')
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?, 'pyformat' for %(name)s)
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name (schema-prefixed when set)."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the appropriate parameter placeholder based on param_style.

        Args:
            param_num: Parameter number (1-based)
            param_name: Optional parameter name for pyformat style

        Returns:
            Parameter placeholder string
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        elif self.param_style == "qmark":
            return "?"
        elif self.param_style == "pyformat":
            name = param_name or f"p{param_num - 1}"  # pyformat names are 0-based
            return f"%({name})s"
        else:
            # Unknown style: default to numeric for postgres, qmark otherwise.
            if self.dialect == "postgres":
                return f"${param_num}"
            else:
                return "?"

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert
            record_id: Optional ID (falls back to record.storage_id, else a new UUID)

        Returns:
            Tuple of (SQL query, parameters)
        """
        record_id = record_id or record.storage_id or str(uuid.uuid4())
        data = SQLRecordSerializer.record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1 = self._get_param_placeholder(1)
        p2 = self._get_param_placeholder(2)
        p3 = self._get_param_placeholder(3)

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"

        params = [record_id, data, metadata]
        return query, params

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT * FROM {self.qualified_table} WHERE id = {p1}"
        return query, [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query for updating a record.

        Args:
            record_id: The record ID
            record: The updated record

        Returns:
            Tuple of (SQL query, parameters)
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self.param_style == "qmark":
            # SQLite binds positionally: data, metadata, then id.
            query = f"""
            UPDATE {self.qualified_table}
            SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """
            params = [data, metadata, record_id]
        else:
            # Numbered placeholders: id is $1, data $2, metadata $3.
            p1 = self._get_param_placeholder(1)
            p2 = self._get_param_placeholder(2)
            p3 = self._get_param_placeholder(3)
            query = f"""
            UPDATE {self.qualified_table}
            SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
            WHERE id = {p1}
            """
            params = [record_id, data, metadata]

        return query, params

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"DELETE FROM {self.qualified_table} WHERE id = {p1}"
        return query, [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query to check if a record exists.

        Args:
            record_id: The record ID

        Returns:
            Tuple of (SQL query, parameters)
        """
        p1 = self._get_param_placeholder(1)
        query = f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1"
        return query, [record_id]

    def _build_order_by(self, sort_specs: Any) -> str:
        """Build the body of an ORDER BY clause from sort specs (dialect-aware).

        Sorting on 'id' targets the dedicated id column; all other fields are
        extracted from the JSON data column using dialect-specific syntax.
        """
        order_parts = []
        for sort_spec in sort_specs:
            direction = "DESC" if sort_spec.order == SortOrder.DESC else "ASC"
            if sort_spec.field == 'id':
                # 'id' lives in its own column, not inside the JSON payload.
                order_parts.append(f"id {direction}")
            elif self.dialect == "postgres":
                order_parts.append(f"data->'{sort_spec.field}' {direction}")
            elif self.dialect == "sqlite":
                order_parts.append(f"json_extract(data, '$.{sort_spec.field}') {direction}")
            elif self.dialect == "duckdb":
                order_parts.append(f"json_extract_string(data, '$.{sort_spec.field}') {direction}")
            else:
                order_parts.append(f"data {direction}")  # Fallback: no JSON path support
        return ", ".join(order_parts)

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery object with boolean logic.

        Args:
            query: The ComplexQuery object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []

        # WHERE clause from the (possibly nested) boolean condition tree.
        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        # ORDER BY (shares dialect handling, incl. the 'id' column special case,
        # with build_search_query).
        if query.sort_specs:
            sql_parts.append("ORDER BY " + self._build_order_by(query.sort_specs))

        # LIMIT / OFFSET are inlined as integer literals, not bound parameters.
        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build WHERE clause for complex boolean logic conditions.

        Args:
            condition: The Condition object (LogicCondition or FilterCondition)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters)
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        # Leaf node: a single filter.
        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        # Branch node: AND / OR / NOT over sub-conditions.
        if isinstance(condition, LogicCondition):
            if condition.operator in (LogicOperator.AND, LogicOperator.OR):
                joiner = " AND " if condition.operator == LogicOperator.AND else " OR "
                clauses = []
                params: list[Any] = []
                current_param = param_start
                for sub_condition in condition.conditions:
                    sub_clause, sub_params = self._build_complex_condition(sub_condition, current_param)
                    if sub_clause:
                        clauses.append(sub_clause)
                        params.extend(sub_params)
                        current_param += len(sub_params)
                return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

            if condition.operator == LogicOperator.NOT:
                # Guard against a malformed NOT with no sub-conditions.
                if not condition.conditions:
                    return ("", [])
                sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
                return (f"NOT ({sub_clause})", sub_params) if sub_clause else ("", [])

        return ("", [])

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build just the WHERE-clause fragment from a Query object.

        Note: a non-empty result is prefixed with " AND " so it can be
        appended to an existing WHERE clause by the caller.

        Args:
            query: The Query object (can be None)
            param_start: Starting parameter number for placeholders

        Returns:
            Tuple of (WHERE clause SQL, parameters)
            Returns empty string and empty list if no filters
        """
        if not query or not query.filters:
            return "", []

        where_clauses = []
        params: list[Any] = []
        param_count = param_start - 1

        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            param_count += len(new_params) - 1  # Filters may consume 0..n params

        if where_clauses:
            return " AND " + " AND ".join(where_clauses), params
        return "", []

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query from a Query object.

        Args:
            query: The Query object

        Returns:
            Tuple of (SQL query, parameters)
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []
        param_count = 0

        # WHERE clause: filters are ANDed together.
        where_clauses = []
        for filter_spec in query.filters:
            param_count += 1
            clause, new_params = self._build_filter_clause(filter_spec, param_count)
            where_clauses.append(clause)
            params.extend(new_params)
            param_count += len(new_params) - 1  # Filters may consume 0..n params

        if where_clauses:
            sql_parts.append("WHERE " + " AND ".join(where_clauses))

        # ORDER BY (dialect-aware, 'id' column special-cased).
        if query.sort_specs:
            sql_parts.append("ORDER BY " + self._build_order_by(query.sort_specs))

        # LIMIT / OFFSET are inlined as integer literals, not bound parameters.
        if query.limit_value:
            sql_parts.append(f"LIMIT {query.limit_value}")
        if query.offset_value:
            sql_parts.append(f"OFFSET {query.offset_value}")

        return " ".join(sql_parts), params

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a batch UPDATE query using CASE expressions.

        This provides efficient batch updates for both PostgreSQL and SQLite.

        Args:
            updates: List of (id, record) tuples to update

        Returns:
            Tuple of (SQL query, parameters); ("", []) when updates is empty
        """
        if not updates:
            return "", []

        update_ids = []
        data_cases = []
        metadata_cases = []
        params: list[Any] = []

        # One CASE arm per record for both data and metadata columns.
        for i, (record_id, record) in enumerate(updates):
            update_ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            if self.param_style == "qmark":
                # SQLite: positional '?' placeholders, so the id is repeated
                # for each CASE arm.
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                params.extend([record_id, data_json, record_id, metadata_json])
            else:
                # Numbered/named placeholders let both CASEs reuse the same
                # id parameter.
                param_idx = i * 3 + 1
                p1 = self._get_param_placeholder(param_idx)
                p2 = self._get_param_placeholder(param_idx + 1)
                p3 = self._get_param_placeholder(param_idx + 2)
                data_cases.append(f"WHEN id = {p1} THEN {p2}")
                metadata_cases.append(f"WHEN id = {p1} THEN {p3}")
                params.extend([record_id, data_json, metadata_json])

        # WHERE IN clause over all updated ids.
        if self.param_style == "qmark":
            id_placeholders = ["?" for _ in update_ids]
        else:
            first_id_param = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(n)
                for n in range(first_id_param, first_id_param + len(update_ids))
            ]
        params.extend(update_ids)

        # ELSE preserves the original value when no CASE arm matches.
        query = f"""
        UPDATE {self.qualified_table}
        SET
            data = CASE {' '.join(data_cases)} ELSE data END,
            metadata = CASE {' '.join(metadata_cases)} ELSE metadata END,
            updated_at = CURRENT_TIMESTAMP
        WHERE id IN ({', '.join(id_placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a batch INSERT query for multiple records.

        Generates an efficient multi-value INSERT statement; IDs are
        generated here (uuid4) and returned to the caller.

        Args:
            records: List of records to insert

        Returns:
            Tuple of (SQL query, parameters, generated IDs)
        """
        if not records:
            return "", [], []

        ids = []
        values_clauses = []
        params: list[Any] = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            # Three placeholders per row: id, data, metadata.
            param_idx = i * 3 + 1
            p1 = self._get_param_placeholder(param_idx)
            p2 = self._get_param_placeholder(param_idx + 1)
            p3 = self._get_param_placeholder(param_idx + 2)
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        query = f"""
        INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
        VALUES {', '.join(values_clauses)}
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a batch DELETE query for multiple records.

        Args:
            ids: List of record IDs to delete

        Returns:
            Tuple of (SQL query, parameters); ("", []) when ids is empty
        """
        if not ids:
            return "", []

        placeholders = [self._get_param_placeholder(i) for i in range(1, len(ids) + 1)]

        query = f"""
        DELETE FROM {self.qualified_table}
        WHERE id IN ({', '.join(placeholders)})
        """

        if self.dialect == "postgres":
            query += " RETURNING id"

        return query, ids

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query.

        Args:
            query: Optional Query object for filtering

        Returns:
            Tuple of (SQL query, parameters)
        """
        if query and query.filters:
            search_query, params = self.build_search_query(query)
            # Reuse the search SQL but count instead of selecting rows.
            count_query = search_query.replace("SELECT *", "SELECT COUNT(*)", 1)
            # ORDER BY / LIMIT / OFFSET are meaningless for COUNT and carry
            # no bound parameters, so truncating the SQL is safe.
            for clause in ["ORDER BY", "LIMIT", "OFFSET"]:
                if clause in count_query:
                    count_query = count_query[:count_query.index(clause)]
            return count_query.strip(), params
        else:
            return f"SELECT COUNT(*) FROM {self.qualified_table}", []

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE clause for a single filter.

        Args:
            filter_spec: The filter specification (field, operator, value)
            param_start: Starting parameter number

        Returns:
            Tuple of (SQL clause, parameters)

        Raises:
            ValueError: If the operator is not supported.
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value

        # 'id' lives in its own column, not inside the JSON payload.
        if field == 'id':
            field_expr = 'id'
            param_placeholder = self._get_param_placeholder(param_start)
        elif self.dialect == "postgres":
            # JSONB ->> yields text; cast for typed comparisons.
            base_field_expr = f"data->>'{field}'"

            if op in [Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN]:
                # Range operators need numeric/timestamp comparison semantics.
                if isinstance(value, (int, float)) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], (int, float))):
                    field_expr = f"({base_field_expr})::numeric"
                elif isinstance(value, datetime) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], datetime)):
                    field_expr = f"({base_field_expr})::timestamp"
                else:
                    field_expr = base_field_expr
            elif op in [Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN]:
                # Equality/membership: cast based on the value's Python type.
                if isinstance(value, bool):
                    field_expr = f"({base_field_expr})::boolean"
                elif isinstance(value, (int, float)):
                    field_expr = f"({base_field_expr})::numeric"
                elif isinstance(value, list) and value and isinstance(value[0], (int, float)):
                    field_expr = f"({base_field_expr})::numeric"
                else:
                    field_expr = base_field_expr
            else:
                field_expr = base_field_expr

            param_placeholder = self._get_param_placeholder(param_start)
        elif self.dialect == "duckdb":
            # DuckDB extracts text then CASTs for typed comparisons.
            base_field_expr = f"json_extract_string(data, '$.{field}')"

            if op in [Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN]:
                # Range operators need numeric/timestamp comparison semantics.
                if isinstance(value, (int, float)) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], (int, float))):
                    field_expr = f"CAST({base_field_expr} AS DOUBLE)"
                elif isinstance(value, datetime) or (isinstance(value, (list, tuple)) and len(value) > 0 and isinstance(value[0], datetime)):
                    field_expr = f"CAST({base_field_expr} AS TIMESTAMP)"
                else:
                    field_expr = base_field_expr
            elif op in [Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN]:
                # Equality/membership: cast based on the value's Python type.
                if isinstance(value, bool):
                    field_expr = f"CAST({base_field_expr} AS BOOLEAN)"
                elif isinstance(value, (int, float)):
                    field_expr = f"CAST({base_field_expr} AS DOUBLE)"
                elif isinstance(value, list) and value and isinstance(value[0], (int, float)):
                    field_expr = f"CAST({base_field_expr} AS DOUBLE)"
                else:
                    field_expr = base_field_expr
            else:
                field_expr = base_field_expr

            param_placeholder = self._get_param_placeholder(param_start)
        elif self.dialect == "sqlite":
            field_expr = f"json_extract(data, '$.{field}')"
            param_placeholder = self._get_param_placeholder(param_start)
        else:
            field_expr = field
            param_placeholder = self._get_param_placeholder(param_start)

        # Translate the operator into SQL.
        if op == Operator.EQ:
            return f"{field_expr} = {param_placeholder}", [value]
        elif op == Operator.NEQ:
            return f"{field_expr} != {param_placeholder}", [value]
        elif op == Operator.GT:
            return f"{field_expr} > {param_placeholder}", [value]
        elif op == Operator.GTE:
            return f"{field_expr} >= {param_placeholder}", [value]
        elif op == Operator.LT:
            return f"{field_expr} < {param_placeholder}", [value]
        elif op == Operator.LTE:
            return f"{field_expr} <= {param_placeholder}", [value]
        elif op == Operator.LIKE:
            return f"{field_expr} LIKE {param_placeholder}", [value]
        elif op == Operator.NOT_LIKE:
            return f"{field_expr} NOT LIKE {param_placeholder}", [value]
        elif op == Operator.IN:
            placeholders = ", ".join([self._get_param_placeholder(i) for i in range(param_start, param_start + len(value))])
            return f"{field_expr} IN ({placeholders})", list(value)
        elif op == Operator.NOT_IN:
            placeholders = ", ".join([self._get_param_placeholder(i) for i in range(param_start, param_start + len(value))])
            return f"{field_expr} NOT IN ({placeholders})", list(value)
        elif op == Operator.BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.NOT_BETWEEN:
            placeholder1 = self._get_param_placeholder(param_start)
            placeholder2 = self._get_param_placeholder(param_start + 1)
            return f"{field_expr} NOT BETWEEN {placeholder1} AND {placeholder2}", list(value)
        elif op == Operator.EXISTS:
            return f"{field_expr} IS NOT NULL", []
        elif op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []
        elif op == Operator.REGEX:
            # PostgreSQL uses ~, DuckDB regexp_matches(), others REGEXP.
            if self.dialect == "postgres":
                return f"{field_expr} ~ {param_placeholder}", [value]
            elif self.dialect == "duckdb":
                return f"regexp_matches({field_expr}, {param_placeholder})", [value]
            else:
                return f"{field_expr} REGEXP {param_placeholder}", [value]
        else:
            raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to JSON string for storage."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as dictionary

        Returns:
            Record object
        """
        return SQLRecordSerializer.row_to_record(row)
class SQLTableManager:
    """Manages SQL table creation and schema."""

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Initialize the table manager.

        Args:
            table_name: Name of the database table
            schema_name: Optional schema name
            dialect: SQL dialect ('postgres', 'sqlite', 'standard')
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Get the fully qualified table name."""
        return f"{self.schema_name}.{self.table_name}" if self.schema_name else self.table_name

    def get_create_table_sql(self) -> str:
        """Get the CREATE TABLE SQL statement for the configured dialect.

        Returns:
            SQL statement for creating the table (and its indexes)
        """
        # Dispatch to the per-dialect builder; unknown dialects get generic SQL.
        builders = {
            "postgres": self._postgres_ddl,
            "sqlite": self._sqlite_ddl,
            "duckdb": self._duckdb_ddl,
        }
        return builders.get(self.dialect, self._generic_ddl)()

    def _postgres_ddl(self) -> str:
        # JSONB columns with GIN indexes for containment/path queries.
        return f"""
        CREATE TABLE IF NOT EXISTS {self.qualified_table} (
            id VARCHAR(255) PRIMARY KEY,
            data JSONB NOT NULL,
            metadata JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data
        ON {self.qualified_table} USING GIN (data);

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_metadata
        ON {self.qualified_table} USING GIN (metadata);
        """

    def _sqlite_ddl(self) -> str:
        # SQLite stores JSON as TEXT; json_valid CHECKs guard the payloads.
        return f"""
        CREATE TABLE IF NOT EXISTS {self.qualified_table} (
            id VARCHAR(255) PRIMARY KEY,
            data TEXT NOT NULL CHECK (json_valid(data)),
            metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_created
        ON {self.qualified_table} (created_at);

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_updated
        ON {self.qualified_table} (updated_at);
        """

    def _duckdb_ddl(self) -> str:
        # DuckDB has a native JSON column type.
        return f"""
        CREATE TABLE IF NOT EXISTS {self.qualified_table} (
            id VARCHAR(255) PRIMARY KEY,
            data JSON NOT NULL,
            metadata JSON,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_created
        ON {self.qualified_table} (created_at);

        CREATE INDEX IF NOT EXISTS idx_{self.table_name}_updated
        ON {self.qualified_table} (updated_at);
        """

    def _generic_ddl(self) -> str:
        # Plain-TEXT JSON storage for any other SQL engine.
        return f"""
        CREATE TABLE IF NOT EXISTS {self.qualified_table} (
            id VARCHAR(255) PRIMARY KEY,
            data TEXT NOT NULL,
            metadata TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

    def get_drop_table_sql(self) -> str:
        """Get the DROP TABLE SQL statement.

        Returns:
            SQL statement for dropping the table
        """
        return f"DROP TABLE IF EXISTS {self.qualified_table}"