Migration Examples
This page provides practical examples of using the migration features of the Manticore CockroachDB client. These examples demonstrate how to manage database schema changes effectively.
Synchronous Migrations
This example shows how to create and apply migrations using the synchronous API:
import os
from manticore_cockroachdb import Database, Migration
def sync_migration_example():
    """Walk through creating, applying and rolling back migrations (sync API).

    Connects to the ``testdb`` database on ``localhost``, creates two
    migrations in the ``migrations`` directory, loads and applies them,
    inserts and prints one ``users`` row, prints the current migration
    version, and rolls back the last loaded migration.

    On exit the ``migrations`` and ``users`` tables are dropped (errors are
    printed, not raised) and the connection is always closed.
    """
    # Connect to database
    db = Database(database="testdb", host="localhost")
    db.connect()
    try:
        # Create migration directory if it doesn't exist. exist_ok avoids the
        # check-then-create race of os.path.exists() + os.makedirs().
        migrations_dir = "migrations"
        os.makedirs(migrations_dir, exist_ok=True)

        # Create migration files
        Migration.create_migration(
            "001_create_users_table",
            """
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """,
            """
            DROP TABLE users;
            """,
            directory=migrations_dir
        )
        Migration.create_migration(
            "002_add_age_column",
            """
            ALTER TABLE users ADD COLUMN age INTEGER;
            """,
            """
            ALTER TABLE users DROP COLUMN age;
            """,
            directory=migrations_dir
        )

        # Load migrations
        migrations = Migration.load_migrations(migrations_dir)
        print("Loaded {} migrations".format(len(migrations)))

        # Print migration details
        for m in migrations:
            print(" Version {}: {}".format(m.version, m.description))

        # Apply all migrations
        applied = Migration.apply_migrations(db, migrations)
        print("Applied {} migrations".format(len(applied)))

        # Insert test data
        db.execute(
            "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
            ("testuser", "test@example.com", 30)
        )

        # Query the data
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            for user in users:
                print("User: {}, Email: {}, Age: {}".format(
                    user["username"], user["email"], user["age"]
                ))

        # Check current migration version
        version = Migration.get_current_version(db)
        print("Current migration version: {}".format(version))

        # Roll back the last migration
        rolled_back = Migration.rollback_migration(db, migrations[-1])
        print("Rolled back migration: {}".format(rolled_back.version))

        # Verify the age column is gone
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            for user in users:
                print("User after rollback: {}, Email: {}".format(
                    user["username"], user["email"]
                ))
                # Age column should no longer exist
    finally:
        # Clean up. Dropping the tables is best-effort: a failure is reported
        # but must not prevent the connection from being closed.
        try:
            db.execute("DROP TABLE IF EXISTS migrations")
            db.execute("DROP TABLE IF EXISTS users")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        finally:
            db.close()


if __name__ == "__main__":
    sync_migration_example()
