Coverage for src/dataknobs_data/pooling/s3.py: 49%

37 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:23 -0700

1"""S3-specific connection pooling implementation.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6 

7from .base import BasePoolConfig 

8 

9 

@dataclass
class S3PoolConfig(BasePoolConfig):
    """Configuration for S3 connection pools.

    Attributes:
        bucket: Name of the S3 bucket (required).
        prefix: Key prefix inside the bucket; defaults to the empty string.
        region_name: Optional AWS region name.
        aws_access_key_id: Optional AWS access key id.
        aws_secret_access_key: Optional AWS secret access key.
        aws_session_token: Optional AWS session token.
        endpoint_url: Optional S3 endpoint URL.
    """

    bucket: str
    prefix: str = ""
    region_name: str | None = None
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    aws_session_token: str | None = None
    endpoint_url: str | None = None

    def to_connection_string(self) -> str:
        """Convert to connection string (not used for S3, but required by base).

        Returns:
            A string of the form ``s3://<bucket>/<prefix>``.
        """
        return f"s3://{self.bucket}/{self.prefix}"

    def to_hash_key(self) -> tuple:
        """Create a hashable key for this configuration.

        Returns:
            The tuple ``(bucket, prefix, region_name, endpoint_url)``.
        """
        # NOTE(review): the credential fields are not part of the key, so two
        # configurations that differ only in credentials yield the same key.
        # Confirm this is intended wherever the key is used.
        return (self.bucket, self.prefix, self.region_name, self.endpoint_url)

    @classmethod
    def from_dict(cls, config: dict) -> S3PoolConfig:
        """Create from configuration dictionary.

        Args:
            config: Mapping that must contain a non-empty ``"bucket"`` entry
                and may contain ``"prefix"``, ``"region_name"``,
                ``"aws_access_key_id"``, ``"aws_secret_access_key"``,
                ``"aws_session_token"`` and ``"endpoint_url"``.

        Returns:
            A new ``S3PoolConfig`` populated from ``config``.

        Raises:
            ValueError: If ``"bucket"`` is missing, ``None`` or empty.
        """
        bucket = config.get("bucket")
        # An empty bucket name can never address a real bucket, so reject it
        # here instead of letting it fail later on the first S3 call.
        if not bucket:
            raise ValueError("S3 bucket configuration is required")

        return cls(
            bucket=bucket,
            prefix=config.get("prefix", ""),
            region_name=config.get("region_name"),
            aws_access_key_id=config.get("aws_access_key_id"),
            aws_secret_access_key=config.get("aws_secret_access_key"),
            aws_session_token=config.get("aws_session_token"),
            endpoint_url=config.get("endpoint_url"),
        )

45 

46 

async def create_aioboto3_session(config: S3PoolConfig):
    """Build an aioboto3 session from a pool configuration.

    Only settings that hold a truthy value are forwarded to
    ``aioboto3.Session``; unset or empty settings are left out of the call.

    Args:
        config: Pool configuration supplying the optional credentials and
            region name.

    Returns:
        The newly constructed ``aioboto3.Session``.
    """
    import aioboto3

    # Settings that may be forwarded, in the order they are passed on.
    forwarded_settings = (
        "aws_access_key_id",
        "aws_secret_access_key",
        "aws_session_token",
        "region_name",
    )
    configured = {name: getattr(config, name) for name in forwarded_settings}

    # Drop anything unset or empty so the session only sees explicit values.
    session_kwargs = {name: value for name, value in configured.items() if value}

    return aioboto3.Session(**session_kwargs)

65 

66 

async def validate_s3_session(session, config: S3PoolConfig) -> None:
    """Check that the session can reach the configured bucket.

    Opens an S3 client from ``session`` (using ``config.endpoint_url``) and
    issues a ``head_bucket`` request for ``config.bucket``. Any exception
    raised by that request propagates to the caller.

    Args:
        session: Object exposing ``client("s3", endpoint_url=...)`` usable as
            an async context manager.
        config: Pool configuration naming the bucket and endpoint.
    """
    client_context = session.client("s3", endpoint_url=config.endpoint_url)
    async with client_context as s3_client:
        # A successful HEAD on the bucket is the access check.
        await s3_client.head_bucket(Bucket=config.bucket)