Skip to content

AsyncDatabase API

The AsyncDatabase class provides an asynchronous interface for interacting with CockroachDB. It handles connection management, SQL execution, and transaction support using Python's async/await syntax.

Class Documentation

AsyncDatabase

Main interface for asynchronous CockroachDB operations.

Source code in manticore_cockroachdb/async_database.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
class AsyncDatabase:
    """Main interface for asynchronous CockroachDB operations."""

    def __init__(
        self,
        database: str = "defaultdb",
        host: str = "localhost",
        port: int = 26257,
        user: str = "root",
        password: Optional[str] = None,
        sslmode: str = "disable",
        min_connections: int = 2,
        max_connections: int = 10,
        application_name: str = "manticore-async-db"
    ):
        """Initialize database connection.

        Args:
            database: Database name
            host: Database host
            port: Database port
            user: Database user
            password: Database password
            sslmode: SSL mode (disable, verify-ca, verify-full)
            min_connections: Minimum number of connections in pool
            max_connections: Maximum number of connections in pool
            application_name: Application name for connection
        """
        self.database = database
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.sslmode = sslmode
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.application_name = application_name

        # Build connection string
        self.dsn = self._build_dsn()

        # Initialize connection pool
        self._pool = None
        self._pool_manager = None

    def _build_dsn(self) -> str:
        """Build database connection string.

        Returns:
            Database connection string
        """
        # Start with required components
        dsn = f"postgresql://{self.user}"

        # Add password if provided
        if self.password:
            dsn += f":{self.password}"

        # Add host and database
        dsn += f"@{self.host}:{self.port}/{self.database}"

        # Add SSL mode
        dsn += f"?sslmode={self.sslmode}"

        return dsn

    async def connect(self):
        """Connect to database."""
        if self._pool:
            return  # Already connected

        logger.info(
            f"Connecting to database {self.database} with pool size {self.min_connections}-{self.max_connections}"
        )

        try:
            # Create connection string
            conn_str = (
                f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/"
                f"{self.database}?sslmode={self.sslmode}"
            )

            # Create connection pool
            self._pool = AsyncConnectionPool(
                conn_str,
                min_size=self.min_connections,
                max_size=self.max_connections,
                kwargs={"row_factory": dict_row}
            )

            # Test connection
            async with self._pool.connection() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("SELECT 1")

            logger.info(f"Connected to database {self.database} with pool size {self.min_connections}-{self.max_connections}")
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            if self._pool:
                await self._pool.close()
                self._pool = None
            raise

    async def close(self) -> None:
        """Close database connection pool."""
        if self._pool:
            try:
                await self._pool.__aexit__(None, None, None)
            finally:
                self._pool = None
                self._pool_manager = None
                logger.info("Closed database connection pool")

    async def execute(
        self,
        query: str,
        params: Optional[tuple] = None,
        fetch: bool = True
    ) -> Optional[List[Dict[str, Any]]]:
        """Execute a query and optionally return results.

        Args:
            query: SQL query to execute
            params: Query parameters
            fetch: Whether to return results

        Returns:
            Query results if fetch=True, None otherwise
        """
        if not self._pool:
            await self.connect()

        async with self._pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                if fetch:
                    try:
                        results = await cur.fetchall()
                        await conn.commit()
                        return results
                    except psycopg.ProgrammingError:
                        # No results to fetch (e.g. for DDL statements)
                        await conn.commit()
                        return None
                else:
                    await conn.commit()
                    return None

    async def create_database(self, name: str) -> None:
        """Create a new database.

        Args:
            name: Database name
        """
        # Connect to default database
        async with AsyncConnectionPool(
            f"postgresql://{self.user}"
            f"{':{}'.format(self.password) if self.password else ''}"
            f"@{self.host}:{self.port}/defaultdb"
            f"?sslmode={self.sslmode}",
            min_size=1,
            max_size=1
        ) as pool:
            async with pool.connection() as conn:
                # Check if database exists
                async with conn.cursor() as cur:
                    await cur.execute(
                        "SELECT datname FROM pg_database WHERE datname = %s",
                        (name,)
                    )
                    if await cur.fetchone():
                        logger.info(f"Database {name} already exists")
                        return

                    # Create database
                    # Use connection.execute instead of cursor to avoid transaction
                    await conn.execute(f'CREATE DATABASE "{name}"')
                    logger.info(f"Created database {name}")

    async def create_table(
        self,
        name: str,
        columns: Dict[str, str],
        if_not_exists: bool = True
    ) -> None:
        """Create a new table.

        Args:
            name: Table name
            columns: Column definitions {name: type}
            if_not_exists: Whether to create table only if it doesn't exist
        """
        # Build column definitions
        col_defs = ", ".join(f"{col} {type_}" for col, type_ in columns.items())

        # Build query
        query = f"CREATE TABLE {'IF NOT EXISTS' if if_not_exists else ''} {name} ({col_defs})"

        # Execute query
        await self.execute(query, fetch=False)
        logger.info(f"Created table {name}")

    async def drop_table(self, name: str, if_exists: bool = True) -> None:
        """Drop a table.

        Args:
            name: Table name
            if_exists: Whether to drop table only if it exists
        """
        # Build query
        query = f"DROP TABLE {'IF EXISTS' if if_exists else ''} {name}"

        # Execute query
        await self.execute(query, fetch=False)
        logger.info(f"Dropped table {name}")

    async def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Insert a record.

        Args:
            table: Table name
            data: Record data

        Returns:
            Inserted record
        """
        # Build column list
        columns = ", ".join(data.keys())

        # Build parameter placeholders
        placeholders = ", ".join(f"%s" for _ in data)

        # Build query
        query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) RETURNING *"

        # Execute query
        result = await self.execute(query, tuple(data.values()))
        return result[0]

    async def select(
        self,
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Select records from a table.

        Args:
            table: Table name
            columns: Columns to select
            where: Where conditions {column: value}
            order_by: Order by clause
            limit: Result limit

        Returns:
            Selected records
        """
        # Build query
        cols = "*" if not columns else ", ".join(columns)
        query = f'SELECT {cols} FROM "{table}"'

        # Add where clause
        params = []
        if where:
            conditions = []
            for col, value in where.items():
                conditions.append(f"{col} = %s")
                params.append(value)
            query += f" WHERE {' AND '.join(conditions)}"

        # Add order by
        if order_by:
            query += f" ORDER BY {order_by}"

        # Add limit
        if limit:
            query += f" LIMIT {limit}"

        # Execute query
        return await self.execute(query, tuple(params) if params else None)

    async def update(
        self,
        table: str,
        data: Dict[str, Any],
        where: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Update records.

        Args:
            table: Table name
            data: Update data
            where: Filter conditions

        Returns:
            Updated record
        """
        # Build update expressions
        updates = []
        update_params = []
        for key, value in data.items():
            updates.append(f"{key} = %s")
            update_params.append(value)

        # Build where conditions
        conditions = []
        where_params = []
        for key, value in where.items():
            conditions.append(f"{key} = %s")
            where_params.append(value)

        # Build query
        query = f"UPDATE {table} SET {', '.join(updates)} WHERE {' AND '.join(conditions)} RETURNING *"

        # Execute query
        result = await self.execute(query, tuple(update_params + where_params))
        return result[0] if result else None

    async def delete(
        self,
        table: str,
        where: Dict[str, Any]
    ) -> bool:
        """Delete records.

        Args:
            table: Table name
            where: Filter conditions

        Returns:
            Whether any records were deleted
        """
        # Build where conditions
        conditions = []
        params = []
        for key, value in where.items():
            conditions.append(f"{key} = %s")
            params.append(value)

        # Build query
        query = f"DELETE FROM {table} WHERE {' AND '.join(conditions)}"

        # Execute query
        before = await self.execute(f"SELECT COUNT(*) as count FROM {table} WHERE {' AND '.join(conditions)}", tuple(params))
        await self.execute(query, tuple(params), fetch=False)

        # Return whether any records were deleted
        return before[0]["count"] > 0 if before else False

    @classmethod
    def from_url(cls, url: str) -> 'AsyncDatabase':
        """Create database instance from URL.

        Args:
            url: Database URL

        Returns:
            Database instance
        """
        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        db = cls(
            database=parsed.path.lstrip('/'),
            host=parsed.hostname or "localhost",
            port=parsed.port or 26257,
            user=parsed.username or "root",
            password=parsed.password,
            sslmode=params.get('sslmode', ['disable'])[0]
        )

        # Don't connect here, as we'll connect explicitly after creation
        return db

    async def __aenter__(self) -> 'AsyncDatabase':
        """Enter context."""
        if not self._pool:
            await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context."""
        await self.close()

    async def transaction(self) -> AsyncTransaction:
        """Start a transaction.

        Returns:
            Transaction context manager
        """
        if not self._pool:
            await self.connect()

        conn = await self._pool.connection()
        return AsyncTransaction(conn)

    async def run_in_transaction(
        self,
        operation: Callable[[psycopg.AsyncConnection], T],
        max_retries: int = 3
    ) -> T:
        """Run an operation in a transaction with retry logic.

        Args:
            operation: Operation to run
            max_retries: Maximum number of retry attempts

        Returns:
            Operation result

        Raises:
            ValueError: If transaction fails after max retries
        """
        if not self._pool:
            raise RuntimeError("Database not connected")

        async with self._pool.connection() as conn:
            for retry in range(max_retries):
                try:
                    async with AsyncTransaction(conn) as tx_conn:
                        result = await operation(tx_conn)
                        return result
                except SerializationFailure as e:
                    if retry == max_retries - 1:
                        raise ValueError(
                            f"Transaction failed after {max_retries} retries: {e}"
                        )
                    # Exponential backoff
                    sleep_ms = (2 ** retry) * 100 * (random.random() + 0.5)
                    await asyncio.sleep(sleep_ms / 1000)
                    continue

            # This should never be reached, but just in case
            raise ValueError(f"Transaction failed after {max_retries} retries")

    async def batch_insert(
        self, 
        table: str, 
        records: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Insert multiple records into a table.

        Args:
            table: Table name
            records: Records to insert

        Returns:
            Inserted records
        """
        if not records:
            return []

        # Ensure all records have the same keys
        keys = set(records[0].keys())
        for record in records:
            if set(record.keys()) != keys:
                raise ValueError("All records must have the same keys")

        # Build query
        columns = list(keys)
        placeholders = ", ".join([
            f"({', '.join(['%s'] * len(columns))})" 
            for _ in range(len(records))
        ])
        query = f"""
            INSERT INTO "{table}" ({", ".join(columns)})
            VALUES {placeholders}
            RETURNING *
        """

        # Flatten values
        values = []
        for record in records:
            for col in columns:
                values.append(record[col])

        # Execute query
        results = await self.execute(query, tuple(values))
        return results

    async def batch_update(
        self, 
        table: str, 
        records: List[Dict[str, Any]], 
        key_column: str = "id"
    ) -> List[Dict[str, Any]]:
        """Update multiple records in a table.

        Args:
            table: Table name
            records: Records to update
            key_column: Primary key column

        Returns:
            Updated records
        """
        if not records:
            return []

        # Ensure all records have the key column and at least one more column
        for record in records:
            if key_column not in record:
                raise ValueError(f"All records must have the '{key_column}' column")
            if len(record) < 2:
                raise ValueError("Records must have at least one column to update")

        # Process records one by one for now
        # In the future, this could be optimized with bulk operations
        results = []
        for record in records:
            update_data = {k: v for k, v in record.items() if k != key_column}
            key_value = record[key_column]
            updated = await self.update(table, update_data, {key_column: key_value})
            if updated:
                results.append(updated)

        return results 

Functions

__init__(database='defaultdb', host='localhost', port=26257, user='root', password=None, sslmode='disable', min_connections=2, max_connections=10, application_name='manticore-async-db')

Initialize database connection.

Parameters:

Name Type Description Default
database str

Database name

'defaultdb'
host str

Database host

'localhost'
port int

Database port

26257
user str

Database user

'root'
password Optional[str]

Database password

None
sslmode str

SSL mode (disable, verify-ca, verify-full)

'disable'
min_connections int

Minimum number of connections in pool

2
max_connections int

Maximum number of connections in pool

10
application_name str

Application name for connection

'manticore-async-db'
Source code in manticore_cockroachdb/async_database.py
def __init__(
    self,
    database: str = "defaultdb",
    host: str = "localhost",
    port: int = 26257,
    user: str = "root",
    password: Optional[str] = None,
    sslmode: str = "disable",
    min_connections: int = 2,
    max_connections: int = 10,
    application_name: str = "manticore-async-db"
):
    """Initialize database connection.

    Args:
        database: Database name
        host: Database host
        port: Database port
        user: Database user
        password: Database password
        sslmode: SSL mode (disable, verify-ca, verify-full)
        min_connections: Minimum number of connections in pool
        max_connections: Maximum number of connections in pool
        application_name: Application name for connection
    """
    self.database = database
    self.host = host
    self.port = port
    self.user = user
    self.password = password
    self.sslmode = sslmode
    self.min_connections = min_connections
    self.max_connections = max_connections
    self.application_name = application_name

    # Build connection string
    self.dsn = self._build_dsn()

    # Initialize connection pool
    self._pool = None
    self._pool_manager = None
_build_dsn()

Build database connection string.

Returns:

Type Description
str

Database connection string

Source code in manticore_cockroachdb/async_database.py
def _build_dsn(self) -> str:
    """Build database connection string.

    Returns:
        Database connection string
    """
    # Start with required components
    dsn = f"postgresql://{self.user}"

    # Add password if provided
    if self.password:
        dsn += f":{self.password}"

    # Add host and database
    dsn += f"@{self.host}:{self.port}/{self.database}"

    # Add SSL mode
    dsn += f"?sslmode={self.sslmode}"

    return dsn
connect() async

Connect to database.

Source code in manticore_cockroachdb/async_database.py
async def connect(self):
    """Connect to database."""
    if self._pool:
        return  # Already connected

    logger.info(
        f"Connecting to database {self.database} with pool size {self.min_connections}-{self.max_connections}"
    )

    try:
        # Create connection string
        conn_str = (
            f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/"
            f"{self.database}?sslmode={self.sslmode}"
        )

        # Create connection pool
        self._pool = AsyncConnectionPool(
            conn_str,
            min_size=self.min_connections,
            max_size=self.max_connections,
            kwargs={"row_factory": dict_row}
        )

        # Test connection
        async with self._pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")

        logger.info(f"Connected to database {self.database} with pool size {self.min_connections}-{self.max_connections}")
    except Exception as e:
        logger.error(f"Failed to connect to database: {e}")
        if self._pool:
            await self._pool.close()
            self._pool = None
        raise
close() async

Close database connection pool.

Source code in manticore_cockroachdb/async_database.py
async def close(self) -> None:
    """Close database connection pool."""
    if self._pool:
        try:
            await self._pool.__aexit__(None, None, None)
        finally:
            self._pool = None
            self._pool_manager = None
            logger.info("Closed database connection pool")
execute(query, params=None, fetch=True) async

Execute a query and optionally return results.

Parameters:

Name Type Description Default
query str

SQL query to execute

required
params Optional[tuple]

Query parameters

None
fetch bool

Whether to return results

True

Returns:

Type Description
Optional[List[Dict[str, Any]]]

Query results if fetch=True, None otherwise

Source code in manticore_cockroachdb/async_database.py
async def execute(
    self,
    query: str,
    params: Optional[tuple] = None,
    fetch: bool = True
) -> Optional[List[Dict[str, Any]]]:
    """Execute a query and optionally return results.

    Args:
        query: SQL query to execute
        params: Query parameters
        fetch: Whether to return results

    Returns:
        Query results if fetch=True, None otherwise
    """
    if not self._pool:
        await self.connect()

    async with self._pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            if fetch:
                try:
                    results = await cur.fetchall()
                    await conn.commit()
                    return results
                except psycopg.ProgrammingError:
                    # No results to fetch (e.g. for DDL statements)
                    await conn.commit()
                    return None
            else:
                await conn.commit()
                return None
create_database(name) async

Create a new database.

Parameters:

Name Type Description Default
name str

Database name

required
Source code in manticore_cockroachdb/async_database.py
async def create_database(self, name: str) -> None:
    """Create a new database.

    Args:
        name: Database name
    """
    # Connect to default database
    async with AsyncConnectionPool(
        f"postgresql://{self.user}"
        f"{':{}'.format(self.password) if self.password else ''}"
        f"@{self.host}:{self.port}/defaultdb"
        f"?sslmode={self.sslmode}",
        min_size=1,
        max_size=1
    ) as pool:
        async with pool.connection() as conn:
            # Check if database exists
            async with conn.cursor() as cur:
                await cur.execute(
                    "SELECT datname FROM pg_database WHERE datname = %s",
                    (name,)
                )
                if await cur.fetchone():
                    logger.info(f"Database {name} already exists")
                    return

                # Create database
                # Use connection.execute instead of cursor to avoid transaction
                await conn.execute(f'CREATE DATABASE "{name}"')
                logger.info(f"Created database {name}")
create_table(name, columns, if_not_exists=True) async

Create a new table.

Parameters:

Name Type Description Default
name str

Table name

required
columns Dict[str, str]

Column definitions {name: type}

required
if_not_exists bool

Whether to create table only if it doesn't exist

True
Source code in manticore_cockroachdb/async_database.py
async def create_table(
    self,
    name: str,
    columns: Dict[str, str],
    if_not_exists: bool = True
) -> None:
    """Create a new table.

    Args:
        name: Table name
        columns: Column definitions {name: type}
        if_not_exists: Whether to create table only if it doesn't exist
    """
    # Build column definitions
    col_defs = ", ".join(f"{col} {type_}" for col, type_ in columns.items())

    # Build query
    query = f"CREATE TABLE {'IF NOT EXISTS' if if_not_exists else ''} {name} ({col_defs})"

    # Execute query
    await self.execute(query, fetch=False)
    logger.info(f"Created table {name}")
drop_table(name, if_exists=True) async

Drop a table.

Parameters:

Name Type Description Default
name str

Table name

required
if_exists bool

Whether to drop table only if it exists

True
Source code in manticore_cockroachdb/async_database.py
async def drop_table(self, name: str, if_exists: bool = True) -> None:
    """Drop a table.

    Args:
        name: Table name
        if_exists: Whether to drop table only if it exists
    """
    # Build query
    query = f"DROP TABLE {'IF EXISTS' if if_exists else ''} {name}"

    # Execute query
    await self.execute(query, fetch=False)
    logger.info(f"Dropped table {name}")
insert(table, data) async

Insert a record.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Record data

required

Returns:

Type Description
Dict[str, Any]

Inserted record

Source code in manticore_cockroachdb/async_database.py
async def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a record.

    Args:
        table: Table name
        data: Record data

    Returns:
        Inserted record
    """
    # Build column list
    columns = ", ".join(data.keys())

    # Build parameter placeholders
    placeholders = ", ".join(f"%s" for _ in data)

    # Build query
    query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) RETURNING *"

    # Execute query
    result = await self.execute(query, tuple(data.values()))
    return result[0]
select(table, columns=None, where=None, order_by=None, limit=None) async

Select records from a table.

Parameters:

Name Type Description Default
table str

Table name

required
columns Optional[List[str]]

Columns to select

None
where Optional[Dict[str, Any]]

Where conditions {column: value}

None
order_by Optional[str]

Order by clause

None
limit Optional[int]

Result limit

None

Returns:

Type Description
List[Dict[str, Any]]

Selected records

Source code in manticore_cockroachdb/async_database.py
async def select(
    self,
    table: str,
    columns: Optional[List[str]] = None,
    where: Optional[Dict[str, Any]] = None,
    order_by: Optional[str] = None,
    limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Select records from a table.

    Args:
        table: Table name
        columns: Columns to select
        where: Where conditions {column: value}
        order_by: Order by clause
        limit: Result limit

    Returns:
        Selected records
    """
    # Build query
    cols = "*" if not columns else ", ".join(columns)
    query = f'SELECT {cols} FROM "{table}"'

    # Add where clause
    params = []
    if where:
        conditions = []
        for col, value in where.items():
            conditions.append(f"{col} = %s")
            params.append(value)
        query += f" WHERE {' AND '.join(conditions)}"

    # Add order by
    if order_by:
        query += f" ORDER BY {order_by}"

    # Add limit
    if limit:
        query += f" LIMIT {limit}"

    # Execute query
    return await self.execute(query, tuple(params) if params else None)
update(table, data, where) async

Update records.

Parameters:

Name Type Description Default
table str

Table name

required
data Dict[str, Any]

Update data

required
where Dict[str, Any]

Filter conditions

required

Returns:

Type Description
Optional[Dict[str, Any]]

Updated record

Source code in manticore_cockroachdb/async_database.py
async def update(
    self,
    table: str,
    data: Dict[str, Any],
    where: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Update records.

    Args:
        table: Table name
        data: Update data
        where: Filter conditions

    Returns:
        Updated record
    """
    # Build update expressions
    updates = []
    update_params = []
    for key, value in data.items():
        updates.append(f"{key} = %s")
        update_params.append(value)

    # Build where conditions
    conditions = []
    where_params = []
    for key, value in where.items():
        conditions.append(f"{key} = %s")
        where_params.append(value)

    # Build query
    query = f"UPDATE {table} SET {', '.join(updates)} WHERE {' AND '.join(conditions)} RETURNING *"

    # Execute query
    result = await self.execute(query, tuple(update_params + where_params))
    return result[0] if result else None
delete(table, where) async

Delete records.

Parameters:

Name Type Description Default
table str

Table name

required
where Dict[str, Any]

Filter conditions

required

Returns:

Type Description
bool

Whether any records were deleted

Source code in manticore_cockroachdb/async_database.py
async def delete(
    self,
    table: str,
    where: Dict[str, Any]
) -> bool:
    """Delete records.

    Args:
        table: Table name
        where: Filter conditions

    Returns:
        Whether any records were deleted
    """
    # Build where conditions
    conditions = []
    params = []
    for key, value in where.items():
        conditions.append(f"{key} = %s")
        params.append(value)

    # Build query
    query = f"DELETE FROM {table} WHERE {' AND '.join(conditions)}"

    # Execute query
    before = await self.execute(f"SELECT COUNT(*) as count FROM {table} WHERE {' AND '.join(conditions)}", tuple(params))
    await self.execute(query, tuple(params), fetch=False)

    # Return whether any records were deleted
    return before[0]["count"] > 0 if before else False
from_url(url) classmethod

Create database instance from URL.

Parameters:

Name Type Description Default
url str

Database URL

required

Returns:

Type Description
AsyncDatabase

Database instance

Source code in manticore_cockroachdb/async_database.py
@classmethod
def from_url(cls, url: str) -> 'AsyncDatabase':
    """Create database instance from URL.

    Args:
        url: Database URL

    Returns:
        Database instance
    """
    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    db = cls(
        database=parsed.path.lstrip('/'),
        host=parsed.hostname or "localhost",
        port=parsed.port or 26257,
        user=parsed.username or "root",
        password=parsed.password,
        sslmode=params.get('sslmode', ['disable'])[0]
    )

    # Don't connect here, as we'll connect explicitly after creation
    return db
__aenter__() async

Enter context.

Source code in manticore_cockroachdb/async_database.py
async def __aenter__(self) -> 'AsyncDatabase':
    """Enter context."""
    if not self._pool:
        await self.connect()
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Exit context.

Source code in manticore_cockroachdb/async_database.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit context."""
    await self.close()
transaction() async

Start a transaction.

Returns:

Type Description
AsyncTransaction

Transaction context manager

Source code in manticore_cockroachdb/async_database.py
async def transaction(self) -> AsyncTransaction:
    """Start a transaction.

    Returns:
        Transaction context manager
    """
    if not self._pool:
        await self.connect()

    conn = await self._pool.connection()
    return AsyncTransaction(conn)
run_in_transaction(operation, max_retries=3) async

Run an operation in a transaction with retry logic.

Parameters:

Name Type Description Default
operation Callable[[AsyncConnection], T]

Operation to run

required
max_retries int

Maximum number of retry attempts

3

Returns:

Type Description
T

Operation result

Raises:

Type Description
ValueError

If transaction fails after max retries

Source code in manticore_cockroachdb/async_database.py
async def run_in_transaction(
    self,
    operation: Callable[[psycopg.AsyncConnection], T],
    max_retries: int = 3
) -> T:
    """Run an operation in a transaction with retry logic.

    Args:
        operation: Operation to run
        max_retries: Maximum number of retry attempts

    Returns:
        Operation result

    Raises:
        ValueError: If transaction fails after max retries
    """
    if not self._pool:
        raise RuntimeError("Database not connected")

    async with self._pool.connection() as conn:
        for retry in range(max_retries):
            try:
                async with AsyncTransaction(conn) as tx_conn:
                    result = await operation(tx_conn)
                    return result
            except SerializationFailure as e:
                if retry == max_retries - 1:
                    raise ValueError(
                        f"Transaction failed after {max_retries} retries: {e}"
                    )
                # Exponential backoff
                sleep_ms = (2 ** retry) * 100 * (random.random() + 0.5)
                await asyncio.sleep(sleep_ms / 1000)
                continue

        # This should never be reached, but just in case
        raise ValueError(f"Transaction failed after {max_retries} retries")
batch_insert(table, records) async

Insert multiple records into a table.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to insert

required

Returns:

Type Description
List[Dict[str, Any]]

Inserted records

Source code in manticore_cockroachdb/async_database.py
async def batch_insert(
    self, 
    table: str, 
    records: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Insert multiple records into a table.

    Args:
        table: Table name
        records: Records to insert

    Returns:
        Inserted records
    """
    if not records:
        return []

    # Ensure all records have the same keys
    keys = set(records[0].keys())
    for record in records:
        if set(record.keys()) != keys:
            raise ValueError("All records must have the same keys")

    # Build query
    columns = list(keys)
    placeholders = ", ".join([
        f"({', '.join(['%s'] * len(columns))})" 
        for _ in range(len(records))
    ])
    query = f"""
        INSERT INTO "{table}" ({", ".join(columns)})
        VALUES {placeholders}
        RETURNING *
    """

    # Flatten values
    values = []
    for record in records:
        for col in columns:
            values.append(record[col])

    # Execute query
    results = await self.execute(query, tuple(values))
    return results
batch_update(table, records, key_column='id') async

Update multiple records in a table.

Parameters:

Name Type Description Default
table str

Table name

required
records List[Dict[str, Any]]

Records to update

required
key_column str

Primary key column

'id'

Returns:

Type Description
List[Dict[str, Any]]

Updated records

Source code in manticore_cockroachdb/async_database.py
async def batch_update(
    self, 
    table: str, 
    records: List[Dict[str, Any]], 
    key_column: str = "id"
) -> List[Dict[str, Any]]:
    """Update multiple records in a table.

    Args:
        table: Table name
        records: Records to update
        key_column: Primary key column

    Returns:
        Updated records
    """
    if not records:
        return []

    # Ensure all records have the key column and at least one more column
    for record in records:
        if key_column not in record:
            raise ValueError(f"All records must have the '{key_column}' column")
        if len(record) < 2:
            raise ValueError("Records must have at least one column to update")

    # Process records one by one for now
    # In the future, this could be optimized with bulk operations
    results = []
    for record in records:
        update_data = {k: v for k, v in record.items() if k != key_column}
        key_value = record[key_column]
        updated = await self.update(table, update_data, {key_column: key_value})
        if updated:
            results.append(updated)

    return results 

Usage Examples

import asyncio
from manticore_cockroachdb import AsyncDatabase

async def main():
    # Connect to database
    db = AsyncDatabase(
        host="localhost",
        port=26257,
        database="example_db",
        user="root",
        password="",
        ssl_mode="disable"
    )

    # Or connect using a URL
    # db = AsyncDatabase.from_url("postgresql://root@localhost:26257/example_db?sslmode=disable")

    # Connect to the database
    await db.connect()

    try:
        # Create a table
        await db.create_table(
            "async_users",
            {
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "name": "TEXT NOT NULL",
                "email": "TEXT UNIQUE NOT NULL",
                "age": "INTEGER",
                "active": "BOOLEAN DEFAULT TRUE"
            },
            if_not_exists=True
        )

        # Insert data
        user_id = await db.insert("async_users", {
            "name": "John Doe",
            "email": "john@example.com",
            "age": 30
        })

        # Select data
        user = await db.select_one("async_users", where={"id": user_id})
        print(f"User: {user['name']}, Email: {user['email']}")

        # Update data
        await db.update("async_users", {"age": 31}, where={"id": user_id})

        # Delete data
        await db.delete("async_users", where={"id": user_id})

        # Execute raw SQL
        results = await db.execute("SELECT * FROM async_users WHERE age > %s", [25])

        # Use transactions
        async with db.transaction():
            await db.insert("async_users", {"name": "Alice", "email": "alice@example.com", "age": 25})
            await db.insert("async_users", {"name": "Bob", "email": "bob@example.com", "age": 28})

    finally:
        # Close the connection
        await db.close()

# Run the async function
asyncio.run(main())