Coverage for src/dataknobs_data/pooling/elasticsearch.py: 35%

65 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Elasticsearch-specific connection pooling implementation.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from typing import Any 

7 

8from .base import BasePoolConfig 

9 

10 

11@dataclass 

12class ElasticsearchPoolConfig(BasePoolConfig): 

13 """Configuration for Elasticsearch connection pools.""" 

14 hosts: list[str] | None = None 

15 index: str = "records" 

16 api_key: str | None = None 

17 basic_auth: tuple | None = None 

18 verify_certs: bool = True 

19 ca_certs: str | None = None 

20 client_cert: str | None = None 

21 client_key: str | None = None 

22 ssl_show_warn: bool = True 

23 

24 def __post_init__(self): 

25 """Set default hosts if not provided.""" 

26 if self.hosts is None: 

27 self.hosts = ["http://localhost:9200"] 

28 

29 def to_connection_string(self) -> str: 

30 """Convert to connection string (not used for ES, but required by base).""" 

31 if self.hosts is None: 

32 raise ValueError("Elasticsearch hosts configuration is missing") 

33 return ";".join(self.hosts) 

34 

35 def to_hash_key(self) -> tuple: 

36 """Create a hashable key for this configuration.""" 

37 if self.hosts is None: 

38 raise ValueError("Elasticsearch hosts configuration is missing") 

39 return (tuple(self.hosts), self.index) 

40 

41 @classmethod 

42 def from_dict(cls, config: dict) -> ElasticsearchPoolConfig: 

43 """Create from configuration dictionary.""" 

44 # Handle both old-style (host, port) and new-style (hosts) configuration 

45 if "hosts" in config: 

46 hosts = config["hosts"] 

47 elif "host" in config: 

48 host = config["host"] 

49 port = config.get("port", 9200) 

50 # Check if it already has a scheme 

51 if host.startswith("http://") or host.startswith("https://"): 

52 hosts = [f"{host}:{port}" if ":" not in host.split("://")[1] else host] 

53 else: 

54 hosts = [f"http://{host}:{port}"] 

55 else: 

56 hosts = ["http://localhost:9200"] 

57 

58 return cls( 

59 hosts=hosts, 

60 index=config.get("index", "records"), 

61 api_key=config.get("api_key"), 

62 basic_auth=config.get("basic_auth"), 

63 verify_certs=config.get("verify_certs", True), 

64 ca_certs=config.get("ca_certs"), 

65 client_cert=config.get("client_cert"), 

66 client_key=config.get("client_key"), 

67 ssl_show_warn=config.get("ssl_show_warn", True) 

68 ) 

69 

70 

71async def create_async_elasticsearch_client(config: ElasticsearchPoolConfig): 

72 """Create an async Elasticsearch client.""" 

73 from elasticsearch import AsyncElasticsearch 

74 

75 # Ensure hosts is not None (should be set by __post_init__) 

76 if config.hosts is None: 

77 raise ValueError("Elasticsearch hosts configuration is missing") 

78 

79 # Build client configuration 

80 client_config: dict[str, Any] = { 

81 "hosts": config.hosts, 

82 } 

83 

84 # Add authentication if provided 

85 if config.api_key: 

86 client_config["api_key"] = config.api_key 

87 elif config.basic_auth: 

88 client_config["basic_auth"] = config.basic_auth 

89 

90 # Add SSL configuration 

91 if config.ca_certs: 

92 client_config["ca_certs"] = config.ca_certs 

93 if config.client_cert: 

94 client_config["client_cert"] = config.client_cert 

95 if config.client_key: 

96 client_config["client_key"] = config.client_key 

97 

98 client_config["verify_certs"] = config.verify_certs 

99 client_config["ssl_show_warn"] = config.ssl_show_warn 

100 

101 # Create and return the client 

102 return AsyncElasticsearch(**client_config) 

103 

104 

105async def validate_elasticsearch_client(client) -> None: 

106 """Validate an Elasticsearch client by pinging it.""" 

107 if not await client.ping(): 

108 raise ConnectionError("Failed to ping Elasticsearch") 

109 

110 

111async def close_elasticsearch_client(client) -> None: 

112 """Properly close an Elasticsearch client and its underlying connections.""" 

113 if client: 

114 try: 

115 await client.close() 

116 except Exception: 

117 pass # Ignore errors during cleanup