Migrations API
The Manticore CockroachDB client provides both synchronous and asynchronous migration support to help manage database schema changes.
Synchronous Migration
Migration
Database migration.
Source code in manticore_cockroachdb/migration.py
class Migration:
    """A single versioned schema change and its optional rollback script."""

    def __init__(
        self,
        version: int,
        description: str,
        up_sql: str,
        down_sql: Optional[str] = None
    ):
        """Store the migration's identity and SQL scripts.

        Args:
            version: Migration version number
            description: Migration description
            up_sql: SQL to apply migration
            down_sql: SQL to revert migration
        """
        # `name` is an alias of `description`, kept for the test suite.
        self.name = self.description = description
        self.version = version
        self.up_sql, self.down_sql = up_sql, down_sql
|
Functions
__init__(version, description, up_sql, down_sql=None)
Initialize migration.
Parameters:
| Name |
Type |
Description |
Default |
version
|
int
|
Migration version number
|
required
|
description
|
str
|
Migration description
|
required
|
up_sql
|
str
|
SQL to apply migration
|
required
|
down_sql
|
Optional[str]
|
SQL to revert migration
|
None
|
Source code in manticore_cockroachdb/migration.py
def __init__(
    self,
    version: int,
    description: str,
    up_sql: str,
    down_sql: Optional[str] = None
):
    """Store the migration's identity and SQL scripts.

    Args:
        version: Migration version number
        description: Migration description
        up_sql: SQL to apply migration
        down_sql: SQL to revert migration
    """
    # `name` is an alias of `description`, kept for the test suite.
    self.name = self.description = description
    self.version = version
    self.up_sql, self.down_sql = up_sql, down_sql
