Coverage for src / dataknobs_data / pooling / postgres.py: 50%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 15:45 -0700

1"""PostgreSQL-specific connection pooling implementation.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from typing import Any 

7 

8from .base import BasePoolConfig 

9 

10 

@dataclass
class PostgresPoolConfig(BasePoolConfig):
    """Configuration for PostgreSQL connection pools.

    Holds the connection target (host, port, database, user, password)
    together with pool sizing, command timeout and SSL settings. Credentials
    are stored in their plain (not percent-encoded) form; encoding is applied
    only when a connection URI is rendered by ``to_connection_string``.
    """

    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    user: str = "postgres"
    password: str = ""
    min_size: int = 2
    max_size: int = 5
    command_timeout: float | None = None
    ssl: Any | None = None

    def to_connection_string(self) -> str:
        """Convert to a PostgreSQL connection URI.

        User, password and database are percent-encoded so that reserved
        characters (``@``, ``:``, ``/``, ``#``, ...) cannot corrupt the URI
        structure. A host containing ``:`` (an IPv6 literal) is wrapped in
        square brackets unless it already is.

        Returns:
            A ``postgresql://user:password@host:port/database`` URI.
        """
        from urllib.parse import quote

        # safe="" also encodes "/", which would otherwise end the authority
        # part (or the database path segment) early.
        user = quote(str(self.user), safe="")
        password = quote(str(self.password), safe="")
        database = quote(str(self.database), safe="")

        host = self.host
        # urlparse().hostname strips the brackets from IPv6 literals; put
        # them back so the trailing ":port" stays unambiguous.
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"

        return f"postgresql://{user}:{password}@{host}:{self.port}/{database}"

    def to_hash_key(self) -> tuple:
        """Create a hashable key for this configuration.

        Returns:
            The tuple ``(host, port, database, user)``. Password, pool sizes,
            timeout and SSL settings are not part of the key.
        """
        return (self.host, self.port, self.database, self.user)

    @classmethod
    def from_dict(cls, config: dict) -> PostgresPoolConfig:
        """Create from configuration dictionary.

        Supports either a connection_string parameter or individual parameters.
        When a non-empty ``connection_string`` is present it takes precedence
        and the individual connection keys are ignored.

        Args:
            config: Configuration dict with either:
                - connection_string: PostgreSQL connection string
                  (postgresql://user:pass@host:port/db). Percent-encoded
                  user, password and database are decoded. Only host, port,
                  database, user and password are read from it; query
                  parameters are not carried over.
                - OR individual parameters: host, port, database, user,
                  password
                In both cases the optional keys ``min_pool_size`` (default 2),
                ``max_pool_size`` (default 5), ``command_timeout`` and ``ssl``
                are read from the dict.

        Returns:
            PostgresPoolConfig instance
        """
        connection_string = config.get("connection_string")

        if connection_string:
            from urllib.parse import unquote, urlparse

            parsed = urlparse(connection_string)

            # urlparse leaves username/password/path percent-encoded; decode
            # them so the stored values are the real credentials and
            # to_connection_string() can re-encode them exactly once.
            host = parsed.hostname or "localhost"
            port = parsed.port or 5432
            database = unquote(parsed.path[1:]) if len(parsed.path) > 1 else "postgres"
            user = unquote(parsed.username) if parsed.username else "postgres"
            password = unquote(parsed.password) if parsed.password else ""
        else:
            # Use individual parameters
            host = config.get("host", "localhost")
            port = config.get("port", 5432)
            database = config.get("database", "postgres")
            user = config.get("user", "postgres")
            password = config.get("password", "")

        return cls(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            min_size=config.get("min_pool_size", 2),
            max_size=config.get("max_pool_size", 5),
            command_timeout=config.get("command_timeout"),
            ssl=config.get("ssl"),
        )

78 

79 

async def create_asyncpg_pool(config: PostgresPoolConfig):
    """Build an asyncpg connection pool from a ``PostgresPoolConfig``.

    The connection target comes from ``config.to_connection_string()``; pool
    sizing, command timeout and SSL settings are forwarded unchanged from the
    config to ``asyncpg.create_pool``.

    Args:
        config: Pool configuration to connect with.

    Returns:
        The awaited result of ``asyncpg.create_pool``.
    """
    # Imported lazily so the module can be loaded without asyncpg installed.
    import asyncpg

    dsn = config.to_connection_string()
    pool_options = {
        "min_size": config.min_size,
        "max_size": config.max_size,
        "command_timeout": config.command_timeout,
        "ssl": config.ssl,
    }
    return await asyncpg.create_pool(dsn, **pool_options)

90 

91 

async def validate_asyncpg_pool(pool) -> None:
    """Check that an asyncpg pool is usable by running a trivial query.

    Acquires one connection from ``pool`` and executes ``SELECT 1`` on it.
    The query result is discarded; any exception raised while acquiring the
    connection or running the query propagates to the caller.

    Args:
        pool: Object whose ``acquire()`` returns an async context manager
            yielding a connection with an awaitable ``fetchval`` method.
    """
    probe_query = "SELECT 1"
    async with pool.acquire() as connection:
        await connection.fetchval(probe_query)