Skip to content

Migration Examples

This page provides practical examples of using the migration features of the Manticore CockroachDB client. These examples demonstrate how to manage database schema changes effectively.

Synchronous Migrations

This example shows how to create and apply migrations using the synchronous API:

import os
from manticore_cockroachdb import Database, Migration

def sync_migration_example():
    # Connect to database
    db = Database(database="testdb", host="localhost")
    db.connect()

    try:
        # Create migration directory if it doesn't exist
        migrations_dir = "migrations"
        if not os.path.exists(migrations_dir):
            os.makedirs(migrations_dir)

        # Create migration files
        Migration.create_migration(
            "001_create_users_table",
            """
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                username TEXT NOT NULL UNIQUE,
                email TEXT NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """,
            """
            DROP TABLE users;
            """,
            directory=migrations_dir
        )

        Migration.create_migration(
            "002_add_age_column",
            """
            ALTER TABLE users ADD COLUMN age INTEGER;
            """,
            """
            ALTER TABLE users DROP COLUMN age;
            """,
            directory=migrations_dir
        )

        # Load migrations
        migrations = Migration.load_migrations(migrations_dir)
        print("Loaded {} migrations".format(len(migrations)))

        # Print migration details
        for m in migrations:
            print("  Version {}: {}".format(m.version, m.description))

        # Apply all migrations
        applied = Migration.apply_migrations(db, migrations)
        print("Applied {} migrations".format(len(applied)))

        # Insert test data
        db.execute(
            "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
            ("testuser", "test@example.com", 30)
        )

        # Query the data
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            for user in users:
                print("User: {}, Email: {}, Age: {}".format(
                    user["username"], user["email"], user["age"]
                ))

        # Check current migration version
        version = Migration.get_current_version(db)
        print("Current migration version: {}".format(version))

        # Roll back the last migration
        rolled_back = Migration.rollback_migration(db, migrations[-1])
        print("Rolled back migration: {}".format(rolled_back.version))

        # Verify the age column is gone
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            for user in users:
                print("User after rollback: {}, Email: {}".format(
                    user["username"], user["email"]
                ))
                # Age column should no longer exist

    finally:
        # Clean up
        try:
            db.execute("DROP TABLE IF EXISTS migrations")
            db.execute("DROP TABLE IF EXISTS users")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        db.close()

if __name__ == "__main__":
    sync_migration_example()

Asynchronous Migrations

This example shows how to create and apply migrations using the asynchronous API:

import os
import asyncio
import shutil
from pathlib import Path
from manticore_cockroachdb.async_database import AsyncDatabase
from manticore_cockroachdb.async_migration import AsyncMigrator