|
Asynchronous Migration
Note: The AsyncMigrator class supports two methods for reverting migrations:
1. Using the migrate() method with a target_version parameter to revert to a specific version
2. Manually executing the down SQL and removing migration records for more fine-grained control
AsyncMigrator
Asynchronous database schema migrator.
This class provides tools for managing database schema migrations asynchronously.
It supports:
- Creating migration files with up and down SQL
- Loading migrations from the filesystem
- Applying migrations to update the database schema
- Reverting migrations to roll back schema changes
- Tracking migration history in the database
Source code in manticore_cockroachdb/async_migration.py
class AsyncMigrator:
    """Asynchronous database schema migrator.

    This class provides tools for managing database schema migrations asynchronously.
    It supports:
    - Creating migration files with up and down SQL
    - Loading migrations from the filesystem
    - Applying migrations to update the database schema
    - Reverting migrations to roll back schema changes
    - Tracking migration history in the database
    """

    def __init__(self, db: AsyncDatabase, migrations_dir: str = "migrations"):
        """Initialize migrator.

        Args:
            db: AsyncDatabase instance
            migrations_dir: Directory containing migration files
        """
        self.db = db
        self.migrations_dir = migrations_dir

    async def initialize(self) -> None:
        """Initialize migrator and ensure migrations table exists."""
        await self._ensure_migrations_table()

    async def _ensure_migrations_table(self) -> None:
        """Create the ``_migrations`` bookkeeping table if it doesn't exist."""
        await self.db.execute(
            """
            CREATE TABLE IF NOT EXISTS _migrations (
                version INTEGER PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            """
        )

    async def get_applied_versions(self) -> List[int]:
        """Get list of applied migration versions.

        Returns:
            List of applied version numbers, in ascending order
        """
        result = await self.db.execute(
            "SELECT version FROM _migrations ORDER BY version"
        )
        return [row["version"] for row in result]

    async def load_migrations(self) -> List[AsyncMigration]:
        """Load migrations from the migrations directory.

        Scans ``self.migrations_dir`` for files named
        ``V{version}__{description}.sql``. A companion file
        ``U{version}__undo_{description}.sql`` in the same directory, when
        present, supplies the down SQL.

        Files that cannot be parsed or read are logged and skipped, so one
        bad file does not prevent the others from loading.

        Returns:
            List of migrations sorted by ascending version; empty when the
            directory does not exist.
        """
        migrations = []
        migrations_path = Path(self.migrations_dir)
        if not migrations_path.exists():
            logger.warning(f"Migrations directory {self.migrations_dir} does not exist")
            return migrations

        for file_path in sorted(migrations_path.glob("V*__*.sql")):
            try:
                # Filename format: V{version}__{description}.sql
                version_part, description_part = file_path.name.split("__", 1)
                version = int(version_part[1:])  # Remove 'V' prefix
                # Use the stem so only the trailing ".sql" is dropped; splitting
                # on the first "." would truncate descriptions containing dots.
                description = file_path.stem.split("__", 1)[1].replace("_", " ")

                # Decode explicitly as UTF-8 so the result does not depend on
                # the platform's default locale encoding.
                up_sql = file_path.read_text(encoding="utf-8")

                # Check for down migration
                down_path = file_path.with_name(
                    f"U{version_part[1:]}__undo_{description_part}"
                )
                down_sql = None
                if down_path.exists():
                    down_sql = down_path.read_text(encoding="utf-8")

                migrations.append(
                    AsyncMigration(
                        version=version,
                        description=description,
                        up_sql=up_sql,
                        down_sql=down_sql
                    )
                )
                logger.debug(f"Loaded migration V{version}: {description}")
            except Exception as e:
                # Best effort by design: report the file and keep loading.
                logger.error(f"Failed to load migration {file_path}: {e}")

        return sorted(migrations, key=lambda m: m.version)

    async def create_migration(
        self,
        description: str,
        up_sql: str,
        down_sql: Optional[str] = None
    ) -> None:
        """Create a new migration file.

        The new version number is one more than the highest version found
        among applied and on-disk migrations (1 when there are none).

        Args:
            description: Migration description
            up_sql: SQL to apply migration
            down_sql: SQL to revert migration; when given, an undo file is
                written next to the migration file
        """
        migrations_path = Path(self.migrations_dir)

        # Create migrations directory if it doesn't exist
        if not migrations_path.exists():
            # exist_ok covers the case where the directory appears between
            # the check above and this call.
            migrations_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created migrations directory {self.migrations_dir}")

        # Get next version number
        applied_versions = await self.get_applied_versions()
        loaded_versions = [m.version for m in await self.load_migrations()]
        next_version = max([*applied_versions, *loaded_versions], default=0) + 1

        # Format description for filename
        file_description = description.lower().replace(" ", "_")

        # Write as UTF-8 to match how load_migrations reads the files.
        file_path = migrations_path / f"V{next_version}__{file_description}.sql"
        file_path.write_text(up_sql, encoding="utf-8")
        logger.info(f"Created migration V{next_version}: {description}")

        # Create undo migration file if provided
        if down_sql:
            down_path = migrations_path / f"U{next_version}__undo_{file_description}.sql"
            down_path.write_text(down_sql, encoding="utf-8")
            logger.info(f"Created undo migration U{next_version}")

    async def migrate(self, target_version: Optional[int] = None) -> None:
        """Apply or revert migrations to reach target version.

        This method can be used for both applying and reverting migrations:
        - To apply migrations: Call without arguments or with a target_version higher than current
        - To revert migrations: Call with a target_version lower than the current version

        Each migration runs in its own transaction together with the
        matching insert into / delete from ``_migrations``.

        Args:
            target_version: Target version to migrate to.
                If None, migrate to latest version.
                If lower than current version, revert migrations.

        Raises:
            ValueError: If a migration that must be reverted has no down SQL.
                Raised before any migration is reverted.
        """
        # Ensure migrations table exists
        await self._ensure_migrations_table()

        # The current version is the highest applied one (0 when none).
        applied_versions = await self.get_applied_versions()
        current_version = max(applied_versions) if applied_versions else 0

        all_migrations = await self.load_migrations()
        available_versions = [m.version for m in all_migrations]

        if target_version is None:
            # Migrate to latest version
            target_version = max(available_versions) if available_versions else 0

        if current_version == target_version:
            logger.info(f"Already at target version {target_version}")
            return

        logger.info(f"Migrating from version {current_version} to {target_version}")

        if target_version > current_version:
            migrations_to_apply = [
                m for m in all_migrations
                if current_version < m.version <= target_version
            ]
            logger.info(f"Applying {len(migrations_to_apply)} migrations")

            for migration in migrations_to_apply:
                logger.info(f"Applying migration V{migration.version}: {migration.description}")
                try:
                    # Bind the migration as a default argument so the callback
                    # does not depend on the loop variable's later value.
                    async def apply_migration(conn, migration=migration):
                        async with conn.cursor() as cur:
                            await cur.execute(migration.up_sql)
                            await cur.execute(
                                "INSERT INTO _migrations (version, description) VALUES (%s, %s)",
                                (migration.version, migration.description)
                            )
                        return None

                    await self.db.run_in_transaction(apply_migration)
                    logger.info(f"Applied migration V{migration.version}")
                except Exception as e:
                    logger.error(f"Failed to apply migration V{migration.version}: {e}")
                    raise
        else:
            # Only migrations recorded as applied are reverted, newest first;
            # running down SQL for a version that was never applied would
            # act on schema objects that do not exist.
            applied = set(applied_versions)
            migrations_to_revert = sorted(
                (
                    m for m in all_migrations
                    if target_version < m.version <= current_version
                    and m.version in applied
                ),
                key=lambda m: m.version,
                reverse=True,
            )

            # Validate every step before touching the database so a missing
            # down script cannot leave the schema half-reverted.
            for migration in migrations_to_revert:
                if not migration.down_sql:
                    logger.error(f"Cannot revert migration V{migration.version}: No down SQL")
                    raise ValueError(f"Cannot revert migration V{migration.version}: No down SQL")

            logger.info(f"Reverting {len(migrations_to_revert)} migrations")

            for migration in migrations_to_revert:
                logger.info(f"Reverting migration V{migration.version}: {migration.description}")
                try:
                    async def revert_migration(conn, migration=migration):
                        async with conn.cursor() as cur:
                            await cur.execute(migration.down_sql)
                            await cur.execute(
                                "DELETE FROM _migrations WHERE version = %s",
                                (migration.version,)
                            )
                        return None

                    await self.db.run_in_transaction(revert_migration)
                    logger.info(f"Reverted migration V{migration.version}")
                except Exception as e:
                    logger.error(f"Failed to revert migration V{migration.version}: {e}")
                    raise

        logger.info(f"Migration to version {target_version} completed")
