Skip to content

Database API

The Database class provides a synchronous interface for interacting with CockroachDB. It handles connection management, SQL execution, and transaction support.

Class Documentation

Database

Main interface for CockroachDB operations.

Source code in manticore_cockroachdb/database.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
class Database:
    """Main interface for CockroachDB operations.

    Owns a connection pool and exposes helpers for running SQL, simple
    CRUD statements, batch writes and retried transactions. The pool is
    opened by ``__init__`` (through ``connect()``) and released by
    ``close()`` or by leaving a ``with Database(...)`` block.

    Table and database names are emitted as double-quoted identifiers with
    embedded quotes escaped. Column names, column types and ``order_by``
    text are interpolated into the SQL verbatim, so they must come from
    trusted code.
    """

    def __init__(
        self,
        database: str = "defaultdb",
        host: str = "localhost",
        port: int = 26257,
        user: str = "root",
        password: Optional[str] = None,
        sslmode: str = "disable",
        min_connections: int = 2,
        max_connections: int = 10,
        application_name: str = "manticore-db"
    ):
        """Initialize database connection.

        Stores the settings, builds the DSN and immediately opens the
        connection pool via ``connect()``.

        Args:
            database: Database name
            host: Database host
            port: Database port
            user: Database user
            password: Database password
            sslmode: SSL mode (disable, verify-ca, verify-full)
            min_connections: Minimum number of connections in pool
            max_connections: Maximum number of connections in pool
            application_name: Application name for connection
        """
        self.database = database
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.sslmode = sslmode
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.application_name = application_name

        # Build connection string
        self.dsn = self._build_dsn()

        # Initialize connection pool
        self._pool = None
        self.connect()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier.

        Embedded double quotes are doubled so the name cannot terminate the
        identifier early.
        """
        return '"' + name.replace('"', '""') + '"'

    def _build_dsn(self) -> str:
        """Build database connection string.

        User, password and database name are percent-encoded so reserved
        URI characters (``@``, ``/``, ``:``, ``?``, ``#`` ...) cannot change
        how the connection string is split. ``%`` is left untouched so
        values that are already percent-encoded (for example the ones
        ``from_url`` copies out of a URL) pass through unchanged.

        Returns:
            Database connection string
        """
        from urllib.parse import quote

        def encode(value: Any) -> str:
            return quote(str(value), safe="%")

        credentials = encode(self.user)

        # Add password if provided
        if self.password:
            credentials += f":{encode(self.password)}"

        return (
            f"postgresql://{credentials}"
            f"@{self.host}:{self.port}/{encode(self.database)}"
            f"?sslmode={self.sslmode}"
        )

    def connect(self) -> None:
        """Connect to database and initialize connection pool.

        Raises:
            Exception: Whatever the pool raises while being created; the
                error is logged and re-raised unchanged.
        """
        try:
            # Create connection pool
            pool = ConnectionPool(
                self.dsn,
                min_size=self.min_connections,
                max_size=self.max_connections,
                # Every pooled connection returns rows as dictionaries.
                configure=lambda conn: setattr(conn, 'row_factory', dict_row),
                open=True,
                kwargs={
                    "application_name": self.application_name
                }
            )
            # Enter the pool's context manager by hand; close() performs
            # the matching __exit__.
            pool.__enter__()
            self._pool = pool
            logger.info(
                f"Connected to database {self.database} "
                f"with pool size {self.min_connections}-{self.max_connections}"
            )
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    def close(self) -> None:
        """Close database connection pool.

        Does nothing when no pool is open. The pool reference is cleared
        even if shutting the pool down raises.
        """
        if self._pool:
            try:
                self._pool.__exit__(None, None, None)
            finally:
                self._pool = None
                logger.info("Closed database connection pool")

    def execute(
        self,
        query: str,
        params: Optional[tuple] = None,
        fetch: bool = True
    ) -> Optional[List[Dict[str, Any]]]:
        """Execute a query and optionally return results.

        The statement runs on a pooled connection and is committed on
        success. On failure the transaction is rolled back before the
        connection goes back to the pool.

        Args:
            query: SQL query to execute
            params: Query parameters
            fetch: Whether to return results

        Returns:
            Query results if fetch=True and the statement produced a result
            set, None otherwise

        Raises:
            RuntimeError: If the pool is not open
        """
        if not self._pool:
            raise RuntimeError("Database not connected")

        conn = self._pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute(query, params)
                rows = None
                # A cursor without a description has no result set (DDL,
                # plain DML); ask instead of catching an exception.
                if fetch and cur.description is not None:
                    rows = cur.fetchall()
            conn.commit()
            return rows
        except Exception:
            try:
                conn.rollback()
            except Exception:
                # Best effort: the connection may already be unusable and
                # the original error is the one worth reporting.
                pass
            raise
        finally:
            self._pool.putconn(conn)

    def create_database(self, name: str) -> None:
        """Create a new database if it does not exist yet.

        Uses a short-lived ``Database`` connected to ``defaultdb`` with this
        instance's host, port, user, password and sslmode.

        Args:
            name: Database name
        """
        # Connect to default database
        default_db = Database(
            database="defaultdb",
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            sslmode=self.sslmode
        )

        try:
            # Check if database exists
            exists = default_db.execute(
                "SELECT 1 FROM pg_database WHERE datname = %s",
                (name,)
            )

            if not exists:
                # Identifiers cannot be bound as parameters, so quote it.
                default_db.execute(
                    f"CREATE DATABASE {self._quote_identifier(name)}"
                )
                logger.info(f"Created database {name}")
            else:
                logger.info(f"Database {name} already exists")
        finally:
            default_db.close()

    def create_table(
        self,
        name: str,
        columns: Dict[str, str],
        if_not_exists: bool = True
    ) -> None:
        """Create a new table.

        Args:
            name: Table name
            columns: Column definitions {name: type}; both parts are placed
                in the statement verbatim
            if_not_exists: Whether to create table only if it doesn't exist
        """
        col_defs = ", ".join(f"{col} {type_}" for col, type_ in columns.items())
        prefix = "CREATE TABLE IF NOT EXISTS" if if_not_exists else "CREATE TABLE"

        self.execute(f"{prefix} {self._quote_identifier(name)} ({col_defs})")
        logger.info(f"Created table {name}")

    def drop_table(self, name: str, if_exists: bool = True) -> None:
        """Drop a table.

        Args:
            name: Table name
            if_exists: Whether to drop only if table exists
        """
        prefix = "DROP TABLE IF EXISTS" if if_exists else "DROP TABLE"
        self.execute(f"{prefix} {self._quote_identifier(name)}")
        logger.info(f"Dropped table {name}")

    def insert(self, table: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Insert a record into a table.

        Args:
            table: Table name
            data: Record data {column: value}; an empty dict inserts a row
                made of column defaults

        Returns:
            Inserted record, or None if the statement returned no row
        """
        target = self._quote_identifier(table)
        columns = list(data)

        if columns:
            placeholders = ", ".join(["%s"] * len(columns))
            query = (
                f'INSERT INTO {target} ({", ".join(columns)}) '
                f'VALUES ({placeholders}) RETURNING *'
            )
            params = tuple(data[col] for col in columns)
        else:
            # "() VALUES ()" is not valid SQL; DEFAULT VALUES is.
            query = f"INSERT INTO {target} DEFAULT VALUES RETURNING *"
            params = None

        results = self.execute(query, params)
        return results[0] if results else None

    def select(
        self,
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Select records from a table.

        Args:
            table: Table name
            columns: Columns to select (all columns when omitted)
            where: Where conditions {column: value}, combined with AND
            order_by: Order by clause, placed in the statement verbatim
            limit: Result limit; ``0`` is honoured and returns no rows

        Returns:
            Selected records
        """
        cols = ", ".join(columns) if columns else "*"
        query = f"SELECT {cols} FROM {self._quote_identifier(table)}"

        params: List[Any] = []
        if where:
            query += " WHERE " + " AND ".join(f"{col} = %s" for col in where)
            params.extend(where.values())

        if order_by:
            query += f" ORDER BY {order_by}"

        # Compare against None: a plain truth test would drop LIMIT 0.
        if limit is not None:
            query += f" LIMIT {int(limit)}"

        return self.execute(query, tuple(params) if params else None) or []

    def update(
        self,
        table: str,
        data: Dict[str, Any],
        where: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Update records in a table.

        Args:
            table: Table name
            data: Update data {column: new value}
            where: Where conditions {column: value}, combined with AND

        Returns:
            The first updated record, or None if no row matched

        Raises:
            ValueError: If ``data`` or ``where`` is empty
        """
        if not data:
            raise ValueError("update() requires at least one column to set")
        if not where:
            raise ValueError("update() requires at least one where condition")

        assignments = ", ".join(f"{col} = %s" for col in data)
        conditions = " AND ".join(f"{col} = %s" for col in where)
        query = (
            f"UPDATE {self._quote_identifier(table)} SET {assignments} "
            f"WHERE {conditions} RETURNING *"
        )

        # SET values come first, matching placeholder order.
        params = tuple(data.values()) + tuple(where.values())

        results = self.execute(query, params)
        return results[0] if results else None

    def delete(
        self,
        table: str,
        where: Dict[str, Any]
    ) -> bool:
        """Delete records from a table.

        Runs a single ``DELETE ... RETURNING`` statement, so the answer
        reflects exactly the rows this call removed.

        Args:
            table: Table name
            where: Where conditions {column: value}, combined with AND

        Returns:
            True if records were deleted, False otherwise

        Raises:
            ValueError: If ``where`` is empty
        """
        if not where:
            raise ValueError("delete() requires at least one where condition")

        conditions = " AND ".join(f"{col} = %s" for col in where)
        query = (
            f"DELETE FROM {self._quote_identifier(table)} "
            f"WHERE {conditions} RETURNING 1"
        )
        return bool(self.execute(query, tuple(where.values())))

    @classmethod
    def from_url(cls, url: str) -> 'Database':
        """Create database instance from URL.

        Missing parts fall back to the constructor defaults. Only the
        ``sslmode`` query parameter is read.

        Args:
            url: Database URL

        Returns:
            Database instance (already connected, because ``__init__``
            connects)
        """
        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        return cls(
            # A URL without a path would otherwise give an empty name.
            database=parsed.path.lstrip('/') or "defaultdb",
            host=parsed.hostname or "localhost",
            port=parsed.port or 26257,
            user=parsed.username or "root",
            password=parsed.password,
            sslmode=params.get('sslmode', ['disable'])[0]
        )

    def __enter__(self) -> 'Database':
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager, closing the connection pool."""
        self.close()

    def transaction(self) -> "Transaction":
        """Start a new transaction.

        The returned object is a ``Transaction`` whose ``__exit__`` also
        hands the connection back to the pool, so use it in a ``with``
        block. A transaction that is never entered and exited keeps its
        connection checked out.

        Returns:
            Transaction context manager

        Raises:
            RuntimeError: If the pool is not open
        """
        if not self._pool:
            raise RuntimeError("Database not connected")

        pool = self._pool
        conn = pool.getconn()

        class _PooledTransaction(Transaction):
            """Transaction that returns its connection to the pool on exit."""

            def __exit__(self, exc_type, exc_val, exc_tb):
                try:
                    return super().__exit__(exc_type, exc_val, exc_tb)
                finally:
                    pool.putconn(conn)

        try:
            return _PooledTransaction(conn)
        except BaseException:
            # Construction failed: nobody will ever exit the transaction.
            pool.putconn(conn)
            raise

    def run_in_transaction(
        self,
        operation: "Callable[[psycopg.Connection], T]",
        max_retries: int = 3
    ) -> "T":
        """Run an operation in a transaction with retry logic.

        The operation is retried on ``SerializationFailure`` with a
        jittered exponential backoff; any other exception propagates
        immediately.

        Args:
            operation: Operation to run; receives the transaction's
                connection
            max_retries: Maximum number of attempts (at least 1)

        Returns:
            Operation result

        Raises:
            RuntimeError: If the pool is not open
            ValueError: If ``max_retries`` is below 1, or if the
                transaction fails after max retries
        """
        if not self._pool:
            raise RuntimeError("Database not connected")
        if max_retries < 1:
            # range(0) would skip the loop and silently return None.
            raise ValueError("max_retries must be at least 1")

        conn = self._pool.getconn()
        try:
            for retry in range(max_retries):
                try:
                    with Transaction(conn) as tx_conn:
                        return operation(tx_conn)
                except SerializationFailure as e:
                    if retry == max_retries - 1:
                        raise ValueError(
                            f"Transaction failed after {max_retries} retries: {e}"
                        ) from e
                    # Exponential backoff with +/-50% jitter
                    sleep_ms = (2 ** retry) * 100 * (random.random() + 0.5)
                    time.sleep(sleep_ms / 1000)
        finally:
            self._pool.putconn(conn)

    def batch_insert(
        self,
        table: str,
        records: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Insert multiple records in a single transaction.

        The column list is taken from the first record; every record must
        provide those keys.

        Args:
            table: Table name
            records: Records to insert

        Returns:
            List of inserted records

        Raises:
            ValueError: If the first record has no columns
        """
        if not records:
            return []

        # Get column names from first record
        columns = list(records[0])
        if not columns:
            raise ValueError("No columns to insert")

        # One "(%s, ...)" group per record
        row_group = "(" + ", ".join(["%s"] * len(columns)) + ")"
        query = (
            f'INSERT INTO {self._quote_identifier(table)} ({", ".join(columns)}) '
            f'VALUES {", ".join([row_group] * len(records))} '
            'RETURNING *'
        )

        # Flatten values row by row, in column order
        values = [record[col] for record in records for col in columns]

        def insert_batch(conn: "psycopg.Connection") -> List[Dict[str, Any]]:
            with conn.cursor() as cur:
                cur.execute(query, values)
                return cur.fetchall()

        return self.run_in_transaction(insert_batch)

    def batch_update(
        self,
        table: str,
        records: List[Dict[str, Any]],
        key_column: str = "id"
    ) -> List[Dict[str, Any]]:
        """Update multiple records in a single transaction.

        Builds one UPDATE with a ``CASE key_column WHEN ... THEN ...``
        expression per column. The columns are taken from the first record.

        Args:
            table: Table name
            records: Records to update
            key_column: Column to use as key

        Returns:
            Updated records in the order of ``records``; records whose key
            matched no row are left out

        Raises:
            ValueError: If there is nothing to update besides the key
        """
        if not records:
            return []

        # Get column names excluding key
        columns = [col for col in records[0] if col != key_column]
        if not columns:
            raise ValueError("No columns to update")

        keys = [record[key_column] for record in records]

        # Build SET clause
        updates = []
        values: List[Any] = []
        for col in columns:
            when_clauses = " ".join(["WHEN %s THEN %s"] * len(records))
            for record in records:
                values.extend([record[key_column], record[col]])
            updates.append(
                f"{col} = (CASE {key_column} {when_clauses} ELSE {col} END)"
            )

        query = (
            f'UPDATE {self._quote_identifier(table)} SET {", ".join(updates)} '
            f'WHERE {key_column} IN ({", ".join(["%s"] * len(keys))}) '
            'RETURNING *'
        )

        # Values for the IN clause come after the CASE values
        values.extend(keys)

        def update_batch(conn: "psycopg.Connection") -> List[Dict[str, Any]]:
            with conn.cursor() as cur:
                cur.execute(query, values)
                by_key = {row[key_column]: row for row in cur.fetchall()}

            # Keep caller order; skip keys that matched no row instead of
            # failing with KeyError.
            return [by_key[key] for key in keys if key in by_key]

        return self.run_in_transaction(update_batch)

Functions

__init__(database='defaultdb', host='localhost', port=26257, user='root', password=None, sslmode='disable', min_connections=2, max_connections=10, application_name='manticore-db')

Initialize database connection.

Parameters:

Name Type Description Default
database str

Database name

'defaultdb'
host str

Database host

'localhost'
port int

Database port

26257
user str

Database user

'root'
password Optional[str]

Database password

None
sslmode str

SSL mode (disable, verify-ca, verify-full)

'disable'
min_connections int

Minimum number of connections in pool

2
max_connections int

Maximum number of connections in pool

10
application_name str

Application name for connection

'manticore-db'
Source code in manticore_cockroachdb/database.py
def __init__(
    self,
    database: str = "defaultdb",
    host: str = "localhost",
    port: int = 26257,
    user: str = "root",
    password: Optional[str] = None,
    sslmode: str = "disable",
    min_connections: int = 2,
    max_connections: int = 10,
    application_name: str = "manticore-db"
):
    """Store the connection settings and open the connection pool.

    The constructor connects eagerly: after the settings are recorded the
    DSN is built with ``_build_dsn()`` and ``connect()`` is called.

    Args:
        database: Database name
        host: Database host
        port: Database port
        user: Database user
        password: Database password
        sslmode: SSL mode (disable, verify-ca, verify-full)
        min_connections: Minimum number of connections in pool
        max_connections: Maximum number of connections in pool
        application_name: Application name for connection
    """
    # No pool exists until connect() succeeds.
    self._pool = None

    # Where to connect
    self.host = host
    self.port = port
    self.database = database
    self.sslmode = sslmode

    # Who connects
    self.user = user
    self.password = password
    self.application_name = application_name

    # Pool sizing
    self.min_connections = min_connections
    self.max_connections = max_connections

    # The DSN is derived from the attributes above, so build it last.
    self.dsn = self._build_dsn()
    self.connect()
_build_dsn()

Build database connection string.

Returns:

Type Description
str

Database connection string

Source code in manticore_cockroachdb/database.py
def _build_dsn(self) -> str:
    """Build database connection string.

    User, password and database name are percent-encoded so reserved URI
    characters (``@``, ``/``, ``:``, ``?``, ``#`` ...) cannot change how the
    connection string is split. ``%`` is left untouched so values that are
    already percent-encoded pass through unchanged.

    Returns:
        Database connection string
    """
    from urllib.parse import quote

    def encode(value) -> str:
        return quote(str(value), safe="%")

    credentials = encode(self.user)

    # Add password if provided
    if self.password:
        credentials += f":{encode(self.password)}"

    return (
        f"postgresql://{credentials}"
        f"@{self.host}:{self.port}/{encode(self.database)}"
        f"?sslmode={self.sslmode}"
    )
connect()

Connect to database and initialize connection pool.

Source code in manticore_cockroachdb/database.py
def connect(self) -> None:
    """Open the connection pool described by this instance's settings.

    On success the pool is stored on ``self._pool``. Any failure is logged
    and re-raised unchanged.
    """

    def use_dict_rows(conn):
        # Every pooled connection hands rows back as dictionaries.
        conn.row_factory = dict_row

    try:
        new_pool = ConnectionPool(
            self.dsn,
            min_size=self.min_connections,
            max_size=self.max_connections,
            configure=use_dict_rows,
            open=True,
            kwargs={"application_name": self.application_name},
        )
        # Entered by hand here; close() performs the matching __exit__.
        new_pool.__enter__()
        self._pool = new_pool
        logger.info(
            f"Connected to database {self.database} "
            f"with pool size {self.min_connections}-{self.max_connections}"
        )
    except Exception as e:
        logger.error(f"Failed to connect to database: {e}")
        raise
close()

Close database connection pool.

Source code in manticore_cockroachdb/database.py
def close(self) -> None:
    """Shut the connection pool down, if one is open.

    The pool reference is cleared and the closure is logged even when
    shutting the pool down raises.
    """
    pool = self._pool
    if not pool:
        # Nothing to release.
        return

    try:
        # Counterpart of the manual __enter__ performed by connect().
        pool.__exit__(None, None, None)
    finally:
        self._pool = None
        logger.info("Closed database connection pool")
execute(query, params=None, fetch=True)

Execute a query and optionally return results.

Parameters:

Name Type Description Default
query str

SQL query to execute

required
params Optional[tuple]

Query parameters

None
fetch bool

Whether to return results

True

Returns:

Type Description
Optional[List[Dict[str, Any]]]

Query results if fetch=True, None otherwise

Source code in manticore_cockroachdb/database.py
def execute(
    self,
    query: str,
    params: Optional[tuple] = None,
    fetch: bool = True
) -> Optional[List[Dict[str, Any]]]:
    """Execute a query and optionally return results.

    The statement runs on a pooled connection and is committed on success.
    On failure the transaction is rolled back before the connection goes
    back to the pool.

    Args:
        query: SQL query to execute
        params: Query parameters
        fetch: Whether to return results

    Returns:
        Query results if fetch=True and the statement produced a result
        set, None otherwise

    Raises:
        RuntimeError: If the pool is not open
    """
    if not self._pool:
        raise RuntimeError("Database not connected")

    conn = self._pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = None
            # A cursor without a description has no result set (DDL, plain
            # DML); ask instead of catching an exception.
            if fetch and cur.description is not None:
                rows = cur.fetchall()
        conn.commit()
        return rows
    except Exception:
        try:
            conn.rollback()
        except Exception:
            # Best effort: the connection may already be unusable and the
            # original error is the one worth reporting.
            pass
        raise
    finally:
        self._pool.putconn(conn)
create_database(name)

Create a new database.

Parameters:

Name Type Description Default
name str

Database name

required
Source code in manticore_cockroachdb/database.py
def create_database(self, name: str) -> None:
    """Create a new database if it does not exist yet.

    Uses a short-lived ``Database`` connected to ``defaultdb`` with this
    instance's host, port, user, password and sslmode.

    Args:
        name: Database name
    """
    # Connect to default database
    default_db = Database(
        database="defaultdb",
        host=self.host,
        port=self.port,
        user=self.user,
        password=self.password,
        sslmode=self.sslmode
    )

    try:
        # Check if database exists
        exists = default_db.execute(
            "SELECT 1 FROM pg_database WHERE datname = %s",
            (name,)
        )

        if not exists:
            # Identifiers cannot be bound as parameters; double any
            # embedded quote so the name cannot end the identifier early.
            quoted = '"' + name.replace('"', '""') + '"'
            default_db.execute(f"CREATE DATABASE {quoted}")
            logger.info(f"Created database {name}")
        else:
            logger.info(f"Database {name} already exists")
    finally:
        default_db.close()
create_table(name, columns, if_not_exists=True)

Create a new table.

Parameters:

Name Type Description Default
name str

Table name

required
columns Dict[str, str]

Column definitions {name: type}

required
if_not_exists bool

Whether to create table only if it doesn't exist

True
Source code in manticore_cockroachdb/database.py
def create_table(
    self,
    name: str,
    columns: Dict[str, str],
    if_not_exists: bool = True
) -> None:
    """Create a new table.

    Args:
        name: Table name; emitted as a double-quoted identifier
        columns: Column definitions {name: type}; both parts are placed in
            the statement verbatim
        if_not_exists: Whether to create table only if it doesn't exist
    """
    col_defs = ", ".join(f"{col} {type_}" for col, type_ in columns.items())
    prefix = "CREATE TABLE IF NOT EXISTS" if if_not_exists else "CREATE TABLE"

    # Double any embedded quote so the name cannot end the identifier early.
    quoted = '"' + name.replace('"', '""') + '"'

    self.execute(f"{prefix} {quoted} ({col_defs})")
    logger.info(f"Created table {name}")
drop_table(name, if_exists=True)

Drop a table.

Parameters:

Name Type Description Default
name str

Table name

required
if_exists bool

Whether to drop only if table exists

True
Source code in manticore_cockroachdb/database.py
def drop_table(self, name: str, if_exists: bool = True) -> None:
    """Drop a table.

    Args:
        name: Table name; emitted as a double-quoted identifier
        if_exists: Whether to drop only if table exists
    """
    prefix = "DROP TABLE IF EXISTS" if if_exists else "DROP TABLE"

    # Double any embedded quote so the name cannot end the identifier early.
    quoted = '"' + name.replace('"', '""') + '"'

    self.execute(f"{prefix} {quoted}")
    logger.info(f"Dropped table {name}")
insert(table, data)

Insert a record into a table.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Record data

required

Returns:

Type Description
Dict[str, Any]

Inserted record

Source code in manticore_cockroachdb/database.py
def insert(self, table: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Insert a record into a table.

    Args:
        table: Table name; emitted as a double-quoted identifier
        data: Record data {column: value}; column names are placed in the
            statement verbatim. An empty dict inserts a row made of column
            defaults.

    Returns:
        Inserted record, or None if the statement returned no row
    """
    target = '"' + table.replace('"', '""') + '"'
    columns = list(data)

    if columns:
        placeholders = ", ".join(["%s"] * len(columns))
        query = (
            f'INSERT INTO {target} ({", ".join(columns)}) '
            f'VALUES ({placeholders}) RETURNING *'
        )
        params = tuple(data[col] for col in columns)
    else:
        # "() VALUES ()" is not valid SQL; DEFAULT VALUES is.
        query = f"INSERT INTO {target} DEFAULT VALUES RETURNING *"
        params = None

    results = self.execute(query, params)
    return results[0] if results else None
select(table, columns=None, where=None, order_by=None, limit=None)

Select records from a table.

Parameters:

Name Type Description Default
table str

Table name

required
columns Optional[List[str]]

Columns to select

None
where Optional[Dict[str, Any]]

Where conditions {column: value}

None
order_by Optional[str]

Order by clause

None
limit Optional[int]

Result limit

None

Returns:

Type Description
List[Dict[str, Any]]

Selected records

Source code in manticore_cockroachdb/database.py
def select(
    self,
    table: str,
    columns: Optional[List[str]] = None,
    where: Optional[Dict[str, Any]] = None,
    order_by: Optional[str] = None,
    limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Select records from a table.

    Args:
        table: Table name; emitted as a double-quoted identifier
        columns: Columns to select (all columns when omitted); names are
            placed in the statement verbatim
        where: Where conditions {column: value}, combined with AND
        order_by: Order by clause, placed in the statement verbatim
        limit: Result limit; ``0`` is honoured and returns no rows

    Returns:
        Selected records
    """
    cols = ", ".join(columns) if columns else "*"
    target = '"' + table.replace('"', '""') + '"'
    query = f"SELECT {cols} FROM {target}"

    params: List[Any] = []
    if where:
        query += " WHERE " + " AND ".join(f"{col} = %s" for col in where)
        params.extend(where.values())

    if order_by:
        query += f" ORDER BY {order_by}"

    # Compare against None: a plain truth test would drop LIMIT 0 and
    # return every row instead of none.
    if limit is not None:
        query += f" LIMIT {int(limit)}"

    return self.execute(query, tuple(params) if params else None) or []
update(table, data, where)

Update records in a table.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Update data

required
where Dict[str, Any]

Where conditions

required

Returns:

Type Description
Optional[Dict[str, Any]]

Updated record

Source code in manticore_cockroachdb/database.py
def update(
    self,
    table: str,
    data: Dict[str, Any],
    where: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Update records in a table.

    Args:
        table: Table name; emitted as a double-quoted identifier
        data: Update data {column: new value}
        where: Where conditions {column: value}, combined with AND

    Returns:
        The first updated record, or None if no row matched

    Raises:
        ValueError: If ``data`` or ``where`` is empty (either would produce
            a malformed statement)
    """
    if not data:
        raise ValueError("update() requires at least one column to set")
    if not where:
        raise ValueError("update() requires at least one where condition")

    assignments = ", ".join(f"{col} = %s" for col in data)
    conditions = " AND ".join(f"{col} = %s" for col in where)
    target = '"' + table.replace('"', '""') + '"'
    query = f"UPDATE {target} SET {assignments} WHERE {conditions} RETURNING *"

    # SET values come first, matching placeholder order.
    params = tuple(data.values()) + tuple(where.values())

    results = self.execute(query, params)
    return results[0] if results else None
delete(table, where)

Delete records from a table.

Parameters:

Name Type Description Default
table str

Table name

required
where Dict[str, Any]

Where conditions

required

Returns:

Type Description
bool

True if records were deleted, False otherwise

Source code in manticore_cockroachdb/database.py
def delete(
    self,
    table: str,
    where: Dict[str, Any]
) -> bool:
    """Delete records from a table.

    Runs a single ``DELETE ... RETURNING`` statement, so the answer reflects
    exactly the rows this call removed. A separate COUNT followed by a
    DELETE can disagree when rows change in between.

    Args:
        table: Table name; emitted as a double-quoted identifier
        where: Where conditions {column: value}, combined with AND

    Returns:
        True if records were deleted, False otherwise

    Raises:
        ValueError: If ``where`` is empty
    """
    if not where:
        raise ValueError("delete() requires at least one where condition")

    conditions = " AND ".join(f"{col} = %s" for col in where)
    target = '"' + table.replace('"', '""') + '"'
    query = f"DELETE FROM {target} WHERE {conditions} RETURNING 1"

    return bool(self.execute(query, tuple(where.values())))
from_url(url) classmethod

Create database instance from URL.

Parameters:

Name Type Description Default
url str

Database URL

required

Returns:

Type Description
Database

Database instance

Source code in manticore_cockroachdb/database.py
@classmethod
def from_url(cls, url: str) -> 'Database':
    """Create database instance from URL.

    Missing parts fall back to the constructor defaults. Only the
    ``sslmode`` query parameter is read.

    Args:
        url: Database URL

    Returns:
        Database instance (already connected, because ``__init__``
        connects)
    """
    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    return cls(
        # A URL without a path would otherwise give an empty database name.
        database=parsed.path.lstrip('/') or "defaultdb",
        host=parsed.hostname or "localhost",
        port=parsed.port or 26257,
        user=parsed.username or "root",
        password=parsed.password,
        sslmode=params.get('sslmode', ['disable'])[0]
    )
__enter__()

Enter context manager.

Source code in manticore_cockroachdb/database.py
def __enter__(self) -> 'Database':
    """Return this instance as the target of a ``with`` statement.

    Nothing is opened here: the pool is already connected by ``__init__``.
    """
    database = self
    return database
__exit__(exc_type, exc_val, exc_tb)

Exit context manager.

Source code in manticore_cockroachdb/database.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Close the connection pool when the ``with`` block ends.

    The exception details are ignored and nothing is suppressed: the method
    returns None, so an in-flight exception keeps propagating.
    """
    self.close()
    return None
transaction()

Start a new transaction.

Returns:

Type Description
Transaction

Transaction context manager

Source code in manticore_cockroachdb/database.py
def transaction(self) -> Transaction:
    """Check a connection out of the pool and wrap it in a Transaction.

    NOTE(review): nothing in this method returns the connection to the
    pool afterwards; confirm that Transaction or the caller releases it.

    Returns:
        Transaction context manager

    Raises:
        RuntimeError: If the connection pool has not been set up
    """
    pool = self._pool
    if pool:
        return Transaction(pool.getconn())
    raise RuntimeError("Database not connected")
run_in_transaction(operation, max_retries=3)

Run an operation in a transaction with retry logic.

Parameters:

Name Type Description Default
operation Callable[[Connection], T]

Operation to run

required
max_retries int

Maximum number of retry attempts

3

Returns:

Type Description
T

Operation result

Raises:

Type Description
ValueError

If transaction fails after max retries

Source code in manticore_cockroachdb/database.py
def run_in_transaction(
    self,
    operation: Callable[[psycopg.Connection], T],
    max_retries: int = 3
) -> T:
    """Run an operation in a transaction with retry logic.

    A connection is taken from the pool, the operation is executed inside
    a Transaction on that connection, and the connection is always handed
    back to the pool afterwards. If the attempt raises
    SerializationFailure it is retried after a randomized, exponentially
    growing delay. Any other exception propagates immediately.

    Args:
        operation: Operation to run; it receives the transaction's
            connection and its return value is returned to the caller
        max_retries: Maximum number of attempts (must be at least 1)

    Returns:
        Operation result

    Raises:
        RuntimeError: If the database is not connected
        ValueError: If max_retries is less than 1, or if the last attempt
            still fails with a serialization error
    """
    if not self._pool:
        raise RuntimeError("Database not connected")
    # Without this check the retry loop would not run at all and the
    # method would return None without ever calling the operation.
    if max_retries < 1:
        raise ValueError(
            f"max_retries must be at least 1, got {max_retries}"
        )

    conn = self._pool.getconn()
    try:
        for retry in range(max_retries):
            try:
                with Transaction(conn) as tx_conn:
                    return operation(tx_conn)
            except SerializationFailure as e:
                if retry == max_retries - 1:
                    # Out of attempts: surface the failure, keeping the
                    # original error as the explicit cause.
                    raise ValueError(
                        f"Transaction failed after {max_retries} retries: {e}"
                    ) from e
                # Exponential backoff: 100 ms doubled per attempt, scaled
                # by a random factor in [0.5, 1.5) to spread out retries.
                sleep_ms = (2 ** retry) * 100 * (random.random() + 0.5)
                time.sleep(sleep_ms / 1000)
    finally:
        # Hand the connection back whether the operation succeeded or not.
        self._pool.putconn(conn)
batch_insert(table, records)

Insert multiple records in a single transaction.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to insert

required

Returns:

Type Description
List[Dict[str, Any]]

List of inserted records

Source code in manticore_cockroachdb/database.py
def batch_insert(
    self,
    table: str,
    records: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Insert several records with one multi-row INSERT statement.

    The column list is taken from the first record; every record is read
    using those same columns. The statement is executed through
    run_in_transaction and ends with ``RETURNING *``.

    Args:
        table: Table name
        records: Records to insert

    Returns:
        The rows fetched from the statement, or an empty list when no
        records were given
    """
    if not records:
        return []

    # The first record decides which columns are written.
    columns = list(records[0])

    # One "(%s, ..., %s)" group per record, all of identical shape.
    row_group = "(" + ", ".join(["%s"] * len(columns)) + ")"
    all_groups = ", ".join([row_group] * len(records))
    column_list = ", ".join(columns)
    query = (
        f'INSERT INTO "{table}" ({column_list}) '
        f'VALUES {all_groups} '
        'RETURNING *'
    )

    # Parameters in row-major order, matching the placeholder groups.
    values = [record[col] for record in records for col in columns]

    def insert_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
        with conn.cursor() as cur:
            cur.execute(query, values)
            return cur.fetchall()

    return self.run_in_transaction(insert_batch)
batch_update(table, records, key_column='id')

Update multiple records in a single transaction.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to update

required
key_column str

Column to use as key

'id'

Returns:

Type Description
List[Dict[str, Any]]

List of updated records

Source code in manticore_cockroachdb/database.py
def batch_update(
    self,
    table: str,
    records: List[Dict[str, Any]],
    key_column: str = "id"
) -> List[Dict[str, Any]]:
    """Update multiple records in a single transaction.

    One UPDATE statement is built in which every non-key column is set
    through a ``CASE <key> WHEN ... THEN ...`` expression, restricted to
    the given keys with ``WHERE <key> IN (...)``. The columns to update
    are taken from the first record; every record must provide the key
    column and those same columns.

    Args:
        table: Table name
        records: Records to update
        key_column: Column to use as key

    Returns:
        List of updated records, in the order of ``records``. Records
        whose key matched no row are omitted.

    Raises:
        ValueError: If the first record has no column besides the key
    """
    if not records:
        return []

    # Get column names excluding key
    columns = [col for col in records[0].keys() if col != key_column]
    if not columns:
        raise ValueError("No columns to update")

    keys = [record[key_column] for record in records]

    # Every column uses the same number of WHEN branches, one per record.
    when_clauses = " ".join(["WHEN %s THEN %s"] * len(records))

    # Build SET clause; parameters follow the placeholder order
    # (key, value) per record, column by column.
    updates = []
    values = []
    for col in columns:
        for key, record in zip(keys, records):
            values.extend((key, record[col]))
        updates.append(
            f"{col} = (CASE {key_column} {when_clauses} ELSE {col} END)"
        )

    # Build query
    in_placeholders = ", ".join(["%s"] * len(records))
    query = (
        f'UPDATE "{table}" SET {", ".join(updates)} '
        f'WHERE {key_column} IN ({in_placeholders}) '
        'RETURNING *'
    )

    # Add values for IN clause
    values.extend(keys)

    # Execute in transaction
    def update_batch(conn: psycopg.Connection) -> List[Dict[str, Any]]:
        with conn.cursor() as cur:
            cur.execute(query, values)
            results = cur.fetchall()

        # Lookup by key. The string-keyed map is a fallback because the
        # driver may return the key as a different type than the caller
        # passed in (e.g. a UUID object for a string key).
        by_key = {row[key_column]: row for row in results}
        by_text = {str(row[key_column]): row for row in results}

        # Return results in original order. Keys that matched no row are
        # skipped instead of raising KeyError, which would otherwise
        # abort the whole batch after the statement already ran.
        ordered = []
        for key in keys:
            row = by_key.get(key)
            if row is None:
                row = by_text.get(str(key))
            if row is not None:
                ordered.append(row)
        return ordered

    return self.run_in_transaction(update_batch)

Usage Examples

from manticore_cockroachdb import Database

# Connect to database
db = Database(
    host="localhost",
    port=26257,
    database="example_db",
    user="root",
    password="",
    # Keyword spelled "sslmode", matching what Database.from_url() passes
    # to the constructor.
    sslmode="disable"
)

# Or connect using a URL
# db = Database.from_url("postgresql://root@localhost:26257/example_db?sslmode=disable")

# Create a table
db.create_table(
    "users",
    {
        "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
        "name": "TEXT NOT NULL",
        "email": "TEXT UNIQUE NOT NULL",
        "age": "INTEGER",
        "active": "BOOLEAN DEFAULT TRUE"
    },
    if_not_exists=True
)

# Insert data
# NOTE(review): this assumes insert() returns the new row's id; if it
# returns the inserted record instead, use its "id" field - confirm.
user_id = db.insert("users", {
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
})

# Select data
user = db.select_one("users", where={"id": user_id})
print(f"User: {user['name']}, Email: {user['email']}")

# Update data
db.update("users", {"age": 31}, where={"id": user_id})

# Delete data
db.delete("users", where={"id": user_id})

# Execute raw SQL
db.execute("SELECT * FROM users WHERE age > %s", [25])

# Use transactions
# NOTE(review): transaction() wraps its own pooled connection; confirm that
# db.insert() runs on that connection, otherwise these inserts are not part
# of the transaction.
with db.transaction():
    db.insert("users", {"name": "Alice", "email": "alice@example.com", "age": 25})
    db.insert("users", {"name": "Bob", "email": "bob@example.com", "age": 28})

# Close the connection
db.close()