async def async_migration_example():
    # Connect to database
    db = AsyncDatabase(database="testdb", host="localhost")
    await db.connect()

    # Setup migrations directory
    migrations_dir = Path("./async_migrations")
    if migrations_dir.exists():
        shutil.rmtree(migrations_dir)
    migrations_dir.mkdir(exist_ok=True)

    try:
        # Create migration instance
        migration = AsyncMigrator(db, migrations_dir=str(migrations_dir))

        # Ensure migrations table exists
        await migration.initialize()

        # Create migration files
        await migration.create_migration(
            "create products table",
            """
            CREATE TABLE products (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                price DECIMAL(10,2) NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );
            """,
            """
            DROP TABLE products;
            """
        )

        await migration.create_migration(
            "add description column",
            """
            ALTER TABLE products ADD COLUMN description TEXT;
            """,
            """
            ALTER TABLE products DROP COLUMN description;
            """
        )

        await migration.create_migration(
            "add category column",
            """
            ALTER TABLE products ADD COLUMN category TEXT NOT NULL DEFAULT 'Uncategorized';
            """,
            """
            ALTER TABLE products DROP COLUMN category;
            """
        )

        # List migration files
        print("Migration files created:")
        for file in sorted(migrations_dir.glob("*.sql")):
            print(f"  {file.name}")

        # Load migrations
        migrations = await migration.load_migrations()
        print(f"Loaded {len(migrations)} migrations")

        # Print migration details
        for m in migrations:
            print(f"  Version {m.version}: {m.description}")

        # Apply all migrations
        await migration.migrate()
        print("Migrations applied successfully")

        # Insert test data
        await db.execute(
            "INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
            ("Laptop", 1299.99, "High-performance laptop", "Electronics")
        )

        await db.execute(
            "INSERT INTO products (name, price, description, category) VALUES (%s, %s, %s, %s)",
            ("Headphones", 199.99, "Noise-cancelling headphones", "Audio")
        )

        # Query the data
        products = await db.execute("SELECT * FROM products")
        for product in products:
            print(f"Product: {product['name']}, Price: ${product['price']:.2f}, " +
                  f"Category: {product['category']}, Description: {product['description']}")

        # Manual migration reversion (AsyncMigrator doesn't have a built-in rollback method)
        print("Manually reverting the last migration")
        try:
            # Get the last migration
            last_migration = max(migrations, key=lambda m: m.version)

            # Execute the down SQL directly
            if last_migration.down_sql:
                # Execute the down SQL to revert the schema changes
                await db.execute(last_migration.down_sql)

                # Remove the migration record from the _migrations table
                await db.execute(
                    "DELETE FROM _migrations WHERE version = %s",
                    (last_migration.version,)
                )
                print(f"Manually reverted migration V{last_migration.version}")
            else:
                print(f"No down SQL for migration V{last_migration.version}")
        except Exception as e:
            print(f"Error reverting migration: {e}")

        # Verify the category column is gone
        try:
            await db.execute("SELECT category FROM products LIMIT 1")
            print("Category column still exists (revert failed)")
        except Exception:
            print("Category column removed successfully (revert worked)")

        # Reapply the migration that was reverted
        print("Applying all migrations again")
        await migration.migrate()
        print("Migrations reapplied successfully")

        # Verify all columns are present
        products = await db.execute("SELECT * FROM products")
        for product in products:
            print(f"Product after reapplying: {product['name']}, Price: ${product['price']:.2f}, " +
                  f"Category: {product['category']}, Description: {product['description']}")

    finally:
        # Clean up
        try:
            await db.drop_table("products", if_exists=True)
            await db.drop_table("_migrations", if_exists=True)
        except Exception as e:
            print(f"Cleanup error: {e}")

        # Close database connection
        await db.close()

        # Remove migrations directory
        shutil.rmtree(migrations_dir)

if __name__ == "__main__":
    asyncio.run(async_migration_example())

Migration with Transactions

This example demonstrates how to use transactions with migrations for atomic schema changes:

import os
from manticore_cockroachdb import Database, Migration

def transaction_migration_example():
    # Connect to database
    db = Database(database="testdb", host="localhost")
    db.connect()

    try:
        # Create migration directory
        migrations_dir = "transaction_migrations"
        if not os.path.exists(migrations_dir):
            os.makedirs(migrations_dir)

        # Create a migration with multiple operations in a transaction
        Migration.create_migration(
            "001_create_related_tables",
            """
            -- Create categories table
            CREATE TABLE categories (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL UNIQUE,
                description TEXT
            );

            -- Create items table with foreign key
            CREATE TABLE items (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                category_id UUID NOT NULL REFERENCES categories(id),
                price DECIMAL(10,2) NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            );

            -- Create index for faster lookups
            CREATE INDEX items_category_idx ON items(category_id);
            """,
            """
            DROP TABLE IF EXISTS items;
            DROP TABLE IF EXISTS categories;
            """,
            directory=migrations_dir
        )

        # Load and apply migrations
        migrations = Migration.load_migrations(migrations_dir)
        applied = Migration.apply_migrations(db, migrations)
        print("Applied {} migrations".format(len(applied)))

        # Insert test data using transactions
        def insert_test_data(conn):
            with conn.cursor() as cur:
                # Insert categories
                cur.execute(
                    "INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
                    ("Electronics", "Electronic devices and gadgets")
                )
                electronics_id = cur.fetchone()["id"]

                cur.execute(
                    "INSERT INTO categories (name, description) VALUES (%s, %s) RETURNING id",
                    ("Books", "Books and publications")
                )
                books_id = cur.fetchone()["id"]

                # Insert items
                items = [
                    ("Smartphone", electronics_id, 699.99),
                    ("Tablet", electronics_id, 349.99),
                    ("Novel", books_id, 14.99),
                    ("Textbook", books_id, 79.99)
                ]

                for name, category_id, price in items:
                    cur.execute(
                        "INSERT INTO items (name, category_id, price) VALUES (%s, %s, %s)",
                        (name, category_id, price)
                    )

                return {
                    "electronics_id": electronics_id,
                    "books_id": books_id
                }

        # Run in transaction
        result = db.run_in_transaction(insert_test_data)
        print("Inserted test data with category IDs: {}".format(result))

        # Query the data with a join
        with db.cursor() as cursor:
            cursor.execute("""
                SELECT i.name as item_name, i.price, c.name as category_name
                FROM items i
                JOIN categories c ON i.category_id = c.id
                ORDER BY c.name, i.price DESC
            """)

            items = cursor.fetchall()
            print("\nItems by category:")
            for item in items:
                print("  {} ({}): ${:.2f}".format(
                    item["item_name"], item["category_name"], item["price"]
                ))

    finally:
        # Clean up
        try:
            db.execute("DROP TABLE IF EXISTS items")
            db.execute("DROP TABLE IF EXISTS categories")
            db.execute("DROP TABLE IF EXISTS migrations")
        except Exception as e:
            print("Cleanup error: {}".format(e))
        db.close()