|
Functions
__init__(db, migrations_dir='migrations')
Initialize migrator.
Parameters:
| Name |
Type |
Description |
Default |
db
|
AsyncDatabase
|
AsyncDatabase instance
|
required
|
migrations_dir
|
str
|
Directory containing migration files
|
'migrations'
|
Source code in manticore_cockroachdb/async_migration.py
def __init__(self, db: AsyncDatabase, migrations_dir: str = "migrations"):
    """Remember the database handle and where migration files live.

    Args:
        db: AsyncDatabase instance
        migrations_dir: Directory containing migration files
    """
    self.db, self.migrations_dir = db, migrations_dir
|
initialize()
async
Initialize migrator and ensure migrations table exists.
Source code in manticore_cockroachdb/async_migration.py
async def initialize(self) -> None:
    """Prepare the migrator for use by making sure the migrations table exists."""
    # All setup work currently lives in the table-creation helper.
    await self._ensure_migrations_table()
|
_ensure_migrations_table()
async
Create migrations table if it doesn't exist.
Source code in manticore_cockroachdb/async_migration.py
async def _ensure_migrations_table(self) -> None:
    """Issue the idempotent DDL that creates the ``_migrations`` table."""
    # One row per applied migration: version, description, applied_at.
    ddl = """
        CREATE TABLE IF NOT EXISTS _migrations (
            version INTEGER PRIMARY KEY,
            description TEXT NOT NULL,
            applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
        """
    await self.db.execute(ddl)
