Coverage for src/dataknobs_data/pooling/s3.py: 49%
37 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:14 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:14 -0600
1"""S3-specific connection pooling implementation."""
3from __future__ import annotations
5from dataclasses import dataclass
7from .base import BasePoolConfig
10@dataclass
11class S3PoolConfig(BasePoolConfig):
12 """Configuration for S3 connection pools."""
13 bucket: str
14 prefix: str = ""
15 region_name: str | None = None
16 aws_access_key_id: str | None = None
17 aws_secret_access_key: str | None = None
18 aws_session_token: str | None = None
19 endpoint_url: str | None = None
21 def to_connection_string(self) -> str:
22 """Convert to connection string (not used for S3, but required by base)."""
23 return f"s3://{self.bucket}/{self.prefix}"
25 def to_hash_key(self) -> tuple:
26 """Create a hashable key for this configuration."""
27 return (self.bucket, self.prefix, self.region_name, self.endpoint_url)
29 @classmethod
30 def from_dict(cls, config: dict) -> S3PoolConfig:
31 """Create from configuration dictionary."""
32 bucket = config.get("bucket")
33 if bucket is None:
34 raise ValueError("S3 bucket configuration is required")
36 return cls(
37 bucket=bucket,
38 prefix=config.get("prefix", ""),
39 region_name=config.get("region_name"),
40 aws_access_key_id=config.get("aws_access_key_id"),
41 aws_secret_access_key=config.get("aws_secret_access_key"),
42 aws_session_token=config.get("aws_session_token"),
43 endpoint_url=config.get("endpoint_url")
44 )
47async def create_aioboto3_session(config: S3PoolConfig):
48 """Create an aioboto3 session for S3 operations."""
49 import aioboto3
51 # Create session with credentials if provided
52 session_config = {}
54 if config.aws_access_key_id:
55 session_config["aws_access_key_id"] = config.aws_access_key_id
56 if config.aws_secret_access_key:
57 session_config["aws_secret_access_key"] = config.aws_secret_access_key
58 if config.aws_session_token:
59 session_config["aws_session_token"] = config.aws_session_token
60 if config.region_name:
61 session_config["region_name"] = config.region_name
63 # Create and return the session
64 return aioboto3.Session(**session_config)
67async def validate_s3_session(session, config: S3PoolConfig) -> None:
68 """Validate an S3 session by checking bucket access."""
69 async with session.client("s3", endpoint_url=config.endpoint_url) as s3:
70 # Try to head the bucket to verify access
71 await s3.head_bucket(Bucket=config.bucket)