Skip to content

AsyncTable

AsyncTable

Asynchronous database table with CRUD operations.

Source code in manticore_cockroachdb/crud/async_table.py
class AsyncTable:
    """Asynchronous database table with CRUD operations."""

    def __init__(
        self,
        name: str,
        db: Optional[AsyncDatabase] = None,
        schema: Optional[Dict[str, str]] = None,
        if_not_exists: bool = True
    ):
        """Initialize table.

        Args:
            name: Table name
            db: AsyncDatabase instance
            schema: Column definitions {name: type}
            if_not_exists: Whether to create table only if it doesn't exist
        """
        self.name = name
        self.db = db or AsyncDatabase()
        self.schema = schema
        self._initialized = False

    async def initialize(self) -> None:
        """Initialize the table.

        Creates the table if schema is provided and table does not exist.
        """
        if self._initialized:
            return

        # Create table if schema is provided
        if self.schema:
            await self.db.create_table(self.name, self.schema, True)

        self._initialized = True

    async def create(self, data: Dict) -> Dict:
        """Create a new record.

        Args:
            data: Record data

        Returns:
            Created record
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.insert(self.name, data)

    async def read(self, id: Union[str, int]) -> Optional[Dict]:
        """Read a record.

        Args:
            id: Record ID

        Returns:
            Record data or None if not found
        """
        if not self._initialized:
            await self.initialize()

        results = await self.db.select(self.name, where={"id": id})
        return results[0] if results else None

    async def update(self, id: Union[str, int], data: Dict) -> Optional[Dict]:
        """Update a record.

        Args:
            id: Record ID
            data: Update data

        Returns:
            Updated record
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.update(self.name, data, {"id": id})

    async def delete(self, id: Union[str, int]) -> bool:
        """Delete a record.

        Args:
            id: Record ID

        Returns:
            Whether the record was deleted
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.delete(self.name, {"id": id})

    async def list(
        self,
        where: Optional[Dict] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[Dict]:
        """List records.

        Args:
            where: Filter conditions
            order_by: Order by expression
            limit: Maximum number of records to return

        Returns:
            List of records
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.select(
            self.name,
            where=where,
            order_by=order_by,
            limit=limit
        )

    async def count(self, where: Optional[Dict] = None) -> int:
        """Count records.

        Args:
            where: Filter conditions

        Returns:
            Number of records
        """
        if not self._initialized:
            await self.initialize()

        query = f"SELECT COUNT(*) as count FROM {self.name}"
        params = []

        if where:
            conditions = []
            for key, value in where.items():
                conditions.append(f"{key} = %s")
                params.append(value)
            query += f" WHERE {' AND '.join(conditions)}"

        result = await self.db.execute(query, tuple(params))
        return result[0]["count"]

    async def batch_create(self, records: List[Dict]) -> List[Dict]:
        """Create multiple records in a batch.

        Args:
            records: Records to create

        Returns:
            Created records
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.batch_insert(self.name, records)

    async def batch_update(
        self,
        records: List[Dict],
        key_column: str = "id"
    ) -> List[Dict]:
        """Update multiple records in a batch.

        Args:
            records: Records to update
            key_column: Column to use as key

        Returns:
            Updated records
        """
        if not self._initialized:
            await self.initialize()

        return await self.db.batch_update(self.name, records, key_column)

    async def __aenter__(self) -> 'AsyncTable':
        """Enter async context."""
        if not self._initialized:
            await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit async context."""
        # No need to clean up anything for the table instance
        pass 

Functions

__init__(name, db=None, schema=None, if_not_exists=True)

Initialize table.

Parameters:

Name Type Description Default
name str

Table name

required
db Optional[AsyncDatabase]

AsyncDatabase instance

None
schema Optional[Dict[str, str]]

Column definitions {name: type}

None
if_not_exists bool

Whether to create table only if it doesn't exist

True
Source code in manticore_cockroachdb/crud/async_table.py
def __init__(
    self,
    name: str,
    db: Optional[AsyncDatabase] = None,
    schema: Optional[Dict[str, str]] = None,
    if_not_exists: bool = True
):
    """Initialize table.

    Args:
        name: Table name
        db: AsyncDatabase instance
        schema: Column definitions {name: type}
        if_not_exists: Whether to create table only if it doesn't exist
    """
    self.name = name
    self.db = db or AsyncDatabase()
    self.schema = schema
    self._initialized = False
initialize() async

Initialize the table.

Creates the table if schema is provided and table does not exist.

