Coverage for src/dataknobs_data/pooling/elasticsearch.py: 35%
65 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
1"""Elasticsearch-specific connection pooling implementation."""
3from __future__ import annotations
5from dataclasses import dataclass
6from typing import Any
8from .base import BasePoolConfig
11@dataclass
12class ElasticsearchPoolConfig(BasePoolConfig):
13 """Configuration for Elasticsearch connection pools."""
14 hosts: list[str] | None = None
15 index: str = "records"
16 api_key: str | None = None
17 basic_auth: tuple | None = None
18 verify_certs: bool = True
19 ca_certs: str | None = None
20 client_cert: str | None = None
21 client_key: str | None = None
22 ssl_show_warn: bool = True
24 def __post_init__(self):
25 """Set default hosts if not provided."""
26 if self.hosts is None:
27 self.hosts = ["http://localhost:9200"]
29 def to_connection_string(self) -> str:
30 """Convert to connection string (not used for ES, but required by base)."""
31 if self.hosts is None:
32 raise ValueError("Elasticsearch hosts configuration is missing")
33 return ";".join(self.hosts)
35 def to_hash_key(self) -> tuple:
36 """Create a hashable key for this configuration."""
37 if self.hosts is None:
38 raise ValueError("Elasticsearch hosts configuration is missing")
39 return (tuple(self.hosts), self.index)
41 @classmethod
42 def from_dict(cls, config: dict) -> ElasticsearchPoolConfig:
43 """Create from configuration dictionary."""
44 # Handle both old-style (host, port) and new-style (hosts) configuration
45 if "hosts" in config:
46 hosts = config["hosts"]
47 elif "host" in config:
48 host = config["host"]
49 port = config.get("port", 9200)
50 # Check if it already has a scheme
51 if host.startswith("http://") or host.startswith("https://"):
52 hosts = [f"{host}:{port}" if ":" not in host.split("://")[1] else host]
53 else:
54 hosts = [f"http://{host}:{port}"]
55 else:
56 hosts = ["http://localhost:9200"]
58 return cls(
59 hosts=hosts,
60 index=config.get("index", "records"),
61 api_key=config.get("api_key"),
62 basic_auth=config.get("basic_auth"),
63 verify_certs=config.get("verify_certs", True),
64 ca_certs=config.get("ca_certs"),
65 client_cert=config.get("client_cert"),
66 client_key=config.get("client_key"),
67 ssl_show_warn=config.get("ssl_show_warn", True)
68 )
71async def create_async_elasticsearch_client(config: ElasticsearchPoolConfig):
72 """Create an async Elasticsearch client."""
73 from elasticsearch import AsyncElasticsearch
75 # Ensure hosts is not None (should be set by __post_init__)
76 if config.hosts is None:
77 raise ValueError("Elasticsearch hosts configuration is missing")
79 # Build client configuration
80 client_config: dict[str, Any] = {
81 "hosts": config.hosts,
82 }
84 # Add authentication if provided
85 if config.api_key:
86 client_config["api_key"] = config.api_key
87 elif config.basic_auth:
88 client_config["basic_auth"] = config.basic_auth
90 # Add SSL configuration
91 if config.ca_certs:
92 client_config["ca_certs"] = config.ca_certs
93 if config.client_cert:
94 client_config["client_cert"] = config.client_cert
95 if config.client_key:
96 client_config["client_key"] = config.client_key
98 client_config["verify_certs"] = config.verify_certs
99 client_config["ssl_show_warn"] = config.ssl_show_warn
101 # Create and return the client
102 return AsyncElasticsearch(**client_config)
105async def validate_elasticsearch_client(client) -> None:
106 """Validate an Elasticsearch client by pinging it."""
107 if not await client.ping():
108 raise ConnectionError("Failed to ping Elasticsearch")
111async def close_elasticsearch_client(client) -> None:
112 """Properly close an Elasticsearch client and its underlying connections."""
113 if client:
114 try:
115 await client.close()
116 except Exception:
117 pass # Ignore errors during cleanup