Skip to content

Async Examples

This page provides practical examples of using the asynchronous API of the Manticore CockroachDB client. These examples demonstrate common patterns and best practices for working with the library in async environments.

Basic CRUD Operations

This example shows how to perform basic CRUD operations using the asynchronous API:

import asyncio
import uuid
from manticore_cockroachdb import AsyncDatabase, AsyncTable

async def async_crud_example():
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()

    try:
        # Create a products table
        products = AsyncTable(
            "products",
            db=db,
            schema={
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "name": "TEXT NOT NULL",
                "price": "DECIMAL(10,2) NOT NULL",
                "stock": "INTEGER NOT NULL DEFAULT 0",
                "active": "BOOLEAN NOT NULL DEFAULT TRUE"
            }
        )

        # Initialize the table
        await products.initialize()

        # Create a product
        product = await products.create({
            "name": "Smartphone",
            "price": 799.99,
            "stock": 50
        })
        print(f"Created product: {product}")

        # Read the product
        retrieved = await products.read(product["id"])
        print(f"Retrieved product: {retrieved}")

        # Update the product
        updated = await products.update(product["id"], {
            "price": 749.99,
            "stock": 45
        })
        print(f"Updated product: {updated}")

        # List all products
        all_products = await products.list()
        print(f"All products: {all_products}")

        # Delete the product
        deleted = await products.delete(product["id"])
        print(f"Product deleted: {deleted}")

    finally:
        # Close the database connection
        await db.close()

asyncio.run(async_crud_example())

Working with Transactions

This example demonstrates how to use transactions with the asynchronous API:

import asyncio
from manticore_cockroachdb import AsyncDatabase, AsyncTable

async def transaction_example():
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()

    try:
        # Create tables
        accounts = AsyncTable(
            "accounts",
            db=db,
            schema={
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "owner": "TEXT NOT NULL",
                "balance": "DECIMAL(12,2) NOT NULL"
            }
        )
        await accounts.initialize()

        # Create initial accounts
        account1 = await accounts.create({
            "owner": "Alice",
            "balance": 1000.00
        })

        account2 = await accounts.create({
            "owner": "Bob",
            "balance": 500.00
        })

        # Perform a transfer using a transaction
        async def transfer_funds(conn):
            amount = 200.00

            # Deduct from account1
            async with conn.cursor() as cur:
                await cur.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s RETURNING *",
                    (amount, account1["id"])
                )
                sender = await cur.fetchone()

                # Add to account2
                await cur.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s RETURNING *",
                    (amount, account2["id"])
                )
                recipient = await cur.fetchone()

                return {
                    "sender": sender,
                    "recipient": recipient,
                    "amount": amount
                }

        # Run the transfer in a transaction
        result = await db.run_in_transaction(transfer_funds)
        print(f"Transfer completed: {result}")

        # Verify the new balances
        updated_account1 = await accounts.read(account1["id"])
        updated_account2 = await accounts.read(account2["id"])

        print(f"{updated_account1['owner']}'s balance: {updated_account1['balance']}")
        print(f"{updated_account2['owner']}'s balance: {updated_account2['balance']}")

    finally:
        # Clean up
        await db.drop_table("accounts")
        await db.close()

asyncio.run(transaction_example())

Batch Operations

This example shows how to perform batch operations efficiently:

import asyncio
import random
from manticore_cockroachdb import AsyncDatabase, AsyncTable

