Synchronous Operations
This guide covers how to use the synchronous API for database operations with the Manticore CockroachDB client.
Connecting to the Database
You can connect to a CockroachDB database using the Database class:
from manticore_cockroachdb import Database
# Connect with individual parameters
db = Database(
host="localhost",
port=26257,
database="example_db",
user="root",
password="",
ssl_mode="disable"
)
# Or connect using a connection URL
db = Database.from_url("postgresql://root@localhost:26257/example_db?sslmode=disable")
Basic Operations
Creating Tables
# Define table schema
users_schema = {
"id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE NOT NULL",
"age": "INTEGER",
"active": "BOOLEAN DEFAULT TRUE",
"created_at": "TIMESTAMPTZ DEFAULT now()"
}
# Create the table
db.create_table("users", users_schema, if_not_exists=True)
Inserting Data
# Insert a single record
user_id = db.insert("users", {
"name": "John Doe",
"email": "john@example.com",
"age": 30
})
# Insert multiple records
db.batch_insert("users", [
{"name": "Alice Smith", "email": "alice@example.com", "age": 25},
{"name": "Bob Johnson", "email": "bob@example.com", "age": 35}
])
Querying Data
# Select all records
all_users = db.select("users")
# Select with conditions
active_users = db.select("users", where={"active": True})
# Select with custom WHERE clause
young_users = db.select("users", where_clause="age < %s", params=[30])
# Select a single record
user = db.select_one("users", where={"id": user_id})
# Count records
user_count = db.count("users")
active_count = db.count("users", where={"active": True})
Updating Data
# Update a record
db.update("users", {"age": 31}, where={"id": user_id})
# Update with custom WHERE clause
db.update("users", {"active": False}, where_clause="age > %s", params=[40])
Deleting Data
# Delete a record
db.delete("users", where={"id": user_id})
# Delete with custom WHERE clause
db.delete("users", where_clause="created_at < %s", params=["2020-01-01"])
Using Transactions
# Start a transaction
with db.transaction():
# All operations within this block are part of the same transaction
db.insert("users", {"name": "Transaction User", "email": "tx@example.com"})
db.update("users", {"active": False}, where={"email": "john@example.com"})
# If any operation fails, the entire transaction is rolled back
# If all operations succeed, the transaction is committed
Using the Table Class
The Table class provides a more object-oriented approach to working with tables:
from manticore_cockroachdb import Table
# Create a Table instance
users = Table("users", db=db)
# Create a record
user = users.create({
"name": "Table User",
"email": "table@example.com",
"age": 28
})
# Read a record
retrieved_user = users.read(user["id"])
# Update a record
updated_user = users.update(user["id"], {"age": 29})
# Delete a record
users.delete(user["id"])
# List records
all_users = users.list()
active_users = users.list(where={"active": True})
# Count records
count = users.count()
Batch Operations with Table
# Batch create
batch_users = [
{"name": "Batch User 1", "email": "batch1@example.com", "age": 21},
{"name": "Batch User 2", "email": "batch2@example.com", "age": 22},
{"name": "Batch User 3", "email": "batch3@example.com", "age": 23}
]
created_users = users.batch_create(batch_users)
# Batch update
updates = [
{"id": created_users[0]["id"], "age": 31},
{"id": created_users[1]["id"], "age": 32},
{"id": created_users[2]["id"], "age": 33}
]
updated_users = users.batch_update(updates)
# Batch delete
ids_to_delete = [user["id"] for user in created_users]
users.batch_delete(ids_to_delete)
Closing the Connection
Always close the database connection when you're done:
Complete Example
from manticore_cockroachdb import Database, Table
# Connect to database
db = Database(database="example_db")
try:
# Create table
db.create_table(
"users",
{
"id": "UUID PRIMARY KEY DEFAULT gen_random_uuid()",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE NOT NULL",
"age": "INTEGER",
"active": "BOOLEAN DEFAULT TRUE"
},
if_not_exists=True
)
# Create Table instance
users = Table("users", db=db)
# Insert data
user = users.create({
"name": "John Doe",
"email": "john@example.com",
"age": 30
})
print(f"Created user: {user['name']} with ID: {user['id']}")
# Update data
updated_user = users.update(user["id"], {"age": 31})
print(f"Updated user age: {updated_user['age']}")
# Query data
all_users = users.list()
print(f"Total users: {len(all_users)}")
# Delete data
users.delete(user["id"])
print("User deleted")
finally:
# Close connection
db.close()