Asynchronous Migrations
This example shows how to create and apply migrations using the asynchronous API:
import os
import asyncio
from manticore_cockroachdb import AsyncDatabase, AsyncMigration
async def async_migration_example():
    """Walk through creating, applying and rolling back migrations (async API).

    Connects to the ``testdb`` database on ``localhost``, creates three
    migrations in the ``async_migrations`` directory, loads and applies
    them, inserts and prints two ``products`` rows, prints the current
    migration version, and rolls back the last two loaded migrations.

    On exit the ``migrations`` and ``products`` tables are dropped (errors
    are printed, not raised) and the connection is always closed.
    """
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()
    try:
        # Create migration directory if it doesn't exist. exist_ok avoids the
        # check-then-create race of os.path.exists() + os.makedirs().
        migrations_dir = "async_migrations"
        os.makedirs(migrations_dir, exist_ok=True)

        # Create migration files
        await AsyncMigration.create_migration(
            "001_create_products_table",
            """
            CREATE TABLE products (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                price DECIMAL(10,2) NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """,
            """
            DROP TABLE products;
            """,
            directory=migrations_dir
        )
        await AsyncMigration.create_migration(
            "002_add_description_column",
            """
            ALTER TABLE products ADD COLUMN description TEXT;
            """,
            """
            ALTER TABLE products DROP COLUMN description;
            """,
            directory=migrations_dir
        )
        await AsyncMigration.create_migration(
            "003_add_category_column",
            """
            ALTER TABLE products ADD COLUMN category TEXT NOT NULL DEFAULT 'Uncategorized';
            """,
            """
            ALTER TABLE products DROP COLUMN category;
            """,
            directory=migrations_dir
        )

        # Load migrations
        migrations = await AsyncMigration.load_migrations(migrations_dir)
        print("Loaded {} migrations".format(len(migrations)))

        # Print migration details
        for m in migrations:
            print(" Version {}: {}".format(m.version, m.description))

        # Apply all migrations
        applied = await AsyncMigration.apply_migrations(db, migrations)
        print("Applied {} migrations".format(len(applied)))

        # Insert test data
        async with db.cursor() as cursor:
            await cursor.execute(
                "INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
                ("Laptop", 1299.99, "High-performance laptop", "Electronics")
            )
            await cursor.execute(
                "INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
                ("Headphones", 199.99, "Noise-cancelling headphones", "Audio")
            )

        # Query the data
        async with db.cursor() as cursor:
            await cursor.execute("SELECT * FROM products")
            products = await cursor.fetchall()
            for product in products:
                print("Product: {}, Price: ${:.2f}, Category: {}, Description: {}".format(
                    product["name"], product["price"], product["category"], product["description"]
                ))

        # Check current migration version
        version = await AsyncMigration.get_current_version(db)
        print("Current migration version: {}".format(version))

        # Roll back the last migration
        rolled_back = await AsyncMigration.rollback_migration(db, migrations[-1])
        print("Rolled back migration: {}".format(rolled_back.version))

        # Roll back another migration
        rolled_back = await AsyncMigration.rollback_migration(db, migrations[-2])
        print("Rolled back migration: {}".format(rolled_back.version))

        # Verify columns are gone
        async with db.cursor() as cursor:
            await cursor.execute("SELECT * FROM products")
            products = await cursor.fetchall()
            for product in products:
                print("Product after rollbacks: {}, Price: ${:.2f}".format(
                    product["name"], product["price"]
                ))
                # Description and category columns should no longer exist
    finally:
        # Clean up. Dropping the tables is best-effort: a failure is reported
        # but must not prevent the connection from being closed.
        try:
            await db.execute("DROP TABLE IF EXISTS migrations")
            await db.execute("DROP TABLE IF EXISTS products")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        finally:
            await db.close()


if __name__ == "__main__":
    asyncio.run(async_migration_example())
Migration with Transactions
This example demonstrates how to use transactions with migrations for atomic schema changes:
import os
from manticore_cockroachdb import Database, Migration
def transaction_migration_example():
    """Apply a multi-statement migration, then load data in one transaction.

    Connects to the ``testdb`` database on ``localhost``, creates a single
    migration in the ``transaction_migrations`` directory that defines the
    ``categories`` and ``items`` tables plus an index, loads and applies it,
    inserts sample rows through ``db.run_in_transaction`` and prints the
    items joined with their category.

    On exit the ``items``, ``categories`` and ``migrations`` tables are
    dropped (errors are printed, not raised) and the connection is always
    closed.
    """
    # Connect to database
    db = Database(database="testdb", host="localhost")
    db.connect()
    try:
        # Create migration directory. exist_ok avoids the check-then-create
        # race of os.path.exists() + os.makedirs().
        migrations_dir = "transaction_migrations"
        os.makedirs(migrations_dir, exist_ok=True)

        # Create a migration with multiple operations in a transaction
        Migration.create_migration(
            "001_create_related_tables",
            """
            -- Create categories table
            CREATE TABLE categories (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL UNIQUE,
                description TEXT
            );
            -- Create items table with foreign key
            CREATE TABLE items (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                category_id UUID NOT NULL REFERENCES categories(id),
                price DECIMAL(10,2) NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            -- Create index for faster lookups
            CREATE INDEX items_category_idx ON items(category_id);
            """,
            """
            DROP TABLE IF EXISTS items;
            DROP TABLE IF EXISTS categories;
            """,
            directory=migrations_dir
        )

        # Load and apply migrations
        migrations = Migration.load_migrations(migrations_dir)
        applied = Migration.apply_migrations(db, migrations)
        print("Applied {} migrations".format(len(applied)))

        # Insert test data using transactions
        def insert_test_data(conn):
            """Insert two categories and four items; return the category IDs."""
            with conn.cursor() as cur:
                # Insert categories
                cur.execute(
                    "INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
                    ("Electronics", "Electronic devices and gadgets")
                )
                electronics_id = cur.fetchone()["id"]
                cur.execute(
                    "INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
                    ("Books", "Books and publications")
                )
                books_id = cur.fetchone()["id"]

                # Insert items
                items = [
                    ("Smartphone", electronics_id, 699.99),
                    ("Tablet", electronics_id, 349.99),
                    ("Novel", books_id, 14.99),
                    ("Textbook", books_id, 79.99)
                ]
                for name, category_id, price in items:
                    cur.execute(
                        "INSERT INTO items (name, category_id, price) VALUES (%s, %s, %s)",
                        (name, category_id, price)
                    )
            return {
                "electronics_id": electronics_id,
                "books_id": books_id
            }

        # Run in transaction
        result = db.run_in_transaction(insert_test_data)
        print("Inserted test data with category IDs: {}".format(result))

        # Query the data with a join
        with db.cursor() as cursor:
            cursor.execute("""
                SELECT i.name as item_name, i.price, c.name as category_name
                FROM items i
                JOIN categories c ON i.category_id = c.id
                ORDER BY c.name, i.price DESC
            """)
            items = cursor.fetchall()
            print("\nItems by category:")
            for item in items:
                print(" {} ({}): ${:.2f}".format(
                    item["item_name"], item["category_name"], item["price"]
                ))
    finally:
        # Clean up. items is dropped before categories because it references
        # categories(id). Dropping is best-effort: a failure is reported but
        # must not prevent the connection from being closed.
        try:
            db.execute("DROP TABLE IF EXISTS items")
            db.execute("DROP TABLE IF EXISTS categories")
            db.execute("DROP TABLE IF EXISTS migrations")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        finally:
            db.close()