|
get_applied_versions()
async
Get list of applied migration versions.
Returns:
| Type |
Description |
List[int]
|
List of applied version numbers
|
Source code in manticore_cockroachdb/async_migration.py
async def get_applied_versions(self) -> List[int]:
    """Fetch the version numbers recorded in the ``_migrations`` table.

    Returns:
        List of applied version numbers, in the order the query yields them
        (the query sorts by version)
    """
    query = "SELECT version FROM _migrations ORDER BY version"
    rows = await self.db.execute(query)
    versions = []
    for row in rows:
        versions.append(row["version"])
    return versions
|
load_migrations()
async
Load migrations from migrations directory.
Returns:
| Type |
Description |
List[AsyncMigration]
|
List of migrations
|
Source code in manticore_cockroachdb/async_migration.py
async def load_migrations(self) -> List[AsyncMigration]:
    """Load migrations from the migrations directory.

    Scans ``self.migrations_dir`` for files named
    ``V{version}__{description}.sql``. A companion file
    ``U{version}__undo_{description}.sql`` in the same directory, when
    present, supplies the down SQL.

    Files that cannot be parsed or read are logged and skipped, so one bad
    file does not prevent the others from loading.

    Returns:
        List of migrations sorted by ascending version; empty when the
        directory does not exist.
    """
    migrations = []
    migrations_path = Path(self.migrations_dir)
    if not migrations_path.exists():
        logger.warning(f"Migrations directory {self.migrations_dir} does not exist")
        return migrations

    for file_path in sorted(migrations_path.glob("V*__*.sql")):
        try:
            # Filename format: V{version}__{description}.sql
            version_part, description_part = file_path.name.split("__", 1)
            version = int(version_part[1:])  # Remove 'V' prefix
            # Use the stem so only the trailing ".sql" is dropped; splitting
            # on the first "." would truncate descriptions containing dots.
            description = file_path.stem.split("__", 1)[1].replace("_", " ")

            # Decode explicitly as UTF-8 so the result does not depend on
            # the platform's default locale encoding.
            up_sql = file_path.read_text(encoding="utf-8")

            # Check for down migration
            down_path = file_path.with_name(
                f"U{version_part[1:]}__undo_{description_part}"
            )
            down_sql = None
            if down_path.exists():
                down_sql = down_path.read_text(encoding="utf-8")

            migrations.append(
                AsyncMigration(
                    version=version,
                    description=description,
                    up_sql=up_sql,
                    down_sql=down_sql
                )
            )
            logger.debug(f"Loaded migration V{version}: {description}")
        except Exception as e:
            # Best effort by design: report the file and keep loading.
            logger.error(f"Failed to load migration {file_path}: {e}")

    return sorted(migrations, key=lambda m: m.version)
|
create_migration(description, up_sql, down_sql=None)
async
Create a new migration file.
Parameters:
| Name |
Type |
Description |
Default |
description
|
str
|
Migration description
|
required
|
up_sql
|
str
|
SQL to apply migration
|
required
|
down_sql
|
Optional[str]
|
SQL to revert migration
|
None
|
Source code in manticore_cockroachdb/async_migration.py
async def create_migration(
    self,
    description: str,
    up_sql: str,
    down_sql: Optional[str] = None
) -> None:
    """Create a new migration file.

    The new version number is one more than the highest version found among
    applied and on-disk migrations (1 when there are none).

    Args:
        description: Migration description
        up_sql: SQL to apply migration
        down_sql: SQL to revert migration; when given, an undo file is
            written next to the migration file
    """
    migrations_path = Path(self.migrations_dir)

    # Create migrations directory if it doesn't exist
    if not migrations_path.exists():
        # exist_ok covers the case where the directory appears between the
        # check above and this call.
        migrations_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created migrations directory {self.migrations_dir}")

    # Get next version number
    applied_versions = await self.get_applied_versions()
    loaded_versions = [m.version for m in await self.load_migrations()]
    next_version = max([*applied_versions, *loaded_versions], default=0) + 1

    # Format description for filename
    file_description = description.lower().replace(" ", "_")

    # Write as UTF-8 so the file content does not depend on the platform's
    # default locale encoding.
    file_path = migrations_path / f"V{next_version}__{file_description}.sql"
    file_path.write_text(up_sql, encoding="utf-8")
    logger.info(f"Created migration V{next_version}: {description}")

    # Create undo migration file if provided
    if down_sql:
        down_path = migrations_path / f"U{next_version}__undo_{file_description}.sql"
        down_path.write_text(down_sql, encoding="utf-8")
        logger.info(f"Created undo migration U{next_version}")
