Exceptions
mydborm provides 24 typed exceptions. Catch exactly what you need — no more parsing error strings or catching generic Exception.
Full exception hierarchy
Import exceptions
from mydborm import (
MydbormError,
ConnectionError,
ConnectionTimeoutError,
NotConfiguredError,
QueryError,
RecordNotFoundError,
MultipleRecordsError,
ValidationError,
FieldRequiredError,
FieldTypeError,
FieldLengthError,
BulkOperationError,
BulkInsertError,
BulkUpdateError,
BulkUpsertError,
TransactionError,
SavepointError,
DeadlockError,
RetryExhaustedError,
MigrationError,
MigrationAlreadyAppliedError,
MigrationNotFoundError,
SchemaError,
UnsupportedDialectError,
)
Connection exceptions
NotConfiguredError
Raised when you call any DB operation before db.configure().
from mydborm import db, BaseModel, IntField, NotConfiguredError
class User(BaseModel):
__tablename__ = "users"
id = IntField(primary_key=True)
# WRONG — configure not called
try:
users = User.all()
except NotConfiguredError as e:
print(f"Error: {e}")
# Fix it:
db.configure(
dialect = "mysql",
host = "127.0.0.1",
port = 3306,
user = "root",
password = "yourpassword",
database = "mydb",
)
users = User.all() # now works
ConnectionError
Raised when the database server cannot be reached.
from mydborm import db, ConnectionError
db.configure(
dialect = "mysql",
host = "192.168.1.999", # wrong host
port = 3306,
user = "root",
password = "root",
database = "mydb",
)
try:
with db.connect() as conn:
pass
except ConnectionError as e:
print(f"Cannot connect to database")
print(f" Dialect: {e.dialect}") # mysql
print(f" Host: {e.host}") # 192.168.1.999
print(f" Port: {e.port}") # 3306
print(f" Error: {e.message}")
# Retry with correct host
db.configure(dialect="mysql", host="127.0.0.1", port=3306,
user="root", password="root", database="mydb")
ConnectionTimeoutError
Raised when the connection attempt times out.
from mydborm import ConnectionTimeoutError
try:
with db.connect() as conn:
pass
except ConnectionTimeoutError as e:
print(f"Connection timed out after {e.timeout}s")
print("Check if the database server is running")
print(f"Host: {e.host}:{e.port}")
Validation exceptions
FieldRequiredError
Raised when a nullable=False field receives None.
from mydborm import BaseModel, IntField, StrField, FieldRequiredError
class Product(BaseModel):
__tablename__ = "products"
id = IntField(primary_key=True)
name = StrField(max_length=100, nullable=False) # required
sku = StrField(max_length=20, nullable=False) # required
price = FloatField(nullable=False) # required
try:
Product.create(name="Laptop", sku=None, price=999.99)
except FieldRequiredError as e:
print(f"Missing required field: '{e.field}'")
# Missing required field: 'sku'
try:
Product.create(name=None, sku="LAP-001", price=999.99)
except FieldRequiredError as e:
print(f"Field '{e.field}' is required — got None")
# Field 'name' is required — got None
FieldTypeError
Raised when a field receives the wrong Python type.
from mydborm import BaseModel, IntField, BoolField, FieldTypeError
class Order(BaseModel):
__tablename__ = "orders"
id = IntField(primary_key=True)
shipped = BoolField(nullable=False)
# BoolField requires True or False — not strings or integers
try:
Order.create(shipped="yes")
except FieldTypeError as e:
print(f"Wrong type for '{e.field}'")
print(f" Got: {type(e.value).__name__} = {e.value!r}")
print(f" Expected: bool")
# Wrong type for 'shipped'
# Got: str = 'yes'
# Expected: bool
try:
Order.create(shipped=1) # use True not 1
except FieldTypeError as e:
print(f"Use True/False not 1/0 for BoolField")
FieldLengthError
Raised when a string exceeds max_length.
from mydborm import BaseModel, IntField, StrField, FieldLengthError
class Tag(BaseModel):
__tablename__ = "tags"
id = IntField(primary_key=True)
name = StrField(max_length=20, nullable=False)
try:
Tag.create(name="this-tag-name-is-way-too-long-for-the-field")
except FieldLengthError as e:
print(f"Field '{e.field}' too long:")
print(f" Max: 20 characters")
print(f" Got: {len(str(e.value))} characters")
print(f" Value: {e.value!r}")
ValidationError (base)
Catch all validation errors at once:
from mydborm import ValidationError
try:
Product.create(
name = None, # FieldRequiredError
sku = "A" * 100, # FieldLengthError
price = -5.0, # custom RangeValidator
)
except ValidationError as e:
print(f"Validation failed on field '{e.field}': {e.message}")
Bulk operation exceptions
BulkInsertError
Raised by chunked_bulk_create(..., raise_on_error=True) when a chunk fails. Contains partial success info.
from mydborm import BaseModel, IntField, StrField, BulkInsertError
from mydborm.bulk import chunked_bulk_create
class Product(BaseModel):
__tablename__ = "products"
id = IntField(primary_key=True)
sku = StrField(max_length=20, nullable=False)
name = StrField(max_length=100, nullable=False)
# 10,000 products in chunks of 100
records = [{"sku": f"SKU{i:05d}", "name": f"Product {i}"} for i in range(10000)]
try:
result = chunked_bulk_create(
Product,
records,
chunk_size = 100,
retries = 2,
raise_on_error = True, # raise on first chunk failure
)
except BulkInsertError as e:
print(f"Bulk insert partially failed:")
print(f" Inserted: {e.inserted:,} rows")
print(f" Failed: {e.failed:,} rows")
print(f" Errors: {len(e.errors)} chunks")
for err in e.errors:
print(f" Chunk {err['chunk']}: {err['records']} rows — {err['error']}")
Without raise_on_error — get a result object:
# Continues even if some chunks fail
result = chunked_bulk_create(Product, records, chunk_size=100)
print(result.summary())
# Operation : insert
# Total : 10000
# Inserted : 9850
# Failed : 150
# Chunks : 100
# Retries : 3
# Success : 98.5%
# Duration : 2.4s
if result.has_errors:
for err in result.errors:
print(f"Chunk {err['chunk']} failed: {err['error']}")
BulkUpdateError
from mydborm import BulkUpdateError
from mydborm.bulk import chunked_bulk_update
updates = [{"id": i, "price": float(i)} for i in range(1000)]
try:
result = chunked_bulk_update(
Product, updates, key="id",
chunk_size=100, raise_on_error=True
)
except BulkUpdateError as e:
print(f"Updated: {e.inserted}, Failed: {e.failed}")
BulkUpsertError
from mydborm import BulkUpsertError
try:
Product.bulk_upsert(
records,
conflict_key = "sku",
update_fields = ["name", "price"],
)
except BulkUpsertError as e:
print(f"Upsert failed: inserted={e.inserted}, failed={e.failed}")
for err in e.errors:
print(f" Error: {err['error']}")
Transaction exceptions
DeadlockError
Raised when the database detects a deadlock between concurrent transactions.
from mydborm import db, DeadlockError, RetryExhaustedError
# Use transaction_with_retry — auto-retries on deadlock
try:
with db.transaction_with_retry(retries=3, retry_delay=0.5):
# Transfer money between accounts
db.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
[100, 1]
)
db.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
[100, 2]
)
# If another transaction causes a deadlock:
# → auto-retries with 0.5s, 1s, 2s delays
# → raises RetryExhaustedError after 3 attempts
except RetryExhaustedError as e:
print(f"Transfer failed after {e.attempts} attempts")
print(f"Last error: {e.last_error}")
# Alert: manual intervention needed
except DeadlockError as e:
# Only raised if NOT using transaction_with_retry
print("Deadlock detected — retry the transaction")
RetryExhaustedError
from mydborm import RetryExhaustedError
try:
with db.transaction_with_retry(retries=5):
db.execute("UPDATE stock SET qty = qty - 1 WHERE product_id = %s", [42])
except RetryExhaustedError as e:
print(f"Gave up after {e.attempts} attempts")
print(f"Last error type: {type(e.last_error).__name__}")
print(f"Last error: {e.last_error}")
SavepointError
from mydborm import db, SavepointError
try:
with db.transaction():
db.execute("INSERT INTO orders (user_id, total) VALUES (%s, %s)", [1, 99.99])
with db.savepoint("after_order") as sp:
print(f"Savepoint created: {sp}")
db.execute("INSERT INTO order_items ...")
except SavepointError as e:
print(f"Savepoint '{e.savepoint}' failed: {e.message}")
Schema exceptions
SchemaError
Raised by validate_schema(strict=True) when model definition doesn't match the live database.
from mydborm import BaseModel, IntField, StrField, SchemaError
class User(BaseModel):
__tablename__ = "users"
id = IntField(primary_key=True)
username = StrField(max_length=100, nullable=False)
email = StrField(max_length=255, nullable=False)
phone = StrField(max_length=20, nullable=True) # not yet in DB
# 'old_field' is in DB but not in model
# Non-strict — returns dict, never raises
result = User.validate_schema()
print(result)
# {
# 'table': 'users',
# 'valid': False,
# 'missing_in_db': ['phone'], # in model, not in DB
# 'extra_in_db': ['old_field'], # in DB, not in model
# 'matched': ['id', 'username', 'email']
# }
if not result["valid"]:
if result["missing_in_db"]:
print("Run migrations to add:", result["missing_in_db"])
if result["extra_in_db"]:
print("Consider removing from model:", result["extra_in_db"])
# Strict — raises on mismatch
try:
User.validate_schema(strict=True)
print("Schema is valid!")
except SchemaError as e:
print(f"Schema mismatch in table '{e.table}'")
print(f" Missing in DB : {e.missing_columns}") # ['phone']
print(f" Extra in DB : {e.extra_columns}") # ['old_field']
print(f" Full message : {str(e)}")
# Schema mismatch for table 'users' | missing in DB: ['phone'] | extra in DB: ['old_field']
Migration exceptions
MigrationError
from mydborm import MigrationError
from mydborm.migrations import migrate
try:
result = migrate(User, description="add phone column")
except MigrationError as e:
print(f"Migration failed!")
print(f" Version: {e.version}")
print(f" SQL: {e.sql}")
print(f" Error: {e.message}")
MigrationAlreadyAppliedError
from mydborm import MigrationAlreadyAppliedError
from mydborm.migrations import migrate
try:
migrate(User, description="create users")
migrate(User, description="create users") # second call
except MigrationAlreadyAppliedError as e:
print(f"Migration {e.version} was already applied — skipping")
print("This is usually safe to ignore")
UnsupportedDialectError
from mydborm import db, UnsupportedDialectError
try:
db.configure(dialect="oracle", host="localhost", user="sa", password="pw", database="db")
except UnsupportedDialectError as e:
print(f"Dialect '{e.dialect}' is not supported")
print(f"Supported dialects: {e.supported}")
# Supported dialects: ['mysql', 'yugabyte', 'postgres']
Exception attributes reference
| Exception | Attributes |
|---|---|
ConnectionError |
dialect, host, port, message |
ConnectionTimeoutError |
timeout, dialect, host, port |
ValidationError |
field, value, reason, message |
FieldRequiredError |
field |
FieldTypeError |
field, value |
FieldLengthError |
field, value |
QueryError |
sql, params, message |
RecordNotFoundError |
model, filters |
MultipleRecordsError |
model, count |
BulkOperationError |
inserted, failed, errors |
SavepointError |
savepoint, message |
DeadlockError |
message |
RetryExhaustedError |
attempts, last_error |
MigrationError |
version, sql, message |
SchemaError |
table, missing_columns, extra_columns |
UnsupportedDialectError |
dialect, supported |
Best practices
Catch the most specific exception
# Good — catches exactly what you handle
try:
uid = User.create(username="alice", email="bad-email")
except FieldRequiredError as e:
return {"error": f"Field '{e.field}' is required"}
except ValidationError as e:
return {"error": f"Invalid value for '{e.field}': {e.message}"}
except ConnectionError as e:
return {"error": "Database unavailable — please try again"}
except MydbormError as e:
return {"error": f"Database error: {e}"}
# Bad — swallows everything
try:
uid = User.create(username="alice", email="bad-email")
except Exception:
return {"error": "Something went wrong"}
Log errors with context
import logging
from mydborm import MydbormError, BulkInsertError
logger = logging.getLogger(__name__)
def sync_products(records):
try:
from mydborm.bulk import chunked_bulk_create
result = chunked_bulk_create(Product, records, chunk_size=500)
logger.info(f"Synced {result.inserted} products in {result.duration}s")
return result
except BulkInsertError as e:
logger.error(
f"Bulk insert failed: inserted={e.inserted}, failed={e.failed}",
extra={"errors": e.errors}
)
raise
except MydbormError as e:
logger.error(f"DB error during product sync: {e}", exc_info=True)
raise
Retry pattern without transaction_with_retry
import time
from mydborm import DeadlockError
def with_retry(fn, retries=3, delay=0.5):
for attempt in range(retries + 1):
try:
return fn()
except DeadlockError:
if attempt < retries:
time.sleep(delay * (2 ** attempt))
else:
raise
result = with_retry(lambda: transfer_funds(from_id=1, to_id=2, amount=100))