Source code in manticore_cockroachdb/crud/async_table.py
async def initialize(self) -> None:
    """Initialize the table.

    Creates the table if schema is provided and table does not exist.
    """
    if self._initialized:
        return

    # Create table if schema is provided
    if self.schema:
        await self.db.create_table(self.name, self.schema, True)

    self._initialized = True
create(data) async

Create a new record.

Parameters:

Name Type Description Default
data Dict

Record data

required

Returns:

Type Description
Dict

Created record

Source code in manticore_cockroachdb/crud/async_table.py
async def create(self, data: Dict) -> Dict:
    """Create a new record.

    Args:
        data: Record data

    Returns:
        Created record
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.insert(self.name, data)
read(id) async

Read a record.

Parameters:

Name Type Description Default
id Union[str, int]

Record ID

required

Returns:

Type Description
Optional[Dict]

Record data or None if not found

Source code in manticore_cockroachdb/crud/async_table.py
async def read(self, id: Union[str, int]) -> Optional[Dict]:
    """Read a record.

    Args:
        id: Record ID

    Returns:
        Record data or None if not found
    """
    if not self._initialized:
        await self.initialize()

    results = await self.db.select(self.name, where={"id": id})
    return results[0] if results else None
update(id, data) async

Update a record.

Parameters:

Name Type Description Default
id Union[str, int]

Record ID

required
data Dict

Update data

required

Returns:

Type Description
Optional[Dict]

Updated record

Source code in manticore_cockroachdb/crud/async_table.py
async def update(self, id: Union[str, int], data: Dict) -> Optional[Dict]:
    """Update a record.

    Args:
        id: Record ID
        data: Update data

    Returns:
        Updated record
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.update(self.name, data, {"id": id})
delete(id) async

Delete a record.

Parameters:

Name Type Description Default
id Union[str, int]

Record ID

required

Returns:

Type Description
bool

Whether the record was deleted

Source code in manticore_cockroachdb/crud/async_table.py
async def delete(self, id: Union[str, int]) -> bool:
    """Delete a record.

    Args:
        id: Record ID

    Returns:
        Whether the record was deleted
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.delete(self.name, {"id": id})
list(where=None, order_by=None, limit=None) async

List records.

Parameters:

Name Type Description Default
where Optional[Dict]

Filter conditions

None
order_by Optional[str]

Order by expression

None
limit Optional[int]

Maximum number of records to return

None

Returns:

Type Description
List[Dict]

List of records

Source code in manticore_cockroachdb/crud/async_table.py
async def list(
    self,
    where: Optional[Dict] = None,
    order_by: Optional[str] = None,
    limit: Optional[int] = None
) -> List[Dict]:
    """List records.

    Args:
        where: Filter conditions
        order_by: Order by expression
        limit: Maximum number of records to return

    Returns:
        List of records
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.select(
        self.name,
        where=where,
        order_by=order_by,
        limit=limit
    )
count(where=None) async

Count records.

Parameters:

Name Type Description Default
where Optional[Dict]

Filter conditions

None

Returns:

Type Description
int

Number of records

Source code in manticore_cockroachdb/crud/async_table.py
async def count(self, where: Optional[Dict] = None) -> int:
    """Count records.

    Args:
        where: Filter conditions

    Returns:
        Number of records
    """
    if not self._initialized:
        await self.initialize()

    query = f"SELECT COUNT(*) as count FROM {self.name}"
    params = []

    if where:
        conditions = []
        for key, value in where.items():
            conditions.append(f"{key} = %s")
            params.append(value)
        query += f" WHERE {' AND '.join(conditions)}"

    result = await self.db.execute(query, tuple(params))
    return result[0]["count"]
batch_create(records) async

Create multiple records in a batch.

Parameters:

Name Type Description Default
records List[Dict]

Records to create

required

Returns:

Type Description
List[Dict]

Created records

Source code in manticore_cockroachdb/crud/async_table.py
async def batch_create(self, records: List[Dict]) -> List[Dict]:
    """Create multiple records in a batch.

    Args:
        records: Records to create

    Returns:
        Created records
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.batch_insert(self.name, records)
batch_update(records, key_column='id') async

Update multiple records in a batch.

Parameters:

Name Type Description Default
records List[Dict]

Records to update

required
key_column str

Column to use as key

'id'

Returns:

Type Description
List[Dict]

Updated records

Source code in manticore_cockroachdb/crud/async_table.py
async def batch_update(
    self,
    records: List[Dict],
    key_column: str = "id"
) -> List[Dict]:
    """Update multiple records in a batch.

    Args:
        records: Records to update
        key_column: Column to use as key

    Returns:
        Updated records
    """
    if not self._initialized:
        await self.initialize()

    return await self.db.batch_update(self.name, records, key_column)
__aenter__() async

Enter async context.

Source code in manticore_cockroachdb/crud/async_table.py
async def __aenter__(self) -> 'AsyncTable':
    """Enter async context."""
    if not self._initialized:
        await self.initialize()
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Exit async context.

Source code in manticore_cockroachdb/crud/async_table.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit async context."""
    # No need to clean up anything for the table instance
    pass 