|
migrate(target_version=None)
async
Apply or revert migrations to reach target version.
This method can be used for both applying and reverting migrations:
- To apply migrations: Call without arguments or with a target_version higher than current
- To revert migrations: Call with a target_version lower than the current version
Parameters:
| Name |
Type |
Description |
Default |
target_version
|
Optional[int]
|
Target version to migrate to.
If None, migrate to latest version.
If lower than current version, revert migrations.
|
None
|
Source code in manticore_cockroachdb/async_migration.py
async def migrate(self, target_version: Optional[int] = None) -> None:
    """Apply or revert migrations to reach target version.

    This method can be used for both applying and reverting migrations:
    - To apply migrations: Call without arguments or with a target_version higher than current
    - To revert migrations: Call with a target_version lower than the current version

    Each migration runs in its own transaction together with the matching
    insert into / delete from ``_migrations``.

    Args:
        target_version: Target version to migrate to.
            If None, migrate to latest version.
            If lower than current version, revert migrations.

    Raises:
        ValueError: If a migration that must be reverted has no down SQL.
            Raised before any migration is reverted.
    """
    # Ensure migrations table exists
    await self._ensure_migrations_table()

    # The current version is the highest applied one (0 when none).
    applied_versions = await self.get_applied_versions()
    current_version = max(applied_versions) if applied_versions else 0

    all_migrations = await self.load_migrations()
    available_versions = [m.version for m in all_migrations]

    if target_version is None:
        # Migrate to latest version
        target_version = max(available_versions) if available_versions else 0

    if current_version == target_version:
        logger.info(f"Already at target version {target_version}")
        return

    logger.info(f"Migrating from version {current_version} to {target_version}")

    if target_version > current_version:
        migrations_to_apply = [
            m for m in all_migrations
            if current_version < m.version <= target_version
        ]
        logger.info(f"Applying {len(migrations_to_apply)} migrations")

        for migration in migrations_to_apply:
            logger.info(f"Applying migration V{migration.version}: {migration.description}")
            try:
                # Bind the migration as a default argument so the callback
                # does not depend on the loop variable's later value.
                async def apply_migration(conn, migration=migration):
                    async with conn.cursor() as cur:
                        await cur.execute(migration.up_sql)
                        await cur.execute(
                            "INSERT INTO _migrations (version, description) VALUES (%s, %s)",
                            (migration.version, migration.description)
                        )
                    return None

                await self.db.run_in_transaction(apply_migration)
                logger.info(f"Applied migration V{migration.version}")
            except Exception as e:
                logger.error(f"Failed to apply migration V{migration.version}: {e}")
                raise
    else:
        # Only migrations recorded as applied are reverted, newest first;
        # running down SQL for a version that was never applied would act on
        # schema objects that do not exist.
        applied = set(applied_versions)
        migrations_to_revert = sorted(
            (
                m for m in all_migrations
                if target_version < m.version <= current_version
                and m.version in applied
            ),
            key=lambda m: m.version,
            reverse=True,
        )

        # Validate every step before touching the database so a missing down
        # script cannot leave the schema half-reverted.
        for migration in migrations_to_revert:
            if not migration.down_sql:
                logger.error(f"Cannot revert migration V{migration.version}: No down SQL")
                raise ValueError(f"Cannot revert migration V{migration.version}: No down SQL")

        logger.info(f"Reverting {len(migrations_to_revert)} migrations")

        for migration in migrations_to_revert:
            logger.info(f"Reverting migration V{migration.version}: {migration.description}")
            try:
                async def revert_migration(conn, migration=migration):
                    async with conn.cursor() as cur:
                        await cur.execute(migration.down_sql)
                        await cur.execute(
                            "DELETE FROM _migrations WHERE version = %s",
                            (migration.version,)
                        )
                    return None

                await self.db.run_in_transaction(revert_migration)
                logger.info(f"Reverted migration V{migration.version}")
            except Exception as e:
                logger.error(f"Failed to revert migration V{migration.version}: {e}")
                raise

    logger.info(f"Migration to version {target_version} completed")