async def batch_operations_example():
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()

    try:
        # Create a sensors table
        sensors = AsyncTable(
            "sensors",
            db=db,
            schema={
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "location": "TEXT NOT NULL",
                "type": "TEXT NOT NULL",
                "reading": "FLOAT NOT NULL",
                "timestamp": "TIMESTAMP DEFAULT NOW()"
            }
        )
        await sensors.initialize()

        # Generate sensor data
        sensor_data = []
        locations = ["kitchen", "living_room", "bedroom", "bathroom", "garage"]
        types = ["temperature", "humidity", "pressure", "light"]

        for _ in range(20):
            sensor_data.append({
                "location": random.choice(locations),
                "type": random.choice(types),
                "reading": round(random.uniform(0, 100), 2)
            })

        # Insert all records in a batch
        inserted = await sensors.batch_create(sensor_data)
        print(f"Inserted {len(inserted)} sensor readings")

        # Query data - temperature sensors with high readings
        high_temps = await sensors.list(
            where={"type": "temperature"},
            order_by="reading DESC",
            limit=5
        )

        print("\nHighest temperature readings:")
        for reading in high_temps:
            print(f"Location: {reading['location']}, Reading: {reading['reading']}°C")

        # Update multiple records in batch
        # For example, adjust all humidity readings
        humidity_readings = await sensors.list(where={"type": "humidity"})
        for reading in humidity_readings:
            reading["reading"] = round(reading["reading"] * 0.95, 2)  # Adjust by 5%

        updated = await sensors.batch_update(humidity_readings)
        print(f"\nUpdated {len(updated)} humidity readings")

    finally:
        # Clean up
        await db.drop_table("sensors")
        await db.close()

asyncio.run(batch_operations_example())

Using Context Managers

This example demonstrates how to use async context managers for cleaner code:

import asyncio
from manticore_cockroachdb import AsyncDatabase, AsyncTable

async def context_manager_example():
    # Use database as context manager
    async with AsyncDatabase(database="testdb", host="localhost") as db:
        # Use table as context manager
        async with AsyncTable(
            "notes",
            db=db,
            schema={
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "title": "TEXT NOT NULL",
                "content": "TEXT",
                "created_at": "TIMESTAMP DEFAULT NOW()"
            }
        ) as notes:
            # Table is automatically initialized

            # Create notes
            await notes.create({
                "title": "Shopping List",
                "content": "Milk, Eggs, Bread"
            })

            await notes.create({
                "title": "Meeting Notes",
                "content": "Discuss project timeline and resource allocation"
            })

            # List all notes
            all_notes = await notes.list(order_by="created_at DESC")

            print("Notes:")
            for note in all_notes:
                print(f"- {note['title']}: {note['content']}")

        # Clean up
        await db.drop_table("notes")
        # Connection is automatically closed when exiting the context

asyncio.run(context_manager_example())

Error Handling

This example shows proper error handling with the async API:

import asyncio
from manticore_cockroachdb import AsyncDatabase, AsyncTable
from psycopg.errors import UniqueViolation, ForeignKeyViolation

async def error_handling_example():
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()

    try:
        # Create a users table
        users = AsyncTable(
            "users_example",
            db=db,
            schema={
                "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
                "username": "TEXT NOT NULL UNIQUE",  # Unique constraint
                "email": "TEXT NOT NULL"
            }
        )
        await users.initialize()

        # Create a user
        try:
            user = await users.create({
                "username": "johndoe",
                "email": "john@example.com"
            })
            print(f"Created user: {user['username']}")

            # Try to create another user with the same username (will fail)
            duplicate_user = await users.create({
                "username": "johndoe",  # Same username
                "email": "another@example.com"
            })

        except UniqueViolation as e:
            print(f"Couldn't create duplicate user: {e}")

            # Handle the error and try a different username
            try:
                alternate_user = await users.create({
                    "username": "johndoe2",  # Different username
                    "email": "another@example.com"
                })
                print(f"Created alternative user: {alternate_user['username']}")

            except Exception as e2:
                print(f"Failed to create alternative user: {e2}")

        # List all successful users
        all_users = await users.list()
        print(f"Total users: {len(all_users)}")
        for user in all_users:
            print(f"- {user['username']}: {user['email']}")

    except Exception as e:
        print(f"Unexpected error: {e}")

    finally:
        # Clean up
        await db.drop_table("users_example")
        await db.close()

asyncio.run(error_handling_example())