Coverage for src/dataknobs_data/backends/sql_base.py: 11%
374 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:14 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:14 -0600
1"""Base SQL functionality shared between SQL database backends."""
3from __future__ import annotations
5import json
6import uuid
7from datetime import datetime
8from typing import Any, TYPE_CHECKING
10from ..query import Operator, Query, SortOrder
11from ..records import Record
13if TYPE_CHECKING:
14 from ..query_logic import ComplexQuery
class SQLRecordSerializer:
    """Mixin for SQL record serialization/deserialization with vector support."""

    @staticmethod
    def _escape_sql_literal(text: Any) -> str:
        """Escape *text* for embedding inside a single-quoted SQL string literal.

        Doubling embedded single quotes keeps a field name such as ``o'brien``
        from terminating the literal early (or injecting SQL).
        """
        return str(text).replace("'", "''")

    @staticmethod
    def _load_json_object(text: str | None) -> Any:
        """Decode JSON *text*, treating ``None``, ``""`` and JSON ``null`` as ``{}``."""
        if not text:
            return {}
        decoded = json.loads(text)
        return {} if decoded is None else decoded

    @staticmethod
    def record_to_json(record: Record) -> str:
        """Convert a Record to a JSON string for storage.

        ``VectorField`` instances are stored via ``to_dict()`` so their metadata
        survives a round trip; other fields exposing a callable ``to_list`` are
        stored via that method; every other field is stored as its ``value``.

        Args:
            record: The record whose fields are serialized.

        Returns:
            JSON text mapping field names to their serialized values.
        """
        from ..fields import VectorField

        data: dict[str, Any] = {}
        for field_name, field_obj in record.fields.items():
            if isinstance(field_obj, VectorField):
                # Keep the full dict form so json_to_record can rebuild the VectorField.
                data[field_name] = field_obj.to_dict()
            elif callable(getattr(field_obj, "to_list", None)):
                data[field_name] = field_obj.to_list()
            else:
                data[field_name] = field_obj.value
        return json.dumps(data)

    @staticmethod
    def get_vector_extraction_sql(field_name: str, dialect: str = "postgres") -> str:
        """Get an SQL expression that extracts a vector from the JSON ``data`` column.

        Handles both storage formats: a raw array, or a VectorField dict whose
        vector lives under the ``value`` key.

        Args:
            field_name: Name of the vector field (single quotes are escaped).
            dialect: SQL dialect ('postgres', 'sqlite', anything else = generic).

        Returns:
            SQL expression yielding the vector value. Postgres casts to ``vector``;
            SQLite returns the JSON text; the generic fallback returns the raw
            JSON member.
        """
        key = SQLRecordSerializer._escape_sql_literal(field_name)
        if dialect == "postgres":
            # Postgres: object -> VectorField dict, otherwise a raw array.
            return f"""CASE
            WHEN jsonb_typeof(data->'{key}') = 'object'
            THEN (data->'{key}'->>'value')::vector
            ELSE (data->>'{key}')::vector
        END"""
        if dialect == "sqlite":
            # SQLite has no native vector type; return the JSON text.
            return f"""CASE
            WHEN json_type(json_extract(data, '$.{key}')) = 'object'
            THEN json_extract(data, '$.{key}.value')
            ELSE json_extract(data, '$.{key}')
        END"""
        # Generic fallback
        return f"data->'{key}'"

    @staticmethod
    def json_to_record(data_json: str, metadata_json: str | None = None) -> Record:
        """Convert stored JSON strings back into a Record.

        Dict values tagged ``{"type": "vector", ...}`` are rebuilt as
        ``VectorField`` objects; every other value becomes a plain ``Field``.

        Args:
            data_json: JSON text of the ``data`` column. Empty text or JSON
                ``null`` yields a record with no fields.
            metadata_json: JSON text of the ``metadata`` column. ``None``, empty
                text or JSON ``null`` yields empty metadata.

        Returns:
            The reconstructed Record (its ID is not set here).
        """
        from ..fields import Field, VectorField

        data = SQLRecordSerializer._load_json_object(data_json)
        metadata = SQLRecordSerializer._load_json_object(metadata_json)

        fields = {}
        for field_name, field_value in data.items():
            if isinstance(field_value, dict) and field_value.get("type") == "vector":
                # from_dict expects a 'name' key; fall back to the JSON key when the
                # stored dict lacks one. Build a new dict rather than mutating the decoded one.
                payload = {"name": field_name, **field_value}
                fields[field_name] = VectorField.from_dict(payload)
            else:
                fields[field_name] = Field(name=field_name, value=field_value)

        record = Record(metadata=metadata)
        record.fields.update(fields)
        return record

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row to a Record.

        Args:
            row: Database row as a dictionary with 'id', 'data' and optional
                'metadata' entries. 'data'/'metadata' may be JSON text or
                already-decoded values (these are re-encoded first).

        Returns:
            Reconstructed Record object, with its ID set from ``row['id']`` when present.
        """
        data_json = row.get("data", {})
        if not isinstance(data_json, str):
            data_json = json.dumps(data_json)

        metadata_json = row.get("metadata")
        if metadata_json and not isinstance(metadata_json, str):
            metadata_json = json.dumps(metadata_json)

        record = SQLRecordSerializer.json_to_record(data_json, metadata_json)

        from ..database_utils import ensure_record_id

        if "id" in row:
            record = ensure_record_id(record, row["id"])
        return record
class SQLQueryBuilder:
    """Build parameterized SQL statements from ``Query`` / ``ComplexQuery`` objects.

    The target table stores each record as an ``id`` column plus JSON ``data``
    and ``metadata`` columns and ``created_at``/``updated_at`` timestamps (see
    :meth:`build_create_query`). Filters and sorts on any field other than
    ``id`` are translated into a dialect-specific JSON extraction expression.
    """

    def __init__(
        self,
        table_name: str,
        schema_name: str | None = None,
        dialect: str = "standard",
        param_style: str = "numeric",
    ):
        """Initialize the SQL query builder.

        Args:
            table_name: Name of the database table.
            schema_name: Optional schema name used to qualify the table.
            dialect: SQL dialect ('postgres', 'sqlite', 'standard').
            param_style: Parameter style ('numeric' for $1, 'qmark' for ?,
                'pyformat' for %(name)s).
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.param_style = param_style
        self.qualified_table = self._get_qualified_table_name()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    def _get_qualified_table_name(self) -> str:
        """Return ``schema.table`` when a schema is set, else the bare table name."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    @staticmethod
    def _escape_literal(text: Any) -> str:
        """Escape *text* for embedding inside a single-quoted SQL string literal.

        Field names are interpolated into JSON paths such as ``data->>'name'``.
        Doubling embedded single quotes keeps a name like ``o'brien`` from
        terminating the literal early (or injecting SQL).
        """
        return str(text).replace("'", "''")

    def _get_param_placeholder(self, param_num: int, param_name: str | None = None) -> str:
        """Get the parameter placeholder for ``param_style``.

        Args:
            param_num: Parameter number (1-based).
            param_name: Optional parameter name for pyformat style.

        Returns:
            ``$N`` (numeric), ``?`` (qmark) or ``%(name)s`` (pyformat, default
            name ``p{N-1}``). Unknown styles fall back to ``$N`` for postgres
            and ``?`` otherwise.
        """
        if self.param_style == "numeric":
            return f"${param_num}"
        if self.param_style == "qmark":
            return "?"
        if self.param_style == "pyformat":
            name = param_name or f"p{param_num - 1}"  # 0-based for pyformat
            return f"%({name})s"
        return f"${param_num}" if self.dialect == "postgres" else "?"

    def _build_order_by(self, sort_specs: Any) -> str:
        """Build an ``ORDER BY`` clause; ``id`` sorts on its own column."""
        order_parts = []
        for sort_spec in sort_specs:
            direction = "DESC" if sort_spec.order == SortOrder.DESC else "ASC"
            if sort_spec.field == "id":
                expr = "id"
            elif self.dialect == "postgres":
                expr = f"data->'{self._escape_literal(sort_spec.field)}'"
            elif self.dialect == "sqlite":
                expr = f"json_extract(data, '$.{self._escape_literal(sort_spec.field)}')"
            else:
                expr = "data"  # Fallback
            order_parts.append(f"{expr} {direction}")
        return "ORDER BY " + ", ".join(order_parts)

    def _build_limit_offset(self, limit: Any, offset: Any) -> list[str]:
        """Return the LIMIT/OFFSET fragments for a query (possibly empty).

        Falsy values (``None``/``0``) are omitted. SQLite only accepts OFFSET as
        part of a LIMIT clause, so an offset without a limit is emitted there as
        ``LIMIT -1 OFFSET n`` (a negative limit means "no upper bound" in SQLite).
        """
        parts: list[str] = []
        if limit:
            parts.append(f"LIMIT {int(limit)}")
        if offset:
            if not limit and self.dialect == "sqlite":
                parts.append("LIMIT -1")
            parts.append(f"OFFSET {int(offset)}")
        return parts

    def _build_filter_clauses(self, filters: Any, param_start: int) -> tuple[list[str], list[Any]]:
        """Build one clause per filter, numbering placeholders from *param_start*.

        Each filter starts numbering right after the parameters consumed by the
        previous one (EXISTS-style filters consume none).
        """
        clauses: list[str] = []
        params: list[Any] = []
        for filter_spec in filters:
            clause, filter_params = self._build_filter_clause(filter_spec, param_start + len(params))
            clauses.append(clause)
            params.extend(filter_params)
        return clauses, params

    # ------------------------------------------------------------------
    # Single-record statements
    # ------------------------------------------------------------------

    def build_create_query(self, record: Record, record_id: str | None = None) -> tuple[str, list[Any]]:
        """Build an INSERT query for creating a record.

        Args:
            record: The record to insert.
            record_id: Optional ID (a UUID4 string is generated if falsy).

        Returns:
            Tuple of (SQL query, parameters ``[id, data_json, metadata_json]``).
            Postgres queries end with ``RETURNING id``.
        """
        record_id = record_id or str(uuid.uuid4())
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        p1, p2, p3 = (self._get_param_placeholder(n) for n in (1, 2, 3))
        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES ({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        if self.dialect == "postgres":
            query += " RETURNING id"
        return query, [record_id, data, metadata]

    def build_read_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a SELECT query for reading a record by ID."""
        p1 = self._get_param_placeholder(1)
        return f"SELECT * FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_update_query(self, record_id: str, record: Record) -> tuple[str, list[Any]]:
        """Build an UPDATE query replacing a record's data and metadata.

        Args:
            record_id: The record ID.
            record: The updated record.

        Returns:
            Tuple of (SQL query, parameters). For qmark style the parameter
            order follows the placeholders (data, metadata, id); for numbered
            styles the id is ``$1``.
        """
        data = self._record_to_json(record)
        metadata = json.dumps(record.metadata) if record.metadata else None

        if self.param_style == "qmark":
            query = f"""
                UPDATE {self.qualified_table}
                SET data = ?, metadata = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """
            return query, [data, metadata, record_id]

        p1, p2, p3 = (self._get_param_placeholder(n) for n in (1, 2, 3))
        query = f"""
            UPDATE {self.qualified_table}
            SET data = {p2}, metadata = {p3}, updated_at = CURRENT_TIMESTAMP
            WHERE id = {p1}
        """
        return query, [record_id, data, metadata]

    def build_delete_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a DELETE query for deleting a record by ID."""
        p1 = self._get_param_placeholder(1)
        return f"DELETE FROM {self.qualified_table} WHERE id = {p1}", [record_id]

    def build_exists_query(self, record_id: str) -> tuple[str, list[Any]]:
        """Build a query that returns one row if the record exists."""
        p1 = self._get_param_placeholder(1)
        return f"SELECT 1 FROM {self.qualified_table} WHERE id = {p1} LIMIT 1", [record_id]

    # ------------------------------------------------------------------
    # Search / count
    # ------------------------------------------------------------------

    def build_complex_search_query(self, query: ComplexQuery) -> tuple[str, list[Any]]:
        """Build a SELECT query from a ComplexQuery (boolean AND/OR/NOT logic).

        Args:
            query: The ComplexQuery object.

        Returns:
            Tuple of (SQL query, parameters).
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        params: list[Any] = []

        if query.condition:
            where_clause, where_params = self._build_complex_condition(query.condition, 1)
            if where_clause:
                sql_parts.append(f"WHERE {where_clause}")
                params.extend(where_params)

        if query.sort_specs:
            sql_parts.append(self._build_order_by(query.sort_specs))
        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))
        return " ".join(sql_parts), params

    def _build_complex_condition(self, condition: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE fragment for a boolean condition tree.

        Args:
            condition: A FilterCondition (leaf) or LogicCondition (branch).
            param_start: First placeholder number to use.

        Returns:
            Tuple of (SQL fragment, parameters); ``("", [])`` when the tree
            produces no clause.
        """
        from ..query_logic import FilterCondition, LogicCondition, LogicOperator

        if isinstance(condition, FilterCondition):
            return self._build_filter_clause(condition.filter, param_start)

        if not isinstance(condition, LogicCondition):
            return "", []

        if condition.operator in (LogicOperator.AND, LogicOperator.OR):
            joiner = " AND " if condition.operator == LogicOperator.AND else " OR "
            clauses: list[str] = []
            params: list[Any] = []
            for sub_condition in condition.conditions:
                # Continue numbering after the parameters already consumed.
                sub_clause, sub_params = self._build_complex_condition(sub_condition, param_start + len(params))
                if sub_clause:
                    clauses.append(sub_clause)
                    params.extend(sub_params)
            return (f"({joiner.join(clauses)})", params) if clauses else ("", [])

        if condition.operator == LogicOperator.NOT and condition.conditions:
            sub_clause, sub_params = self._build_complex_condition(condition.conditions[0], param_start)
            return (f"NOT ({sub_clause})", list(sub_params)) if sub_clause else ("", [])

        return "", []

    def build_where_clause(self, query: Query | None, param_start: int = 1) -> tuple[str, list[Any]]:
        """Build only the filter conditions of a Query.

        Args:
            query: The Query object (can be None).
            param_start: Starting parameter number for placeholders.

        Returns:
            Tuple of (SQL, parameters). The SQL starts with ``" AND "`` so it can
            be appended to an existing WHERE clause; ``("", [])`` when there are
            no filters.
        """
        if not query or not query.filters:
            return "", []
        clauses, params = self._build_filter_clauses(query.filters, param_start)
        return " AND " + " AND ".join(clauses), params

    def build_search_query(self, query: Query) -> tuple[str, list[Any]]:
        """Build a SELECT query (filters, ORDER BY, LIMIT/OFFSET) from a Query.

        Args:
            query: The Query object.

        Returns:
            Tuple of (SQL query, parameters).
        """
        sql_parts = [f"SELECT * FROM {self.qualified_table}"]
        clauses, params = self._build_filter_clauses(query.filters, 1)
        if clauses:
            sql_parts.append("WHERE " + " AND ".join(clauses))
        if query.sort_specs:
            sql_parts.append(self._build_order_by(query.sort_specs))
        sql_parts.extend(self._build_limit_offset(query.limit_value, query.offset_value))
        return " ".join(sql_parts), params

    def build_count_query(self, query: Query | None = None) -> tuple[str, list[Any]]:
        """Build a COUNT query, honouring the Query's filters only.

        Sorting and pagination are deliberately ignored. The statement is built
        directly rather than by editing SELECT text, so field names containing
        words like "LIMIT" cannot truncate the WHERE clause.

        Args:
            query: Optional Query object for filtering.

        Returns:
            Tuple of (SQL query, parameters).
        """
        base = f"SELECT COUNT(*) FROM {self.qualified_table}"
        if query and query.filters:
            clauses, params = self._build_filter_clauses(query.filters, 1)
            return f"{base} WHERE " + " AND ".join(clauses), params
        return base, []

    # ------------------------------------------------------------------
    # Batch statements
    # ------------------------------------------------------------------

    def build_batch_update_query(self, updates: list[tuple[str, Record]]) -> tuple[str, list[Any]]:
        """Build a single UPDATE that rewrites several records via CASE expressions.

        Args:
            updates: List of (id, record) tuples to update.

        Returns:
            Tuple of (SQL query, parameters); ``("", [])`` for no updates.
            Postgres queries end with ``RETURNING id``.
        """
        if not updates:
            return "", []

        qmark = self.param_style == "qmark"
        update_ids: list[str] = []
        data_cases: list[str] = []
        metadata_cases: list[str] = []
        # Positional '?' params must follow placeholder order in the SQL text:
        # every data CASE arm, then every metadata CASE arm, then the IN list.
        data_params: list[Any] = []
        metadata_params: list[Any] = []

        for i, (record_id, record) in enumerate(updates):
            update_ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            if qmark:
                data_cases.append("WHEN id = ? THEN ?")
                metadata_cases.append("WHEN id = ? THEN ?")
                data_params.extend([record_id, data_json])
                metadata_params.extend([record_id, metadata_json])
            else:
                # Numbered/named placeholders: three per record, id reused in both CASEs.
                base = i * 3 + 1
                p_id = self._get_param_placeholder(base)
                p_data = self._get_param_placeholder(base + 1)
                p_meta = self._get_param_placeholder(base + 2)
                data_cases.append(f"WHEN id = {p_id} THEN {p_data}")
                metadata_cases.append(f"WHEN id = {p_id} THEN {p_meta}")
                data_params.extend([record_id, data_json, metadata_json])

        if qmark:
            params = data_params + metadata_params
            id_placeholders = ["?"] * len(update_ids)
        else:
            params = data_params
            id_start = len(updates) * 3 + 1
            id_placeholders = [
                self._get_param_placeholder(n) for n in range(id_start, id_start + len(update_ids))
            ]
        params.extend(update_ids)

        # ELSE keeps the original value for rows no CASE arm matches.
        query = (
            f"UPDATE {self.qualified_table} SET "
            f"data = CASE {' '.join(data_cases)} ELSE data END, "
            f"metadata = CASE {' '.join(metadata_cases)} ELSE metadata END, "
            "updated_at = CURRENT_TIMESTAMP "
            f"WHERE id IN ({', '.join(id_placeholders)})"
        )
        if self.dialect == "postgres":
            query += " RETURNING id"
        return query, params

    def build_batch_create_query(self, records: list[Record]) -> tuple[str, list[Any], list[str]]:
        """Build a multi-row INSERT for several records.

        Args:
            records: List of records to insert.

        Returns:
            Tuple of (SQL query, parameters, generated UUID4 IDs in input order);
            ``("", [], [])`` for no records.
        """
        if not records:
            return "", [], []

        ids: list[str] = []
        values_clauses: list[str] = []
        params: list[Any] = []

        for i, record in enumerate(records):
            record_id = str(uuid.uuid4())
            ids.append(record_id)
            data_json = self._record_to_json(record)
            metadata_json = json.dumps(record.metadata) if record.metadata else None

            base = i * 3 + 1
            p1, p2, p3 = (self._get_param_placeholder(n) for n in (base, base + 1, base + 2))
            values_clauses.append(f"({p1}, {p2}, {p3}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
            params.extend([record_id, data_json, metadata_json])

        query = f"""
            INSERT INTO {self.qualified_table} (id, data, metadata, created_at, updated_at)
            VALUES {', '.join(values_clauses)}
        """
        if self.dialect == "postgres":
            query += " RETURNING id"
        return query, params, ids

    def build_batch_delete_query(self, ids: list[str]) -> tuple[str, list[Any]]:
        """Build a single DELETE removing all records whose ID is in *ids*.

        Returns:
            Tuple of (SQL query, parameters); ``("", [])`` for no IDs. The
            parameter list is a copy, so callers may mutate *ids* afterwards.
        """
        if not ids:
            return "", []

        placeholders = [self._get_param_placeholder(n) for n in range(1, len(ids) + 1)]
        query = f"""
            DELETE FROM {self.qualified_table}
            WHERE id IN ({', '.join(placeholders)})
        """
        if self.dialect == "postgres":
            query += " RETURNING id"
        return query, list(ids)

    # ------------------------------------------------------------------
    # Filter translation
    # ------------------------------------------------------------------

    def _filter_field_expr(self, field: str, op: Any, value: Any) -> str:
        """Return the SQL expression that reads *field* for a filter.

        ``id`` maps to its column. Postgres reads ``data->>'field'`` (text) and
        casts it to numeric/timestamp/boolean when the comparison value's type
        requires it; SQLite uses ``json_extract``; other dialects use the raw name.
        """
        if field == "id":
            return "id"
        if self.dialect == "sqlite":
            return f"json_extract(data, '$.{self._escape_literal(field)}')"
        if self.dialect != "postgres":
            return field

        base = f"data->>'{self._escape_literal(field)}'"
        # For list/tuple values (IN, BETWEEN) the first element decides the cast.
        sample = value[0] if isinstance(value, (list, tuple)) and value else value

        range_ops = (Operator.GT, Operator.GTE, Operator.LT, Operator.LTE, Operator.BETWEEN, Operator.NOT_BETWEEN)
        equality_ops = (Operator.EQ, Operator.NEQ, Operator.IN, Operator.NOT_IN)
        if op in range_ops:
            if isinstance(sample, (int, float)):
                return f"({base})::numeric"
            if isinstance(sample, datetime):
                return f"({base})::timestamp"
        elif op in equality_ops:
            # bool is checked first because bool is a subclass of int.
            if isinstance(sample, bool):
                return f"({base})::boolean"
            if isinstance(sample, (int, float)):
                return f"({base})::numeric"
        return base

    def _build_filter_clause(self, filter_spec: Any, param_start: int) -> tuple[str, list[Any]]:
        """Build a WHERE fragment for a single filter.

        Args:
            filter_spec: Object with ``field``, ``operator`` and ``value`` attributes.
            param_start: Placeholder number for the filter's first parameter.

        Returns:
            Tuple of (SQL fragment, parameters consumed).

        Raises:
            ValueError: For an unsupported operator, or BETWEEN bounds that are
                not exactly two values.
        """
        field = filter_spec.field
        op = filter_spec.operator
        value = filter_spec.value

        field_expr = self._filter_field_expr(field, op, value)
        placeholder = self._get_param_placeholder(param_start)

        binary_ops = {
            Operator.EQ: "=",
            Operator.NEQ: "!=",
            Operator.GT: ">",
            Operator.GTE: ">=",
            Operator.LT: "<",
            Operator.LTE: "<=",
            Operator.LIKE: "LIKE",
            Operator.NOT_LIKE: "NOT LIKE",
        }
        if op in binary_ops:
            return f"{field_expr} {binary_ops[op]} {placeholder}", [value]

        if op in (Operator.IN, Operator.NOT_IN):
            values = list(value)
            if not values:
                # "IN ()" is a syntax error: an empty IN matches nothing, NOT IN matches everything.
                return ("1 = 0" if op == Operator.IN else "1 = 1"), []
            placeholders = ", ".join(
                self._get_param_placeholder(n) for n in range(param_start, param_start + len(values))
            )
            keyword = "IN" if op == Operator.IN else "NOT IN"
            return f"{field_expr} {keyword} ({placeholders})", values

        if op in (Operator.BETWEEN, Operator.NOT_BETWEEN):
            bounds = list(value)
            if len(bounds) != 2:
                raise ValueError(f"{op} requires exactly two bounds, got {len(bounds)}")
            low = self._get_param_placeholder(param_start)
            high = self._get_param_placeholder(param_start + 1)
            keyword = "BETWEEN" if op == Operator.BETWEEN else "NOT BETWEEN"
            return f"{field_expr} {keyword} {low} AND {high}", bounds

        if op == Operator.EXISTS:
            return f"{field_expr} IS NOT NULL", []
        if op == Operator.NOT_EXISTS:
            return f"{field_expr} IS NULL", []

        if op == Operator.REGEX:
            # PostgreSQL uses ~ for regex; SQLite needs a REGEXP function registered.
            regex_op = "~" if self.dialect == "postgres" else "REGEXP"
            return f"{field_expr} {regex_op} {placeholder}", [value]

        raise ValueError(f"Unsupported operator: {op}")

    def _record_to_json(self, record: Record) -> str:
        """Convert a Record to JSON string for storage."""
        return SQLRecordSerializer.record_to_json(record)

    @staticmethod
    def row_to_record(row: dict[str, Any]) -> Record:
        """Convert a database row (dict) to a Record via SQLRecordSerializer."""
        return SQLRecordSerializer.row_to_record(row)
class SQLTableManager:
    """Generate DDL (CREATE/DROP TABLE) for the record table used by the SQL backends."""

    def __init__(self, table_name: str, schema_name: str | None = None, dialect: str = "standard"):
        """Initialize the table manager.

        Args:
            table_name: Name of the database table.
            schema_name: Optional schema name used to qualify the table.
            dialect: SQL dialect ('postgres', 'sqlite', 'standard').
        """
        self.table_name = table_name
        self.schema_name = schema_name
        self.dialect = dialect
        self.qualified_table = self._get_qualified_table_name()

    def _get_qualified_table_name(self) -> str:
        """Return ``schema.table`` when a schema is set, else the bare table name."""
        if self.schema_name:
            return f"{self.schema_name}.{self.table_name}"
        return self.table_name

    def get_create_table_sql(self) -> str:
        """Get the CREATE TABLE (plus CREATE INDEX) script for the dialect.

        Postgres stores ``data``/``metadata`` as JSONB with GIN indexes; SQLite
        stores JSON as TEXT validated by ``json_valid`` and indexes the
        timestamps; other dialects get a plain TEXT table with no indexes.

        Returns:
            SQL script; the postgres and sqlite variants contain several
            semicolon-separated statements.
        """
        if self.dialect == "postgres":
            return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data JSONB NOT NULL,
                metadata JSONB,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_data
                ON {self.qualified_table} USING GIN (data);

            CREATE INDEX IF NOT EXISTS idx_{self.table_name}_metadata
                ON {self.qualified_table} USING GIN (metadata);
            """

        if self.dialect == "sqlite":
            # SQLite's CREATE INDEX grammar qualifies the *index* name with the
            # schema ("CREATE INDEX schema.idx ON table"); the ON table name must
            # be unqualified, so "ON schema.table" is a syntax error.
            index_prefix = f"{self.schema_name}." if self.schema_name else ""
            # SQLite doesn't have JSONB, uses TEXT for JSON storage
            return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL CHECK (json_valid(data)),
                metadata TEXT CHECK (metadata IS NULL OR json_valid(metadata)),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS {index_prefix}idx_{self.table_name}_created
                ON {self.table_name} (created_at);

            CREATE INDEX IF NOT EXISTS {index_prefix}idx_{self.table_name}_updated
                ON {self.table_name} (updated_at);
            """

        # Generic SQL
        return f"""
            CREATE TABLE IF NOT EXISTS {self.qualified_table} (
                id VARCHAR(255) PRIMARY KEY,
                data TEXT NOT NULL,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """

    def get_drop_table_sql(self) -> str:
        """Get the DROP TABLE IF EXISTS statement for the (qualified) table."""
        return f"DROP TABLE IF EXISTS {self.qualified_table}"