|
Usage Examples
Synchronous Migrations
# Example: create, apply and roll back one migration with the synchronous API.
from manticore_cockroachdb import Database, Migration
# Connect to database
db = Database(database="example_db")
# Create migration instance
# NOTE(review): the Migration class documented earlier on this page takes
# (version, description, up_sql, down_sql), not a database and a directory.
# This call presumably targets a separate synchronous migrator class that
# mirrors AsyncMigrator -- TODO confirm the intended class name and import.
migration = Migration(db, migrations_dir="./migrations")
# Create a new migration
# Arguments are, in order: description, up SQL, down SQL.
migration.create_migration(
"create users table",
"""
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
""",
"""
DROP TABLE users;
"""
)
# Load migrations
migrations = migration.load_migrations()
print(f"Loaded {len(migrations)} migrations")
# Apply migrations
# The result is printed below as a count of applied migrations; the
# synchronous API is not documented on this page -- TODO confirm.
applied = migration.migrate()
print(f"Applied {applied} migrations")
# Rollback last migration
# rollback(count=...) is likewise not documented on this page -- TODO confirm.
rollback_count = migration.rollback(count=1)
print(f"Rolled back {rollback_count} migrations")
Asynchronous Migrations
```python
import asyncio
from manticore_cockroachdb.async_database import AsyncDatabase
from manticore_cockroachdb.async_migration import AsyncMigrator
async def main():
    """Create, apply and revert a migration with AsyncMigrator.

    Demonstrates both ways of reverting: through ``migrate(target_version=...)``
    and by running the down SQL by hand.
    """
    # Connect to database
    db = AsyncDatabase(database="example_db")
    await db.connect()
    try:
        # Create migration instance
        migration = AsyncMigrator(db, migrations_dir="./async_migrations")
        # Initialize the migrator (creates the _migrations table)
        await migration.initialize()
        # Create a new migration
        await migration.create_migration(
            "create async users table",
            """
            CREATE TABLE async_users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                created_at TIMESTAMPTZ DEFAULT now()
            );
            """,
            """
            DROP TABLE async_users;
            """
        )
        # Load migrations
        migrations = await migration.load_migrations()
        print(f"Loaded {len(migrations)} migrations")
        # Apply migrations
        await migration.migrate()
        print("Migrations applied successfully")
        # Method 1: Revert using migrate with target_version
        # This will revert to version 0 (before any migrations)
        await migration.migrate(target_version=0)
        print("Reverted all migrations using migrate method")
        # Re-apply the migrations so Method 2 has something to revert; Method 1
        # already rolled everything back, so running the down SQL again
        # without this step would fail on the missing table.
        await migration.migrate()
        # Method 2: Manual migration reversion
        # This approach gives you more control over the reversion process
        last_migration = max(migrations, key=lambda m: m.version)
        # Execute the down SQL directly, then remove the bookkeeping row so
        # the migrator no longer treats the migration as applied.
        if last_migration.down_sql:
            await db.execute(last_migration.down_sql)
            await db.execute(
                "DELETE FROM _migrations WHERE version = %s",
                (last_migration.version,)
            )
            print(f"Manually reverted migration V{last_migration.version}")
    finally:
        await db.close()
# Run the async function
asyncio.run(main())
```