Coverage for src/dataknobs_data/pooling/postgres.py: 75%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""PostgreSQL-specific connection pooling implementation.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from typing import Any 

7 

8from .base import BasePoolConfig 

9 

10 

@dataclass
class PostgresPoolConfig(BasePoolConfig):
    """Configuration for PostgreSQL connection pools.

    Attributes:
        host: Server host name or address.
        port: Server TCP port.
        database: Name of the database to connect to.
        user: Role used to authenticate.
        password: Password for ``user``.
        min_size: Forwarded as ``min_size`` when the asyncpg pool is created.
        max_size: Forwarded as ``max_size`` when the asyncpg pool is created.
        command_timeout: Forwarded as ``command_timeout`` when the asyncpg
            pool is created; ``None`` leaves the choice to asyncpg.
        ssl: Forwarded unchanged as ``ssl`` when the asyncpg pool is created.
    """

    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    user: str = "postgres"
    password: str = ""
    min_size: int = 2
    max_size: int = 5
    command_timeout: float | None = None
    ssl: Any | None = None

    def to_connection_string(self) -> str:
        """Convert to a PostgreSQL connection URI.

        The user, password and database components are percent-encoded so
        that reserved characters in them (``@``, ``:``, ``/``, ``#``, ``?``,
        ``%`` ...) cannot be mistaken for URI delimiters. Values made only of
        unreserved characters are emitted unchanged.

        Returns:
            A URI of the form ``postgresql://user:password@host:port/database``.
        """
        # Imported locally so this block does not depend on a module-level
        # import being added elsewhere in the file.
        from urllib.parse import quote

        # safe="" also escapes "/", which quote() leaves alone by default.
        user = quote(str(self.user), safe="")
        password = quote(str(self.password), safe="")
        database = quote(str(self.database), safe="")
        return f"postgresql://{user}:{password}@{self.host}:{self.port}/{database}"

    def to_hash_key(self) -> tuple:
        """Create a hashable key for this configuration.

        Returns:
            The tuple ``(host, port, database, user)``. The password, pool
            sizes, timeout and SSL settings are not part of the key.
        """
        return (self.host, self.port, self.database, self.user)

    @classmethod
    def from_dict(cls, config: dict) -> PostgresPoolConfig:
        """Create from configuration dictionary.

        Args:
            config: Mapping of settings. Missing keys fall back to the same
                defaults as the dataclass fields.

        Returns:
            A new ``PostgresPoolConfig`` populated from ``config``.
        """
        return cls(
            host=config.get("host", "localhost"),
            port=config.get("port", 5432),
            database=config.get("database", "postgres"),
            user=config.get("user", "postgres"),
            password=config.get("password", ""),
            # NOTE(review): the pool sizes are read from "min_pool_size" and
            # "max_pool_size", not from the field names "min_size"/"max_size".
            # Presumably this matches the external config schema - confirm.
            min_size=config.get("min_pool_size", 2),
            max_size=config.get("max_pool_size", 5),
            command_timeout=config.get("command_timeout"),
            ssl=config.get("ssl"),
        )

46 

47 

async def create_asyncpg_pool(config: PostgresPoolConfig):
    """Open an asyncpg connection pool described by ``config``.

    Args:
        config: Pool settings; its connection string is used as the DSN and
            its size, timeout and SSL fields are forwarded as keyword
            arguments.

    Returns:
        Whatever awaiting ``asyncpg.create_pool`` yields.
    """
    # Deferred import: asyncpg is only needed once a pool is actually created.
    import asyncpg

    dsn = config.to_connection_string()
    pool_options = {
        "min_size": config.min_size,
        "max_size": config.max_size,
        "command_timeout": config.command_timeout,
        "ssl": config.ssl,
    }
    return await asyncpg.create_pool(dsn, **pool_options)

58 

59 

async def validate_asyncpg_pool(pool) -> None:
    """Check that ``pool`` can serve a connection by running ``SELECT 1``.

    Args:
        pool: Object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``fetchval`` method.

    Any exception raised while acquiring the connection or running the query
    propagates to the caller.
    """
    probe_query = "SELECT 1"
    async with pool.acquire() as connection:
        # The result is discarded; only successful execution matters.
        await connection.fetchval(probe_query)