Migration Examples
This page provides practical examples of using the migration features of the Manticore CockroachDB client. These examples demonstrate how to manage database schema changes effectively.
Synchronous Migrations
This example shows how to create and apply migrations using the synchronous API:
import os
from manticore_cockroachdb import Database, Migration
def sync_migration_example():
# Connect to database
db = Database(database="testdb", host="localhost")
db.connect()
try:
# Create migration directory if it doesn't exist
migrations_dir = "migrations"
if not os.path.exists(migrations_dir):
os.makedirs(migrations_dir)
# Create migration files
Migration.create_migration(
"001_create_users_table",
"""
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
""",
"""
DROP TABLE users;
""",
directory=migrations_dir
)
Migration.create_migration(
"002_add_age_column",
"""
ALTER TABLE users ADD COLUMN age INTEGER;
""",
"""
ALTER TABLE users DROP COLUMN age;
""",
directory=migrations_dir
)
# Load migrations
migrations = Migration.load_migrations(migrations_dir)
print("Loaded {} migrations".format(len(migrations)))
# Print migration details
for m in migrations:
print(" Version {}: {}".format(m.version, m.description))
# Apply all migrations
applied = Migration.apply_migrations(db, migrations)
print("Applied {} migrations".format(len(applied)))
# Insert test data
db.execute(
"INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
("testuser", "test@example.com", 30)
)
# Query the data
with db.cursor() as cursor:
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
for user in users:
print("User: {}, Email: {}, Age: {}".format(
user["username"], user["email"], user["age"]
))
# Check current migration version
version = Migration.get_current_version(db)
print("Current migration version: {}".format(version))
# Roll back the last migration
rolled_back = Migration.rollback_migration(db, migrations[-1])
print("Rolled back migration: {}".format(rolled_back.version))
# Verify the age column is gone
with db.cursor() as cursor:
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
for user in users:
print("User after rollback: {}, Email: {}".format(
user["username"], user["email"]
))
# Age column should no longer exist
finally:
# Clean up
try:
db.execute("DROP TABLE IF EXISTS migrations")
db.execute("DROP TABLE IF EXISTS users")
except Exception as e:
print("Cleanup error: {}".format(e))
db.close()
if __name__ == "__main__":
sync_migration_example()
Asynchronous Migrations
This example shows how to create and apply migrations using the asynchronous API:
import os
import asyncio
import shutil
from pathlib import Path
from manticore_cockroachdb.async_database import AsyncDatabase
from manticore_cockroachdb.async_migration import AsyncMigrator
async def async_migration_example():
# Connect to database
db = AsyncDatabase(database="testdb", host="localhost")
await db.connect()
# Setup migrations directory
migrations_dir = Path("./async_migrations")
if migrations_dir.exists():
shutil.rmtree(migrations_dir)
migrations_dir.mkdir(exist_ok=True)
try:
# Create migration instance
migration = AsyncMigrator(db, migrations_dir=str(migrations_dir))
# Ensure migrations table exists
await migration.initialize()
# Create migration files
await migration.create_migration(
"create products table",
"""
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
price DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
""",
"""
DROP TABLE products;
"""
)
await migration.create_migration(
"add description column",
"""
ALTER TABLE products ADD COLUMN description TEXT;
""",
"""
ALTER TABLE products DROP COLUMN description;
"""
)
await migration.create_migration(
"add category column",
"""
ALTER TABLE products ADD COLUMN category TEXT NOT NULL DEFAULT 'Uncategorized';
""",
"""
ALTER TABLE products DROP COLUMN category;
"""
)
# List migration files
print("Migration files created:")
for file in sorted(migrations_dir.glob("*.sql")):
print(f" {file.name}")
# Load migrations
migrations = await migration.load_migrations()
print(f"Loaded {len(migrations)} migrations")
# Print migration details
for m in migrations:
print(f" Version {m.version}: {m.description}")
# Apply all migrations
await migration.migrate()
print("Migrations applied successfully")
# Insert test data
await db.execute(
"INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
("Laptop", 1299.99, "High-performance laptop", "Electronics")
)
await db.execute(
"INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
("Headphones", 199.99, "Noise-cancelling headphones", "Audio")
)
# Query the data
products = await db.execute("SELECT * FROM products")
for product in products:
print(f"Product: {product['name']}, Price: ${product['price']:.2f}, " +
f"Category: {product['category']}, Description: {product['description']}")
# Manual migration reversion (AsyncMigrator doesn't have a built-in rollback method)
print("Manually reverting the last migration")
try:
# Get the last migration
last_migration = max(migrations, key=lambda m: m.version)
# Execute the down SQL directly
if last_migration.down_sql:
# Execute the down SQL to revert the schema changes
await db.execute(last_migration.down_sql)
# Remove the migration record from the _migrations table
await db.execute(
"DELETE FROM _migrations WHERE version = %s",
(last_migration.version,)
)
print(f"Manually reverted migration V{last_migration.version}")
else:
print(f"No down SQL for migration V{last_migration.version}")
except Exception as e:
print(f"Error reverting migration: {e}")
# Verify the category column is gone
try:
await db.execute("SELECT category FROM products LIMIT 1")
print("Category column still exists (revert failed)")
except Exception:
print("Category column removed successfully (revert worked)")
# Reapply the migration that was reverted
print("Applying all migrations again")
await migration.migrate()
print("Migrations reapplied successfully")
# Verify all columns are present
products = await db.execute("SELECT * FROM products")
for product in products:
print(f"Product after reapplying: {product['name']}, Price: ${product['price']:.2f}, " +
f"Category: {product['category']}, Description: {product['description']}")
finally:
# Clean up
try:
await db.drop_table("products", if_exists=True)
await db.drop_table("_migrations", if_exists=True)
except Exception as e:
print(f"Cleanup error: {e}")
# Close database connection
await db.close()
# Remove migrations directory
shutil.rmtree(migrations_dir)
if __name__ == "__main__":
asyncio.run(async_migration_example())
Migration with Transactions
This example demonstrates how to use transactions with migrations for atomic schema changes:
import os
from manticore_cockroachdb import Database, Migration
def transaction_migration_example():
# Connect to database
db = Database(database="testdb", host="localhost")
db.connect()
try:
# Create migration directory
migrations_dir = "transaction_migrations"
if not os.path.exists(migrations_dir):
os.makedirs(migrations_dir)
# Create a migration with multiple operations in a transaction
Migration.create_migration(
"001_create_related_tables",
"""
-- Create categories table
CREATE TABLE categories (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL UNIQUE,
description TEXT
);
-- Create items table with foreign key
CREATE TABLE items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
category_id UUID NOT NULL REFERENCES categories(id),
price DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Create index for faster lookups
CREATE INDEX items_category_idx ON items(category_id);
""",
"""
DROP TABLE IF EXISTS items;
DROP TABLE IF EXISTS categories;
""",
directory=migrations_dir
)
# Load and apply migrations
migrations = Migration.load_migrations(migrations_dir)
applied = Migration.apply_migrations(db, migrations)
print("Applied {} migrations".format(len(applied)))
# Insert test data using transactions
def insert_test_data(conn):
with conn.cursor() as cur:
# Insert categories
cur.execute(
"INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
("Electronics", "Electronic devices and gadgets")
)
electronics_id = cur.fetchone()["id"]
cur.execute(
"INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
("Books", "Books and publications")
)
books_id = cur.fetchone()["id"]
# Insert items
items = [
("Smartphone", electronics_id, 699.99),
("Tablet", electronics_id, 349.99),
("Novel", books_id, 14.99),
("Textbook", books_id, 79.99)
]
for name, category_id, price in items:
cur.execute(
"INSERT INTO items (name, category_id, price) VALUES (%s, %s, %s)",
(name, category_id, price)
)
return {
"electronics_id": electronics_id,
"books_id": books_id
}
# Run in transaction
result = db.run_in_transaction(insert_test_data)
print("Inserted test data with category IDs: {}".format(result))
# Query the data with a join
with db.cursor() as cursor:
cursor.execute("""
SELECT i.name as item_name, i.price, c.name as category_name
FROM items i
JOIN categories c ON i.category_id = c.id
ORDER BY c.name, i.price DESC
""")
items = cursor.fetchall()
print("\nItems by category:")
for item in items:
print(" {} ({}): ${:.2f}".format(
item["item_name"], item["category_name"], item["price"]
))
finally:
# Clean up
try:
db.execute("DROP TABLE IF EXISTS items")
db.execute("DROP TABLE IF EXISTS categories")
db.execute("DROP TABLE IF EXISTS migrations")
except Exception as e:
print("Cleanup error: {}".format(e))
db.close()
if __name__ == "__main__":
transaction_migration_example()
Migration Status and History
This example shows how to check migration status and history:
```python import os from manticore_cockroachdb import Database, Migration
def migration_status_example(): # Connect to database db = Database(database="testdb", host="localhost") db.connect()
try:
# Create migration directory
migrations_dir = "status_migrations"
if not os.path.exists(migrations_dir):
os.makedirs(migrations_dir)
# Create several migrations
for i in range(1, 6):
version = f"{i:03d}"
Migration.create_migration(
f"{version}_migration_{i}",
f"-- Migration {i} up\nCREATE TABLE IF NOT EXISTS table_{i} (id SERIAL PRIMARY KEY, name TEXT);",
f"-- Migration {i} down\nDROP TABLE IF EXISTS table_{i};",
directory=migrations_dir
)
# Load migrations
migrations = Migration.load_migrations(migrations_dir)
print("Loaded {} migrations".format(len(migrations)))
# Apply only the first 3 migrations
applied = Migration.apply_migrations(db, migrations[:3])
print("Applied {} migrations".format(len(applied)))
# Check current version
version = Migration.get_current_version(db)
print("Current migration version: {}".format(version))
# Get migration history
history = Migration.get_migration_history(db)
print("\nMigration history:")
for entry in history:
print(" Version: {}, Applied at: {}, Description: {}".format(
entry["version"], entry["applied_at"], entry["description"]
))
# Check pending migrations
pending = Migration.get_pending_migrations(db, migrations)
print("\nPending migrations:")
for m in pending:
print(" Version: {}, Description: {}".format(m.version, m.description))
# Apply one more migration
applied = Migration.apply_migrations(db, [pending[0]])
print("\nApplied 1 more migration: {}".format(applied[0].version))
# Check status again
version = Migration.get_current_version(db)
print("Current migration version: {}".format(version))
# Roll back to version 002
target_version = "002"
rolled_back = Migration.rollback_to_version(db, migrations, target_version)
print("\nRolled back to version {}, migrations rolled back: {}".format(
target_version, len(rolled_back)
))
# Verify current version
version = Migration.get_current_version(db)
print("Current migration version after rollback: {}".format(version))
finally:
# Clean up
try:
for i in range(1, 6):
db.execute(f"DROP TABLE IF EXISTS table_{i}")
db.execute("DROP TABLE IF EXISTS migrations")
except Exception as e:
print("Cleanup error: {}".format(e))
db.close()
if name == "main": migration_status_example()