if __name__ == "__main__":
    transaction_migration_example()
Migration Status and History
This example shows how to check migration status and history:
```python
import os
from manticore_cockroachdb import Database, Migration
def migration_status_example():
    """Walk through inspecting migration status, history and pending work.

    Connects to the ``testdb`` database on ``localhost``, creates five
    migrations (``001`` .. ``005``) in the ``status_migrations`` directory,
    applies the first three, prints the current version, the migration
    history and the pending migrations, applies one more pending migration,
    then rolls back to version ``002`` and prints the resulting version.

    On exit ``table_1`` .. ``table_5`` and the ``migrations`` table are
    dropped (errors are printed, not raised) and the connection is always
    closed.
    """
    # Connect to database
    db = Database(database="testdb", host="localhost")
    db.connect()
    try:
        # Create migration directory. exist_ok avoids the check-then-create
        # race of os.path.exists() + os.makedirs().
        migrations_dir = "status_migrations"
        os.makedirs(migrations_dir, exist_ok=True)

        # Create several migrations
        for i in range(1, 6):
            version = f"{i:03d}"
            Migration.create_migration(
                f"{version}_migration_{i}",
                f"-- Migration {i} up\nCREATE TABLE IF NOT EXISTS table_{i} (id SERIAL PRIMARY KEY, name TEXT);",
                f"-- Migration {i} down\nDROP TABLE IF EXISTS table_{i};",
                directory=migrations_dir
            )

        # Load migrations
        migrations = Migration.load_migrations(migrations_dir)
        print("Loaded {} migrations".format(len(migrations)))

        # Apply only the first 3 migrations
        applied = Migration.apply_migrations(db, migrations[:3])
        print("Applied {} migrations".format(len(applied)))

        # Check current version
        version = Migration.get_current_version(db)
        print("Current migration version: {}".format(version))

        # Get migration history
        history = Migration.get_migration_history(db)
        print("\nMigration history:")
        for entry in history:
            print(" Version: {}, Applied at: {}, Description: {}".format(
                entry["version"], entry["applied_at"], entry["description"]
            ))

        # Check pending migrations
        pending = Migration.get_pending_migrations(db, migrations)
        print("\nPending migrations:")
        for m in pending:
            print(" Version: {}, Description: {}".format(m.version, m.description))

        # Apply one more migration
        applied = Migration.apply_migrations(db, [pending[0]])
        print("\nApplied 1 more migration: {}".format(applied[0].version))

        # Check status again
        version = Migration.get_current_version(db)
        print("Current migration version: {}".format(version))

        # Roll back to version 002
        target_version = "002"
        rolled_back = Migration.rollback_to_version(db, migrations, target_version)
        print("\nRolled back to version {}, migrations rolled back: {}".format(
            target_version, len(rolled_back)
        ))

        # Verify current version
        version = Migration.get_current_version(db)
        print("Current migration version after rollback: {}".format(version))
    finally:
        # Clean up. Dropping the tables is best-effort: a failure is reported
        # but must not prevent the connection from being closed.
        try:
            for i in range(1, 6):
                db.execute(f"DROP TABLE IF EXISTS table_{i}")
            db.execute("DROP TABLE IF EXISTS migrations")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        finally:
            db.close()


if __name__ == "__main__":
    migration_status_example()