Coverage for src / dataknobs_data / pooling / postgres.py: 50%
42 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""PostgreSQL-specific connection pooling implementation."""
3from __future__ import annotations
5from dataclasses import dataclass
6from typing import Any
8from .base import BasePoolConfig
@dataclass
class PostgresPoolConfig(BasePoolConfig):
    """Configuration for PostgreSQL connection pools.

    Attributes:
        host: Server host name or IP address.
        port: Server TCP port.
        database: Name of the database to connect to.
        user: Role used to authenticate.
        password: Password for ``user``; empty string when none is set.
        min_size: Forwarded as ``min_size`` when the pool is created.
        max_size: Forwarded as ``max_size`` when the pool is created.
        command_timeout: Forwarded as ``command_timeout`` when the pool is
            created; ``None`` leaves it unset.
        ssl: Forwarded unchanged as ``ssl`` when the pool is created.
    """

    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    user: str = "postgres"
    password: str = ""
    min_size: int = 2
    max_size: int = 5
    command_timeout: float | None = None
    ssl: Any | None = None

    def to_connection_string(self) -> str:
        """Convert to PostgreSQL connection string.

        The user, password and database components are percent-encoded so
        that reserved URL characters (``@``, ``:``, ``/``, ``#``, ``%`` ...)
        inside them cannot be mistaken for URL delimiters. An IPv6 literal
        host is wrapped in brackets so its colons are not read as the port
        separator.

        Returns:
            A ``postgresql://user:password@host:port/database`` URL.
        """
        from urllib.parse import quote

        # str() keeps the old f-string tolerance for non-str field values.
        user = quote(str(self.user), safe="")
        password = quote(str(self.password), safe="")
        database = quote(str(self.database), safe="")

        host = str(self.host)
        if ":" in host and not host.startswith("["):
            # Bare IPv6 literal (e.g. "::1"): URLs require "[::1]".
            host = f"[{host}]"

        return f"postgresql://{user}:{password}@{host}:{self.port}/{database}"

    def to_hash_key(self) -> tuple:
        """Create a hashable key for this configuration.

        Returns:
            ``(host, port, database, user)``. The password, pool sizes,
            timeout and ssl settings are not part of the key.
        """
        return (self.host, self.port, self.database, self.user)

    @classmethod
    def from_dict(cls, config: dict) -> PostgresPoolConfig:
        """Create from configuration dictionary.

        Supports either a connection_string parameter or individual parameters.

        Args:
            config: Configuration dict with either:
                - connection_string: PostgreSQL connection string
                  (postgresql://user:pass@host:port/db). Percent-encoded
                  user, password and database components are decoded.
                  Only host, port, path, user and password are read from
                  it; any query string is not consulted.
                - OR individual parameters: host, port, database, user,
                  password

                In both cases the optional keys ``min_pool_size``,
                ``max_pool_size``, ``command_timeout`` and ``ssl`` are
                also read.

        Returns:
            PostgresPoolConfig instance
        """
        connection_string = config.get("connection_string")

        if connection_string:
            from urllib.parse import unquote, urlparse

            parsed = urlparse(connection_string)

            # urlparse leaves username/password/path percent-encoded, so
            # decode them here; to_connection_string() re-encodes on output.
            host = parsed.hostname or "localhost"
            port = parsed.port or 5432
            database = (
                unquote(parsed.path[1:])
                if parsed.path and len(parsed.path) > 1
                else "postgres"
            )
            user = unquote(parsed.username) if parsed.username else "postgres"
            password = unquote(parsed.password) if parsed.password else ""
        else:
            # Use individual parameters
            host = config.get("host", "localhost")
            port = config.get("port", 5432)
            database = config.get("database", "postgres")
            user = config.get("user", "postgres")
            password = config.get("password", "")

        return cls(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            min_size=config.get("min_pool_size", 2),
            max_size=config.get("max_pool_size", 5),
            command_timeout=config.get("command_timeout"),
            ssl=config.get("ssl"),
        )
async def create_asyncpg_pool(config: PostgresPoolConfig):
    """Create an asyncpg connection pool from a pool configuration.

    Args:
        config: Supplies the connection string (via
            ``to_connection_string()``) and the ``min_size``, ``max_size``,
            ``command_timeout`` and ``ssl`` values handed to asyncpg.

    Returns:
        The awaited result of ``asyncpg.create_pool``.
    """
    # Imported lazily so the module can be loaded without asyncpg installed.
    import asyncpg

    dsn = config.to_connection_string()
    pool_options = {
        "min_size": config.min_size,
        "max_size": config.max_size,
        "command_timeout": config.command_timeout,
        "ssl": config.ssl,
    }
    pool = await asyncpg.create_pool(dsn, **pool_options)
    return pool
async def validate_asyncpg_pool(pool) -> None:
    """Check that a pool is usable by running ``SELECT 1`` on one connection.

    Args:
        pool: Object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``fetchval`` method.

    Any exception raised while acquiring the connection or running the
    query propagates to the caller.
    """
    acquisition = pool.acquire()
    async with acquisition as connection:
        # The query result is discarded; only successful execution matters.
        await connection.fetchval("SELECT 1")