Overview

The AsyncTable class provides a high-level, asynchronous interface for interacting with database tables. It encapsulates common CRUD (Create, Read, Update, Delete) operations and handles table initialization automatically.

Basic Usage

import asyncio
from manticore_cockroachdb import AsyncDatabase, AsyncTable

async def main():
    # Create database connection
    db = AsyncDatabase(database="mydb", host="localhost")
    await db.connect()

    # Define a users table
    users = AsyncTable(
        "users",
        db=db,
        schema={
            "id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
            "name": "TEXT NOT NULL",
            "email": "TEXT UNIQUE NOT NULL",
        }
    )

    # Initialize the table (creates it if it doesn't exist)
    await users.initialize()

    # Create a user
    user = await users.create({
        "name": "John Doe",
        "email": "john@example.com"
    })

    print(f"Created user: {user}")

    # Read the user
    retrieved_user = await users.read(user["id"])
    print(f"Retrieved user: {retrieved_user}")

    # Update the user
    updated_user = await users.update(user["id"], {"name": "Jane Doe"})
    print(f"Updated user: {updated_user}")

    # List users
    all_users = await users.list()
    print(f"All users: {all_users}")

    # Count users
    count = await users.count()
    print(f"User count: {count}")

    # Delete the user
    deleted = await users.delete(user["id"])
    print(f"User deleted: {deleted}")

    await db.close()

asyncio.run(main())

CRUD Operations

Creating Records

The create method inserts a new record into the table:

user = await users_table.create({
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
})

# user contains the full record, including any default values and generated IDs
print(user["id"])  # The generated UUID

Reading Records

The read method retrieves a record by its ID:

user = await users_table.read("550e8400-e29b-41d4-a716-446655440000")
if user:
    print(f"Found user: {user['name']}")
else:
    print("User not found")

Updating Records

The update method modifies an existing record:

updated_user = await users_table.update(
    "550e8400-e29b-41d4-a716-446655440000",
    {
        "name": "John Smith",
        "age": 31
    }
)

if updated_user:
    print(f"Updated user: {updated_user}")
else:
    print("User not found")

Deleting Records

The delete method removes a record from the table:

success = await users_table.delete("550e8400-e29b-41d4-a716-446655440000")
if success:
    print("User deleted successfully")
else:
    print("User not found or could not be deleted")

Batch Operations

Batch Creation

The batch_create method inserts multiple records in a single transaction:

users = [
    {"name": "User 1", "email": "user1@example.com"},
    {"name": "User 2", "email": "user2@example.com"},
    {"name": "User 3", "email": "user3@example.com"},
]

created_users = await users_table.batch_create(users)
print(f"Created {len(created_users)} users")

Batch Updates

The batch_update method updates multiple records in a single transaction:

updates = [
    {"id": "id1", "active": False},
    {"id": "id2", "active": False},
    {"id": "id3", "active": True},
]

updated_users = await users_table.batch_update(updates)
print(f"Updated {len(updated_users)} users")

Query Operations

Listing Records

The list method retrieves multiple records with filtering, ordering, and limits:

# Get all users
all_users = await users_table.list()

# Get users with filtering
active_users = await users_table.list(where={"active": True})

# Get users with ordering
sorted_users = await users_table.list(order_by="name ASC")

# Get users with limit
first_10_users = await users_table.list(limit=10)

# Combine options
result = await users_table.list(
    where={"active": True},
    order_by="created_at DESC",
    limit=5
)

Counting Records

The count method counts records, optionally with filters:

# Count all users
total_users = await users_table.count()

# Count with filters
active_count = await users_table.count(where={"active": True})

Using as a Context Manager

async with AsyncTable("users", db=db) as users:
    # Table is automatically initialized
    user = await users.create({"name": "John", "email": "john@example.com"})
    # ... perform more operations
    # No need to call initialize() or worry about cleanup