Skip to content

Database API

The Database class provides a synchronous interface for interacting with CockroachDB. It handles connection management, SQL execution, and transaction support.

Class Documentation

Database

Main interface for CockroachDB operations.

Source code in manticore_cockroachdb/database.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
class Database:
    """Main interface for CockroachDB operations."""

    def __init__(
        self,
        database: str = "defaultdb",
        host: str = "localhost",
        port: int = 26257,
        user: str = "root",
        password: Optional[str] = None,
        sslmode: str = "disable",
        min_connections: int = 2,
        max_connections: int = 10,
        application_name: str = "manticore-db",
        connect_timeout: int = 30,
        connect: bool = True
    ):
        """Initialize database connection.

        Args:
            database: Database name
            host: Database host
            port: Database port
            user: Database user
            password: Database password
            sslmode: SSL mode (disable, verify-ca, verify-full)
            min_connections: Minimum number of connections in pool
            max_connections: Maximum number of connections in pool
            application_name: Application name for connection
            connect_timeout: Connection timeout in seconds
            connect: Whether to connect immediately
        """
        self.database = database
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.sslmode = sslmode
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.application_name = application_name
        self.connect_timeout = connect_timeout

        # Build connection string
        self.dsn = self._build_dsn()

        # Initialize connection pool
        self._pool = None

        # Connect to database if requested
        if connect:
            self.connect()

    def _build_dsn(self) -> str:
        """Build DSN string for database connection."""
        dsn = f"postgresql://{self.user}"
        if self.password:
            dsn += f":{self.password}"
        dsn += f"@{self.host}:{self.port}/{self.database}"
        dsn += f"?sslmode={self.sslmode}&application_name={self.application_name}&connect_timeout={self.connect_timeout}"
        return dsn

    def connect(self) -> None:
        """Connect to database."""
        if self._pool is not None:
            return

        try:
            dsn = self._build_dsn()
            self._pool = ConnectionPool(
                dsn,
                min_size=self.min_connections,
                max_size=self.max_connections,
                kwargs={"row_factory": dict_row}
            )

            # Test the connection by getting a connection from the pool
            with self._pool.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")

            logger.info(f"Connected to database {self.database} with pool size {self.min_connections}-{self.max_connections}")
        except Exception as e:
            from .crud.exceptions import ConnectionError
            self._pool = None
            raise ConnectionError(f"Failed to connect to {self.host}:{self.port}", cause=e)

    def close(self) -> None:
        """Close database connection."""
        if self._pool is not None:
            self._pool.close()
            logger.info("Closed database connection pool")
            self._pool = None

    def execute(
        self,
        query: str,
        params: Optional[tuple] = None,
        fetch: bool = True
    ) -> Optional[List[Dict[str, Any]]]:
        """Execute a SQL query.

        Args:
            query: SQL query
            params: Query parameters
            fetch: Whether to fetch results

        Returns:
            Query results or None

        Raises:
            DatabaseError: If query execution fails
        """
        from .crud.exceptions import DatabaseError

        if self._pool is None:
            self.connect()

        conn = self._pool.getconn()
        try:
            with conn.cursor() as cur:
                try:
                    cur.execute(query, params)
                    if fetch:
                        try:
                            results = cur.fetchall()
                            conn.commit()
                            return results
                        except psycopg.ProgrammingError:
                            # No results to fetch (e.g. for DDL statements)
                            conn.commit()
                            return None
                    conn.commit()
                    return None
                except Exception as e:
                    conn.rollback()
                    # Wrap the exception with our DatabaseError
                    raise DatabaseError(f"Query execution failed: {str(e)}") from e
        finally:
            self._pool.putconn(conn)

    def create_database(self, name: str) -> None:
        """Create a new database.

        Args:
            name: Database name
        """
        self.execute(f'CREATE DATABASE IF NOT EXISTS "{name}"', fetch=False)

    def drop_database(self, name: str) -> None:
        """Drop a database.

        Args:
            name: Database name
        """
        self.execute(f'DROP DATABASE IF EXISTS "{name}"', fetch=False)

    def create_table(
        self,
        name: str,
        columns: Dict[str, str],
        if_not_exists: bool = True
    ) -> None:
        """Create a new table.

        Args:
            name: Table name
            columns: Column definitions {name: type}
            if_not_exists: Whether to create table only if it doesn't exist
        """
        exists_clause = "IF NOT EXISTS " if if_not_exists else ""
        column_defs = ", ".join([f'"{col}" {dtype}' for col, dtype in columns.items()])
        query = f'CREATE TABLE {exists_clause}"{name}" ({column_defs})'
        self.execute(query, fetch=False)

    def drop_table(self, name: str, if_exists: bool = True) -> None:
        """Drop a table.

        Args:
            name: Table name
            if_exists: Whether to drop table only if it exists
        """
        exists_clause = "IF EXISTS " if if_exists else ""
        self.execute(f'DROP TABLE {exists_clause}"{name}"', fetch=False)

    def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Insert a record into a table.

        Args:
            table: Table name
            data: Record data

        Returns:
            Inserted record
        """
        from .crud.exceptions import DatabaseError

        # Handle empty data case
        if not data:
            query = f"""
                INSERT INTO "{table}" DEFAULT VALUES
                RETURNING *
            """
            try:
                results = self.execute(query)
                return results[0] if results else None
            except DatabaseError as e:
                # Check for unique violation
                if isinstance(e.__cause__, UniqueViolation):
                    # Re-raise with more specific message
                    constraint = str(e.__cause__).split("constraint")[1].split()[0].strip('"\'')
                    raise DatabaseError(f"Unique constraint violation: {constraint}") from e.__cause__
                raise

        columns = [f'"{k}"' for k in data.keys()]
        placeholders = ["%s" for _ in range(len(data))]
        values = list(data.values())

        query = f"""
            INSERT INTO "{table}" ({", ".join(columns)})
            VALUES ({", ".join(placeholders)})
            RETURNING *
        """

        try:
            results = self.execute(query, tuple(values))
            return results[0] if results else None
        except DatabaseError as e:
            # Check for unique violation
            if isinstance(e.__cause__, UniqueViolation):
                # Re-raise with more specific message
                constraint = str(e.__cause__).split("constraint")[1].split()[0].strip('"\'')
                raise DatabaseError(f"Unique constraint violation: {constraint}") from e.__cause__
            raise

    def select(
        self,
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Select records from a table.

        Args:
            table: Table name
            columns: Columns to select
            where: Filter conditions
            order_by: Order by clause
            limit: Result limit
            offset: Result offset

        Returns:
            Selected records
        """
        # Build columns clause
        if columns:
            columns_clause = ", ".join([f'"{col}"' for col in columns])
        else:
            columns_clause = "*"

        # Build query
        query = f'SELECT {columns_clause} FROM "{table}"'

        # Add where clause
        params = []
        if where:
            conditions = []
            for key in where:
                conditions.append(f'"{key}" = %s')
                params.append(where[key])
            query += f" WHERE {' AND '.join(conditions)}"

        # Add order by clause
        if order_by:
            query += f" ORDER BY {order_by}"

        # Add limit clause
        if limit:
            query += f" LIMIT {limit}"

        # Add offset clause
        if offset:
            query += f" OFFSET {offset}"

        # Execute query
        return self.execute(query, tuple(params) if params else None) or []

    def update(
        self,
        table: str,
        data: Dict[str, Any],
        where: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Update records in a table.

        Args:
            table: Table name
            data: Update data
            where: Filter conditions

        Returns:
            Updated record
        """
        # Build set clause
        set_items = []
        params = []
        for i, (key, value) in enumerate(data.items()):
            set_items.append(f'"{key}" = %s')
            params.append(value)

        # Build where clause
        conditions = []
        for i, (key, value) in enumerate(where.items()):
            conditions.append(f'"{key}" = %s')
            params.append(value)

        # Build query
        query = f"""
            UPDATE "{table}"
            SET {", ".join(set_items)}
            WHERE {" AND ".join(conditions)}
            RETURNING *
        """

        # Execute query
        results = self.execute(query, tuple(params))
        return results[0] if results else None

    def delete(
        self,
        table: str,
        where: Dict[str, Any]
    ) -> bool:
        """Delete records from a table.

        Args:
            table: Table name
            where: Filter conditions

        Returns:
            True if records were deleted
        """
        # Build where clause
        conditions = []
        params = []
        for i, (key, value) in enumerate(where.items()):
            conditions.append(f'"{key}" = %s')
            params.append(value)

        # Build query
        query = f"""
            DELETE FROM "{table}"
            WHERE {" AND ".join(conditions)}
            RETURNING id
        """

        # Execute query
        results = self.execute(query, tuple(params))
        return bool(results)

    @classmethod
    def from_url(cls, url: str, connect: bool = False) -> 'Database':
        """Create database from URL.

        Args:
            url: Database URL in format postgresql://user:password@host:port/dbname?sslmode=mode
            connect: Whether to connect immediately

        Returns:
            Database instance
        """
        parsed = urlparse(url)

        # Extract components
        host = parsed.hostname or "localhost"
        port = parsed.port or 26257
        user = parsed.username or "root"
        password = parsed.password
        database = parsed.path.lstrip("/") or "defaultdb"

        # Parse query parameters
        query_params = parse_qs(parsed.query)
        sslmode = query_params.get("sslmode", ["disable"])[0]

        # Create database instance
        db = cls(
            database=database,
            host=host,
            port=port,
            user=user,
            password=password,
            sslmode=sslmode,
            connect=connect
        )

        return db

    def __enter__(self) -> 'Database':
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager."""
        self.close()

    def transaction(self) -> Transaction:
        """Create a transaction context manager.

        Returns:
            Transaction context manager
        """
        if self._pool is None:
            self.connect()

        conn = self._pool.getconn()
        return Transaction(conn)

    def run_in_transaction(
        self,
        operation: Callable[[psycopg.Connection], T],
        max_retries: int = 3
    ) -> T:
        """Run an operation in a transaction with retry logic.

        Args:
            operation: Operation to run
            max_retries: Maximum number of retries

        Returns:
            Operation result
        """
        from .crud.exceptions import DatabaseError

        if self._pool is None:
            self.connect()

        conn = self._pool.getconn()
        try:
            retry_count = 0
            while True:
                try:
                    with Transaction(conn) as tx:
                        return operation(tx)
                except SerializationFailure:
                    # Only retry on serialization failures
                    if retry_count >= max_retries:
                        raise DatabaseError(f"Transaction failed after {max_retries} retries")

                    retry_count += 1
                    # Exponential backoff with jitter
                    delay = (0.1 * 2 ** retry_count) + (random.random() * 0.1)
                    logger.warning(f"Serialization failure, retrying in {delay:.2f}s")
                    time.sleep(delay)
                except Exception as e:
                    raise DatabaseError(f"Transaction failed: {str(e)}") from e
        finally:
            self._pool.putconn(conn)

    def batch_insert(
        self,
        table: str,
        records: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Insert multiple records into a table.

        Args:
            table: Table name
            records: Records to insert

        Returns:
            Inserted records
        """
        if not records:
            return []

        # Ensure all records have the same keys
        keys = set(records[0].keys())
        for record in records[1:]:
            if set(record.keys()) != keys:
                raise ValueError("All records must have the same keys")

        def insert_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
            columns = [f'"{k}"' for k in keys]
            placeholders = []
            values = []

            for i, record in enumerate(records):
                record_placeholders = ["%s" for _ in range(len(keys))]
                placeholders.append(f"({', '.join(record_placeholders)})")
                values.extend([record[k] for k in keys])

            query = f"""
                INSERT INTO "{table}" ({", ".join(columns)})
                VALUES {", ".join(placeholders)}
                RETURNING *
            """

            with conn.cursor(row_factory=dict_row) as cur:
                cur.execute(query, tuple(values))
                return cur.fetchall()

        return self.run_in_transaction(insert_batch)

    def batch_update(
        self,
        table: str,
        records: List[Dict[str, Any]],
        key_column: str = "id"
    ) -> List[Dict[str, Any]]:
        """Update multiple records in a table.

        Args:
            table: Table name
            records: Records to update
            key_column: Primary key column

        Returns:
            Updated records
        """
        if not records:
            return []

        # Ensure all records have the key column
        for record in records:
            if key_column not in record:
                raise ValueError(f"All records must have the key column '{key_column}'")

        def update_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
            updated_records = []

            for record in records:
                key_value = record[key_column]
                update_data = {k: v for k, v in record.items() if k != key_column}

                set_items = []
                params = []
                for i, (key, value) in enumerate(update_data.items(), 1):
                    set_items.append(f'"{key}" = %s')
                    params.append(value)

                query = f"""
                    UPDATE "{table}"
                    SET {", ".join(set_items)}
                    WHERE "{key_column}" = %s
                    RETURNING *
                """
                params.append(key_value)

                with conn.cursor(row_factory=dict_row) as cur:
                    cur.execute(query, tuple(params))
                    result = cur.fetchone()
                    if result:
                        updated_records.append(result)

            return updated_records

        return self.run_in_transaction(update_batch)

    def exists(self, table_name: str) -> bool:
        """Check if a table exists.

        Args:
            table_name: Table name

        Returns:
            True if table exists
        """
        query = """
            SELECT COUNT(*) as count
            FROM information_schema.tables
            WHERE table_name = %s
            AND table_schema = 'public'
        """
        result = self.execute(query, (table_name,))
        return result[0]["count"] > 0 if result else False 

Functions

__init__(database='defaultdb', host='localhost', port=26257, user='root', password=None, sslmode='disable', min_connections=2, max_connections=10, application_name='manticore-db', connect_timeout=30, connect=True)

Initialize database connection.

Parameters:

Name Type Description Default
database str

Database name

'defaultdb'
host str

Database host

'localhost'
port int

Database port

26257
user str

Database user

'root'
password Optional[str]

Database password

None
sslmode str

SSL mode (disable, verify-ca, verify-full)

'disable'
min_connections int

Minimum number of connections in pool

2
max_connections int

Maximum number of connections in pool

10
application_name str

Application name for connection

'manticore-db'
connect_timeout int

Connection timeout in seconds

30
connect bool

Whether to connect immediately

True
Source code in manticore_cockroachdb/database.py
def __init__(
    self,
    database: str = "defaultdb",
    host: str = "localhost",
    port: int = 26257,
    user: str = "root",
    password: Optional[str] = None,
    sslmode: str = "disable",
    min_connections: int = 2,
    max_connections: int = 10,
    application_name: str = "manticore-db",
    connect_timeout: int = 30,
    connect: bool = True
):
    """Initialize database connection.

    Args:
        database: Database name
        host: Database host
        port: Database port
        user: Database user
        password: Database password
        sslmode: SSL mode (disable, verify-ca, verify-full)
        min_connections: Minimum number of connections in pool
        max_connections: Maximum number of connections in pool
        application_name: Application name for connection
        connect_timeout: Connection timeout in seconds
        connect: Whether to connect immediately
    """
    self.database = database
    self.host = host
    self.port = port
    self.user = user
    self.password = password
    self.sslmode = sslmode
    self.min_connections = min_connections
    self.max_connections = max_connections
    self.application_name = application_name
    self.connect_timeout = connect_timeout

    # Build connection string
    self.dsn = self._build_dsn()

    # Initialize connection pool
    self._pool = None

    # Connect to database if requested
    if connect:
        self.connect()
_build_dsn()

Build DSN string for database connection.

Source code in manticore_cockroachdb/database.py
def _build_dsn(self) -> str:
    """Build DSN string for database connection."""
    dsn = f"postgresql://{self.user}"
    if self.password:
        dsn += f":{self.password}"
    dsn += f"@{self.host}:{self.port}/{self.database}"
    dsn += f"?sslmode={self.sslmode}&application_name={self.application_name}&connect_timeout={self.connect_timeout}"
    return dsn
connect()

Connect to database.

Source code in manticore_cockroachdb/database.py
def connect(self) -> None:
    """Connect to database."""
    if self._pool is not None:
        return

    try:
        dsn = self._build_dsn()
        self._pool = ConnectionPool(
            dsn,
            min_size=self.min_connections,
            max_size=self.max_connections,
            kwargs={"row_factory": dict_row}
        )

        # Test the connection by getting a connection from the pool
        with self._pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")

        logger.info(f"Connected to database {self.database} with pool size {self.min_connections}-{self.max_connections}")
    except Exception as e:
        from .crud.exceptions import ConnectionError
        self._pool = None
        raise ConnectionError(f"Failed to connect to {self.host}:{self.port}", cause=e)
close()

Close database connection.

Source code in manticore_cockroachdb/database.py
def close(self) -> None:
    """Close database connection."""
    if self._pool is not None:
        self._pool.close()
        logger.info("Closed database connection pool")
        self._pool = None
execute(query, params=None, fetch=True)

Execute a SQL query.

Parameters:

Name Type Description Default
query str

SQL query

required
params Optional[tuple]

Query parameters

None
fetch bool

Whether to fetch results

True

Returns:

Type Description
Optional[List[Dict[str, Any]]]

Query results or None

Raises:

Type Description
DatabaseError

If query execution fails

Source code in manticore_cockroachdb/database.py
def execute(
    self,
    query: str,
    params: Optional[tuple] = None,
    fetch: bool = True
) -> Optional[List[Dict[str, Any]]]:
    """Execute a SQL query.

    Args:
        query: SQL query
        params: Query parameters
        fetch: Whether to fetch results

    Returns:
        Query results or None

    Raises:
        DatabaseError: If query execution fails
    """
    from .crud.exceptions import DatabaseError

    if self._pool is None:
        self.connect()

    conn = self._pool.getconn()
    try:
        with conn.cursor() as cur:
            try:
                cur.execute(query, params)
                if fetch:
                    try:
                        results = cur.fetchall()
                        conn.commit()
                        return results
                    except psycopg.ProgrammingError:
                        # No results to fetch (e.g. for DDL statements)
                        conn.commit()
                        return None
                conn.commit()
                return None
            except Exception as e:
                conn.rollback()
                # Wrap the exception with our DatabaseError
                raise DatabaseError(f"Query execution failed: {str(e)}") from e
    finally:
        self._pool.putconn(conn)
create_database(name)

Create a new database.

Parameters:

Name Type Description Default
name str

Database name

required
Source code in manticore_cockroachdb/database.py
def create_database(self, name: str) -> None:
    """Create a new database.

    Args:
        name: Database name
    """
    self.execute(f'CREATE DATABASE IF NOT EXISTS "{name}"', fetch=False)
drop_database(name)

Drop a database.

Parameters:

Name Type Description Default
name str

Database name

required
Source code in manticore_cockroachdb/database.py
def drop_database(self, name: str) -> None:
    """Drop a database.

    Args:
        name: Database name
    """
    self.execute(f'DROP DATABASE IF EXISTS "{name}"', fetch=False)
create_table(name, columns, if_not_exists=True)

Create a new table.

Parameters:

Name Type Description Default
name str

Table name

required
columns Dict[str, str]

Column definitions {name: type}

required
if_not_exists bool

Whether to create table only if it doesn't exist

True
Source code in manticore_cockroachdb/database.py
def create_table(
    self,
    name: str,
    columns: Dict[str, str],
    if_not_exists: bool = True
) -> None:
    """Create a new table.

    Args:
        name: Table name
        columns: Column definitions {name: type}
        if_not_exists: Whether to create table only if it doesn't exist
    """
    exists_clause = "IF NOT EXISTS " if if_not_exists else ""
    column_defs = ", ".join([f'"{col}" {dtype}' for col, dtype in columns.items()])
    query = f'CREATE TABLE {exists_clause}"{name}" ({column_defs})'
    self.execute(query, fetch=False)
drop_table(name, if_exists=True)

Drop a table.

Parameters:

Name Type Description Default
name str

Table name

required
if_exists bool

Whether to drop table only if it exists

True
Source code in manticore_cockroachdb/database.py
def drop_table(self, name: str, if_exists: bool = True) -> None:
    """Drop a table.

    Args:
        name: Table name
        if_exists: Whether to drop table only if it exists
    """
    exists_clause = "IF EXISTS " if if_exists else ""
    self.execute(f'DROP TABLE {exists_clause}"{name}"', fetch=False)
insert(table, data)

Insert a record into a table.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Record data

required

Returns:

Type Description
Dict[str, Any]

Inserted record

Source code in manticore_cockroachdb/database.py
def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a record into a table.

    Args:
        table: Table name
        data: Record data

    Returns:
        Inserted record
    """
    from .crud.exceptions import DatabaseError

    # Handle empty data case
    if not data:
        query = f"""
            INSERT INTO "{table}" DEFAULT VALUES
            RETURNING *
        """
        try:
            results = self.execute(query)
            return results[0] if results else None
        except DatabaseError as e:
            # Check for unique violation
            if isinstance(e.__cause__, UniqueViolation):
                # Re-raise with more specific message
                constraint = str(e.__cause__).split("constraint")[1].split()[0].strip('"\'')
                raise DatabaseError(f"Unique constraint violation: {constraint}") from e.__cause__
            raise

    columns = [f'"{k}"' for k in data.keys()]
    placeholders = ["%s" for _ in range(len(data))]
    values = list(data.values())

    query = f"""
        INSERT INTO "{table}" ({", ".join(columns)})
        VALUES ({", ".join(placeholders)})
        RETURNING *
    """

    try:
        results = self.execute(query, tuple(values))
        return results[0] if results else None
    except DatabaseError as e:
        # Check for unique violation
        if isinstance(e.__cause__, UniqueViolation):
            # Re-raise with more specific message
            constraint = str(e.__cause__).split("constraint")[1].split()[0].strip('"\'')
            raise DatabaseError(f"Unique constraint violation: {constraint}") from e.__cause__
        raise
select(table, columns=None, where=None, order_by=None, limit=None, offset=None)

Select records from a table.

Parameters:

Name Type Description Default
table str

Table name

required
columns Optional[List[str]]

Columns to select

None
where Optional[Dict[str, Any]]

Filter conditions

None
order_by Optional[str]

Order by clause

None
limit Optional[int]

Result limit

None
offset Optional[int]

Result offset

None

Returns:

Type Description
List[Dict[str, Any]]

Selected records

Source code in manticore_cockroachdb/database.py
def select(
    self,
    table: str,
    columns: Optional[List[str]] = None,
    where: Optional[Dict[str, Any]] = None,
    order_by: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Select records from a table.

    Args:
        table: Table name
        columns: Columns to select
        where: Filter conditions
        order_by: Order by clause
        limit: Result limit
        offset: Result offset

    Returns:
        Selected records
    """
    # Build columns clause
    if columns:
        columns_clause = ", ".join([f'"{col}"' for col in columns])
    else:
        columns_clause = "*"

    # Build query
    query = f'SELECT {columns_clause} FROM "{table}"'

    # Add where clause
    params = []
    if where:
        conditions = []
        for key in where:
            conditions.append(f'"{key}" = %s')
            params.append(where[key])
        query += f" WHERE {' AND '.join(conditions)}"

    # Add order by clause
    if order_by:
        query += f" ORDER BY {order_by}"

    # Add limit clause
    if limit:
        query += f" LIMIT {limit}"

    # Add offset clause
    if offset:
        query += f" OFFSET {offset}"

    # Execute query
    return self.execute(query, tuple(params) if params else None) or []
update(table, data, where)

Update records in a table.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Update data

required
where Dict[str, Any]

Filter conditions

required

Returns:

Type Description
Optional[Dict[str, Any]]

Updated record

Source code in manticore_cockroachdb/database.py
def update(
    self,
    table: str,
    data: Dict[str, Any],
    where: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Update records in a table.

    Args:
        table: Table name
        data: Update data
        where: Filter conditions

    Returns:
        Updated record
    """
    # Build set clause
    set_items = []
    params = []
    for i, (key, value) in enumerate(data.items()):
        set_items.append(f'"{key}" = %s')
        params.append(value)

    # Build where clause
    conditions = []
    for i, (key, value) in enumerate(where.items()):
        conditions.append(f'"{key}" = %s')
        params.append(value)

    # Build query
    query = f"""
        UPDATE "{table}"
        SET {", ".join(set_items)}
        WHERE {" AND ".join(conditions)}
        RETURNING *
    """

    # Execute query
    results = self.execute(query, tuple(params))
    return results[0] if results else None
delete(table, where)

Delete records from a table.

Parameters:

Name Type Description Default
table str

Table name

required
where Dict[str, Any]

Filter conditions

required

Returns:

Type Description
bool

True if records were deleted

Source code in manticore_cockroachdb/database.py
def delete(
    self,
    table: str,
    where: Dict[str, Any]
) -> bool:
    """Delete records from a table.

    Args:
        table: Table name
        where: Filter conditions

    Returns:
        True if records were deleted
    """
    # Build where clause
    conditions = []
    params = []
    for i, (key, value) in enumerate(where.items()):
        conditions.append(f'"{key}" = %s')
        params.append(value)

    # Build query
    query = f"""
        DELETE FROM "{table}"
        WHERE {" AND ".join(conditions)}
        RETURNING id
    """

    # Execute query
    results = self.execute(query, tuple(params))
    return bool(results)
from_url(url, connect=False) classmethod

Create database from URL.

Parameters:

Name Type Description Default
url str

Database URL in format postgresql://user:password@host:port/dbname?sslmode=mode

required
connect bool

Whether to connect immediately

False

Returns:

Type Description
Database

Database instance

Source code in manticore_cockroachdb/database.py
@classmethod
def from_url(cls, url: str, connect: bool = False) -> 'Database':
    """Create database from URL.

    Args:
        url: Database URL in format postgresql://user:password@host:port/dbname?sslmode=mode
        connect: Whether to connect immediately

    Returns:
        Database instance
    """
    parsed = urlparse(url)

    # Extract components
    host = parsed.hostname or "localhost"
    port = parsed.port or 26257
    user = parsed.username or "root"
    password = parsed.password
    database = parsed.path.lstrip("/") or "defaultdb"

    # Parse query parameters
    query_params = parse_qs(parsed.query)
    sslmode = query_params.get("sslmode", ["disable"])[0]

    # Create database instance
    db = cls(
        database=database,
        host=host,
        port=port,
        user=user,
        password=password,
        sslmode=sslmode,
        connect=connect
    )

    return db
__enter__()

Enter context manager.

Source code in manticore_cockroachdb/database.py
def __enter__(self) -> 'Database':
    """Enter context manager."""
    return self
__exit__(exc_type, exc_val, exc_tb)

Exit context manager.

Source code in manticore_cockroachdb/database.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit context manager."""
    self.close()
transaction()

Create a transaction context manager.

Returns:

Type Description
Transaction

Transaction context manager

Source code in manticore_cockroachdb/database.py
def transaction(self) -> Transaction:
    """Create a transaction context manager.

    Returns:
        Transaction context manager
    """
    if self._pool is None:
        self.connect()

    conn = self._pool.getconn()
    return Transaction(conn)
run_in_transaction(operation, max_retries=3)

Run an operation in a transaction with retry logic.

Parameters:

Name Type Description Default
operation Callable[[Connection], T]

Operation to run

required
max_retries int

Maximum number of retries

3

Returns:

Type Description
T

Operation result

Source code in manticore_cockroachdb/database.py
def run_in_transaction(
    self,
    operation: Callable[[psycopg.Connection], T],
    max_retries: int = 3
) -> T:
    """Run an operation in a transaction with retry logic.

    Args:
        operation: Operation to run
        max_retries: Maximum number of retries

    Returns:
        Operation result
    """
    from .crud.exceptions import DatabaseError

    if self._pool is None:
        self.connect()

    conn = self._pool.getconn()
    try:
        retry_count = 0
        while True:
            try:
                with Transaction(conn) as tx:
                    return operation(tx)
            except SerializationFailure:
                # Only retry on serialization failures
                if retry_count >= max_retries:
                    raise DatabaseError(f"Transaction failed after {max_retries} retries")

                retry_count += 1
                # Exponential backoff with jitter
                delay = (0.1 * 2 ** retry_count) + (random.random() * 0.1)
                logger.warning(f"Serialization failure, retrying in {delay:.2f}s")
                time.sleep(delay)
            except Exception as e:
                raise DatabaseError(f"Transaction failed: {str(e)}") from e
    finally:
        self._pool.putconn(conn)
batch_insert(table, records)

Insert multiple records into a table.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to insert

required

Returns:

Type Description
List[Dict[str, Any]]

Inserted records

Source code in manticore_cockroachdb/database.py
def batch_insert(
    self,
    table: str,
    records: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Insert multiple records into a table.

    Args:
        table: Table name
        records: Records to insert

    Returns:
        Inserted records
    """
    if not records:
        return []

    # Ensure all records have the same keys
    keys = set(records[0].keys())
    for record in records[1:]:
        if set(record.keys()) != keys:
            raise ValueError("All records must have the same keys")

    def insert_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
        columns = [f'"{k}"' for k in keys]
        placeholders = []
        values = []

        for i, record in enumerate(records):
            record_placeholders = ["%s" for _ in range(len(keys))]
            placeholders.append(f"({', '.join(record_placeholders)})")
            values.extend([record[k] for k in keys])

        query = f"""
            INSERT INTO "{table}" ({", ".join(columns)})
            VALUES {", ".join(placeholders)}
            RETURNING *
        """

        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute(query, tuple(values))
            return cur.fetchall()

    return self.run_in_transaction(insert_batch)
batch_update(table, records, key_column='id')

Update multiple records in a table.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to update

required
key_column str

Primary key column

'id'

Returns:

Type Description
List[Dict[str, Any]]

Updated records

Source code in manticore_cockroachdb/database.py
def batch_update(
    self,
    table: str,
    records: List[Dict[str, Any]],
    key_column: str = "id"
) -> List[Dict[str, Any]]:
    """Update multiple records in a table.

    Args:
        table: Table name
        records: Records to update
        key_column: Primary key column

    Returns:
        Updated records
    """
    if not records:
        return []

    # Ensure all records have the key column
    for record in records:
        if key_column not in record:
            raise ValueError(f"All records must have the key column '{key_column}'")

    def update_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
        updated_records = []

        for record in records:
            key_value = record[key_column]
            update_data = {k: v for k, v in record.items() if k != key_column}

            set_items = []
            params = []
            for i, (key, value) in enumerate(update_data.items(), 1):
                set_items.append(f'"{key}" = %s')
                params.append(value)

            query = f"""
                UPDATE "{table}"
                SET {", ".join(set_items)}
                WHERE "{key_column}" = %s
                RETURNING *
            """
            params.append(key_value)

            with conn.cursor(row_factory=dict_row) as cur:
                cur.execute(query, tuple(params))
                result = cur.fetchone()
                if result:
                    updated_records.append(result)

        return updated_records

    return self.run_in_transaction(update_batch)
exists(table_name)

Check if a table exists.

Parameters:

Name Type Description Default
table_name str

Table name

required

Returns:

Type Description
bool

True if table exists

Source code in manticore_cockroachdb/database.py
def exists(self, table_name: str) -> bool:
    """Check if a table exists.

    Args:
        table_name: Table name

    Returns:
        True if table exists
    """
    query = """
        SELECT COUNT(*) as count
        FROM information_schema.tables
        WHERE table_name = %s
        AND table_schema = 'public'
    """
    result = self.execute(query, (table_name,))
    return result[0]["count"] > 0 if result else False 

Usage Examples

from manticore_cockroachdb import Database

# Connect to database
db = Database(
    host="localhost",
    port=26257,
    database="example_db",
    user="root",
    password="",
    ssl_mode="disable"
)

# Or connect using a URL
# db = Database.from_url("postgresql://root@localhost:26257/example_db?sslmode=disable")

# Create a table
db.create_table(
    "users",
    {
        "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
        "name": "TEXT NOT NULL",
        "email": "TEXT UNIQUE NOT NULL",
        "age": "INTEGER",
        "active": "BOOLEAN DEFAULT TRUE"
    },
    if_not_exists=True
)

# Insert data
user_id = db.insert("users", {
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
})

# Select data
user = db.select_one("users", where={"id": user_id})
print(f"User: {user['name']}, Email: {user['email']}")

# Update data
db.update("users", {"age": 31}, where={"id": user_id})

# Delete data
db.delete("users", where={"id": user_id})

# Execute raw SQL
db.execute("SELECT * FROM users WHERE age > %s", [25])

# Use transactions
with db.transaction():
    db.insert("users", {"name": "Alice", "email": "alice@example.com", "age": 25})
    db.insert("users", {"name": "Bob", "email": "bob@example.com", "age": 28})

# Close the connection
db.close()