if __name__ == "__main__":
    transaction_migration_example()

Migration Status and History

This example shows how to check migration status and history:

```python import os from manticore_cockroachdb import Database, Migration

def migration_status_example(): # Connect to database db = Database(database="testdb", host="localhost") db.connect()

try:
    # Create migration directory
    migrations_dir = "status_migrations"
    if not os.path.exists(migrations_dir):
        os.makedirs(migrations_dir)

    # Create several migrations
    for i in range(1, 6):
        version = f"{i:03d}"
        Migration.create_migration(
            f"{version}_migration_{i}",
            f"-- Migration {i} up\nCREATE TABLE IF NOT EXISTS table_{i} (id SERIAL PRIMARY KEY, name TEXT);",
            f"-- Migration {i} down\nDROP TABLE IF EXISTS table_{i};",
            directory=migrations_dir
        )

    # Load migrations
    migrations = Migration.load_migrations(migrations_dir)
    print("Loaded {} migrations".format(len(migrations)))

    # Apply only the first 3 migrations
    applied = Migration.apply_migrations(db, migrations[:3])
    print("Applied {} migrations".format(len(applied)))

    # Check current version
    version = Migration.get_current_version(db)
    print("Current migration version: {}".format(version))

    # Get migration history
    history = Migration.get_migration_history(db)
    print("\nMigration history:")
    for entry in history:
        print("  Version: {}, Applied at: {}, Description: {}".format(
            entry["version"], entry["applied_at"], entry["description"]
        ))

    # Check pending migrations
    pending = Migration.get_pending_migrations(db, migrations)
    print("\nPending migrations:")
    for m in pending:
        print("  Version: {}, Description: {}".format(m.version, m.description))

    # Apply one more migration
    applied = Migration.apply_migrations(db, [pending[0]])
    print("\nApplied 1 more migration: {}".format(applied[0].version))

    # Check status again
    version = Migration.get_current_version(db)
    print("Current migration version: {}".format(version))

    # Roll back to version 002
    target_version = "002"
    rolled_back = Migration.rollback_to_version(db, migrations, target_version)
    print("\nRolled back to version {}, migrations rolled back: {}".format(
        target_version, len(rolled_back)
    ))

    # Verify current version
    version = Migration.get_current_version(db)
    print("Current migration version after rollback: {}".format(version))

finally:
    # Clean up
    try:
        for i in range(1, 6):
            db.execute(f"DROP TABLE IF EXISTS table_{i}")
        db.execute("DROP TABLE IF EXISTS migrations")
    except Exception as e:
        print("Cleanup error: {}".format(e))
    db.close()

if name == "main": migration_status_example()