Coverage for src/dataknobs_data/pooling/postgres.py: 75%
28 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
1"""PostgreSQL-specific connection pooling implementation."""
3from __future__ import annotations
5from dataclasses import dataclass
6from typing import Any
8from .base import BasePoolConfig
@dataclass
class PostgresPoolConfig(BasePoolConfig):
    """Configuration for PostgreSQL connection pools.

    Holds the connection coordinates (host, port, database, user, password)
    together with the pool sizing and per-connection options that
    ``create_asyncpg_pool`` forwards to ``asyncpg.create_pool``.
    """

    # Connection coordinates.
    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    user: str = "postgres"
    password: str = ""
    # Pool sizing, forwarded as ``min_size`` / ``max_size``.
    min_size: int = 2
    max_size: int = 5
    # Forwarded unchanged as ``command_timeout``; ``None`` leaves it unset.
    command_timeout: float | None = None
    # Forwarded unchanged as ``ssl``; ``None`` leaves it unset.
    ssl: Any | None = None

    def to_connection_string(self) -> str:
        """Convert to a PostgreSQL connection URI.

        The user, password and database components are percent-encoded so
        that reserved URI characters in them (for example ``@``, ``:``,
        ``/``, ``#``, ``?`` or ``%`` in a password) cannot be mistaken for
        URI delimiters when the string is parsed.

        Returns:
            A string of the form
            ``postgresql://user:password@host:port/database``.
        """
        from urllib.parse import quote

        # str() mirrors the f-string formatting used for the other parts, so
        # non-string values are rendered the same way as before the encoding
        # was added. safe="" makes "/" get encoded as well.
        user = quote(str(self.user), safe="")
        password = quote(str(self.password), safe="")
        database = quote(str(self.database), safe="")
        return f"postgresql://{user}:{password}@{self.host}:{self.port}/{database}"

    def to_hash_key(self) -> tuple:
        """Create a hashable key for this configuration.

        Returns:
            The tuple ``(host, port, database, user)``. The password, pool
            sizes, command timeout and SSL setting are not part of the key.
        """
        return (self.host, self.port, self.database, self.user)

    @classmethod
    def from_dict(cls, config: dict) -> PostgresPoolConfig:
        """Create from configuration dictionary.

        Missing keys fall back to the same defaults as the dataclass fields.
        Note that the pool sizes are read from the ``"min_pool_size"`` and
        ``"max_pool_size"`` keys, not from keys named after the fields.

        Args:
            config: Mapping that may contain ``host``, ``port``,
                ``database``, ``user``, ``password``, ``min_pool_size``,
                ``max_pool_size``, ``command_timeout`` and ``ssl``.

        Returns:
            A new ``PostgresPoolConfig`` populated from ``config``.
        """
        return cls(
            host=config.get("host", "localhost"),
            port=config.get("port", 5432),
            database=config.get("database", "postgres"),
            user=config.get("user", "postgres"),
            password=config.get("password", ""),
            min_size=config.get("min_pool_size", 2),
            max_size=config.get("max_pool_size", 5),
            command_timeout=config.get("command_timeout"),
            ssl=config.get("ssl"),
        )
async def create_asyncpg_pool(config: PostgresPoolConfig):
    """Build an asyncpg connection pool from ``config``.

    Args:
        config: Supplies the connection string (via
            ``to_connection_string()``) and the ``min_size``, ``max_size``,
            ``command_timeout`` and ``ssl`` options.

    Returns:
        Whatever ``asyncpg.create_pool`` yields once awaited.
    """
    # Imported here rather than at module level, so asyncpg is only needed
    # when a pool is actually created.
    import asyncpg

    pool_options = {
        "min_size": config.min_size,
        "max_size": config.max_size,
        "command_timeout": config.command_timeout,
        "ssl": config.ssl,
    }
    dsn = config.to_connection_string()
    return await asyncpg.create_pool(dsn, **pool_options)
async def validate_asyncpg_pool(pool) -> None:
    """Check a pool by acquiring a connection and running ``SELECT 1``.

    Nothing is returned; no exception is caught here, so any error raised
    while acquiring the connection or running the query reaches the caller.

    Args:
        pool: Object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``fetchval`` method.
    """
    lease = pool.acquire()
    async with lease as connection:
        await connection.fetchval("SELECT 1")