Coverage for session_buddy / adapters / lifecycle.py: 39.78%
151 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1from __future__ import annotations
3import inspect
4import typing as t
5from contextlib import suppress
7from session_buddy.adapters.settings import (
8 CacheAdapterSettings,
9 KnowledgeGraphAdapterSettings,
10 ReflectionAdapterSettings,
11 StorageAdapterSettings,
12)
13from session_buddy.di import get_sync_typed
14from session_buddy.di.container import depends
def get_reflection_settings() -> ReflectionAdapterSettings:
    """Return the reflection adapter settings, registering them if absent.

    The dependency container is consulted first. Any error raised by the
    lookup is ignored, and a result that is not a
    ``ReflectionAdapterSettings`` instance is treated as missing.

    Returns:
        The registered settings instance, or a new one built with
        ``ReflectionAdapterSettings.from_settings()`` that is stored in the
        container before being returned.
    """
    try:
        registered = get_sync_typed(ReflectionAdapterSettings)  # type: ignore[no-any-return]
    except Exception:
        # Best-effort lookup: fall through and build a fresh instance.
        registered = None
    if isinstance(registered, ReflectionAdapterSettings):
        return registered

    fresh = ReflectionAdapterSettings.from_settings()
    depends.set(ReflectionAdapterSettings, fresh)
    return fresh
def get_knowledge_graph_settings() -> KnowledgeGraphAdapterSettings:
    """Return the knowledge graph adapter settings, registering them if absent.

    The dependency container is consulted first. Any error raised by the
    lookup is ignored, and a result that is not a
    ``KnowledgeGraphAdapterSettings`` instance is treated as missing.

    Returns:
        The registered settings instance, or a new one built with
        ``KnowledgeGraphAdapterSettings.from_settings()`` that is stored in
        the container before being returned.
    """
    try:
        registered = get_sync_typed(KnowledgeGraphAdapterSettings)  # type: ignore[no-any-return]
    except Exception:
        # Best-effort lookup: fall through and build a fresh instance.
        registered = None
    if isinstance(registered, KnowledgeGraphAdapterSettings):
        return registered

    fresh = KnowledgeGraphAdapterSettings.from_settings()
    depends.set(KnowledgeGraphAdapterSettings, fresh)
    return fresh
def get_storage_settings() -> StorageAdapterSettings:
    """Return the storage adapter settings, registering them if absent.

    The dependency container is consulted first. Any error raised by the
    lookup is ignored, and a result that is not a ``StorageAdapterSettings``
    instance is treated as missing.

    Returns:
        The registered settings instance, or a new one built with
        ``StorageAdapterSettings.from_settings()`` that is stored in the
        container before being returned.
    """
    try:
        registered = get_sync_typed(StorageAdapterSettings)  # type: ignore[no-any-return]
    except Exception:
        # Best-effort lookup: fall through and build a fresh instance.
        registered = None
    if isinstance(registered, StorageAdapterSettings):
        return registered

    fresh = StorageAdapterSettings.from_settings()
    depends.set(StorageAdapterSettings, fresh)
    return fresh
def get_cache_settings() -> CacheAdapterSettings:
    """Return the cache adapter settings, registering them if absent.

    The dependency container is consulted first. Any error raised by the
    lookup is ignored, and a result that is not a ``CacheAdapterSettings``
    instance is treated as missing.

    Returns:
        The registered settings instance, or a new one created with the
        plain ``CacheAdapterSettings()`` constructor (the sibling helpers use
        ``from_settings()`` instead) that is stored in the container before
        being returned.
    """
    try:
        registered = get_sync_typed(CacheAdapterSettings)  # type: ignore[no-any-return]
    except Exception:
        # Best-effort lookup: fall through and build a fresh instance.
        registered = None
    if isinstance(registered, CacheAdapterSettings):
        return registered

    fresh = CacheAdapterSettings()
    depends.set(CacheAdapterSettings, fresh)
    return fresh
async def init_reflection_adapter() -> None:
    """Create, initialize and register the reflection adapter once.

    The Oneiric adapter class is preferred; the ACB class is used only when
    the Oneiric module cannot be imported. If the container already holds an
    instance of the selected class, nothing is done. Errors raised by the
    container lookup are ignored and treated as "not registered".
    """
    try:
        from session_buddy.adapters.reflection_adapter_oneiric import (
            ReflectionDatabaseAdapterOneiric as adapter_cls,
        )
    except ImportError:
        # Fallback to ACB implementation for compatibility
        from session_buddy.adapters.reflection_adapter import (
            ReflectionDatabaseAdapter as adapter_cls,
        )

    try:
        registered = depends.get_sync(adapter_cls)
    except Exception:
        registered = None
    if isinstance(registered, adapter_cls):
        return

    instance = adapter_cls(settings=get_reflection_settings())
    await instance.initialize()
    depends.set(adapter_cls, instance)
def health_reflection_adapter() -> bool:
    """Check health of reflection adapter (supports both Oneiric and ACB implementations).

    The Oneiric adapter class is probed first. If its module cannot be
    imported, or the container lookup raises, the ACB class is probed
    instead.

    Returns:
        True when the container yields an instance of the probed adapter
        class. False otherwise, including when the lookup fails or when
        neither implementation module can be imported; this probe does not
        raise for a missing module.
    """
    # Try Oneiric implementation first
    with suppress(ImportError):
        from session_buddy.adapters.reflection_adapter_oneiric import (
            ReflectionDatabaseAdapterOneiric,
        )

        with suppress(Exception):
            adapter = depends.get_sync(ReflectionDatabaseAdapterOneiric)
            return isinstance(adapter, ReflectionDatabaseAdapterOneiric)

    # Fallback to ACB implementation. The import is guarded as well so that a
    # missing fallback module is reported as "unhealthy" instead of escaping
    # from a boolean health probe as ImportError.
    try:
        from session_buddy.adapters.reflection_adapter import ReflectionDatabaseAdapter
    except ImportError:
        return False

    with suppress(Exception):
        adapter = depends.get_sync(ReflectionDatabaseAdapter)
        return isinstance(adapter, ReflectionDatabaseAdapter)
    return False
async def cleanup_reflection_adapter() -> None:
    """Close the registered reflection adapter, if there is one.

    The Oneiric adapter class is preferred; the ACB class is used only when
    the Oneiric module cannot be imported. The adapter's ``close()`` is
    called when present and its result is awaited when it is awaitable.
    Every ``Exception`` raised by the lookup or the close is swallowed.
    """
    try:
        from session_buddy.adapters.reflection_adapter_oneiric import (
            ReflectionDatabaseAdapterOneiric as adapter_cls,
        )
    except ImportError:
        from session_buddy.adapters.reflection_adapter import (
            ReflectionDatabaseAdapter as adapter_cls,
        )

    with suppress(Exception):
        registered = depends.get_sync(adapter_cls)
        if not hasattr(registered, "close"):
            return
        pending = registered.close()
        if inspect.isawaitable(pending):
            await pending
async def init_knowledge_graph_adapter() -> None:
    """Create, initialize and register the knowledge graph adapter once.

    The Oneiric adapter class is preferred; the ACB class is used only when
    the Oneiric module cannot be imported. If the container already holds an
    instance of the selected class, nothing is done. Errors raised by the
    container lookup are ignored and treated as "not registered".
    """
    try:
        from session_buddy.adapters.knowledge_graph_adapter_oneiric import (
            KnowledgeGraphDatabaseAdapterOneiric as adapter_cls,
        )
    except ImportError:
        # Fallback to ACB implementation for compatibility
        from session_buddy.adapters.knowledge_graph_adapter import (
            KnowledgeGraphDatabaseAdapter as adapter_cls,
        )

    try:
        registered = depends.get_sync(adapter_cls)
    except Exception:
        registered = None
    if isinstance(registered, adapter_cls):
        return

    instance = adapter_cls(settings=get_knowledge_graph_settings())
    await instance.initialize()
    depends.set(adapter_cls, instance)
def health_knowledge_graph_adapter() -> bool:
    """Check health of knowledge graph adapter (supports both Oneiric and ACB implementations).

    The Oneiric adapter class is probed first. If its module cannot be
    imported, or the container lookup raises, the ACB class is probed
    instead.

    Returns:
        True when the container yields an instance of the probed adapter
        class. False otherwise, including when the lookup fails or when
        neither implementation module can be imported; this probe does not
        raise for a missing module.
    """
    # Try Oneiric implementation first
    with suppress(ImportError):
        from session_buddy.adapters.knowledge_graph_adapter_oneiric import (
            KnowledgeGraphDatabaseAdapterOneiric,
        )

        with suppress(Exception):
            adapter = depends.get_sync(KnowledgeGraphDatabaseAdapterOneiric)
            return isinstance(adapter, KnowledgeGraphDatabaseAdapterOneiric)

    # Fallback to ACB implementation. The import is guarded as well so that a
    # missing fallback module is reported as "unhealthy" instead of escaping
    # from a boolean health probe as ImportError.
    try:
        from session_buddy.adapters.knowledge_graph_adapter import (
            KnowledgeGraphDatabaseAdapter,
        )
    except ImportError:
        return False

    with suppress(Exception):
        adapter = depends.get_sync(KnowledgeGraphDatabaseAdapter)
        return isinstance(adapter, KnowledgeGraphDatabaseAdapter)
    return False
def cleanup_knowledge_graph_adapter() -> None:
    """Close the registered knowledge graph adapter, if there is one.

    The Oneiric adapter class is preferred; the ACB class is used only when
    the Oneiric module cannot be imported. The adapter's ``close()`` is
    called when present. Every ``Exception`` raised by the lookup or the
    close is swallowed.
    """
    try:
        from session_buddy.adapters.knowledge_graph_adapter_oneiric import (
            KnowledgeGraphDatabaseAdapterOneiric as adapter_cls,
        )
    except ImportError:
        from session_buddy.adapters.knowledge_graph_adapter import (
            KnowledgeGraphDatabaseAdapter as adapter_cls,
        )

    with suppress(Exception):
        registered = depends.get_sync(adapter_cls)
        if not hasattr(registered, "close"):
            return
        # NOTE(review): the return value of close() is discarded, so an
        # awaitable result would never be awaited - confirm that close() is
        # synchronous on both adapter implementations.
        registered.close()
def init_storage_adapters() -> None:
    """Initialize storage adapters, preferring the Oneiric implementation.

    Oneiric path: initialize the storage registry, then configure buckets
    when the storage settings define any. An ``ImportError`` raised anywhere
    on this path (the import itself or the calls that follow) is swallowed
    and triggers the ACB path.

    ACB path: configure buckets when the storage settings define any, then
    register the adapter for the configured default backend.
    """
    # Try to use Oneiric implementation first
    with suppress(ImportError):
        from session_buddy.adapters.storage_oneiric import (
            configure_storage_buckets as configure_oneiric_buckets,
            init_storage_registry,
        )

        # Synchronous initialization
        init_storage_registry()
        oneiric_settings = get_storage_settings()
        if oneiric_settings.buckets:
            configure_oneiric_buckets(oneiric_settings.buckets)
        return

    # Fallback to ACB implementation
    from session_buddy.adapters.storage_registry import (
        configure_storage_buckets as configure_acb_buckets,
        register_storage_adapter,
    )

    acb_settings = get_storage_settings()
    if acb_settings.buckets:
        configure_acb_buckets(acb_settings.buckets)
    register_storage_adapter(acb_settings.default_backend)
def health_storage_adapters() -> bool:
    """Check health of storage adapters (supports both Oneiric and ACB implementations).

    The Oneiric ``get_storage_adapter`` is tried first for the configured
    default backend. If its module cannot be imported, or the lookup raises,
    the ACB ``get_storage_adapter`` is tried instead.

    Returns:
        True when the lookup that completed returned a non-``None`` adapter.
        False otherwise, including when the lookups fail or when neither
        implementation module can be imported.

    Note:
        Errors raised by ``get_storage_settings()`` on the ACB path are not
        suppressed, as before.
    """
    # Try Oneiric implementation first
    with suppress(ImportError):
        from session_buddy.adapters.storage_oneiric import get_storage_adapter

        settings = get_storage_settings()
        with suppress(Exception):
            adapter = get_storage_adapter(settings.default_backend)
            return adapter is not None

    # Fallback to ACB implementation. The import is guarded as well so that a
    # missing fallback module is reported as "unhealthy" instead of escaping
    # from a boolean health probe as ImportError.
    try:
        from session_buddy.adapters.storage_registry import get_storage_adapter
    except ImportError:
        return False

    settings = get_storage_settings()
    with suppress(Exception):
        adapter = get_storage_adapter(settings.default_backend)
        return adapter is not None
    return False
async def _cleanup_adapter(adapter: t.Any) -> bool:
    """Run the first available teardown method on ``adapter``.

    The attributes ``aclose``, ``close`` and ``cleanup`` are looked up in
    that order; the first one that is callable is invoked with no arguments
    and its result is awaited when it is awaitable. Later candidates are not
    inspected once one has been found.

    Args:
        adapter: Any object; it does not need to define a teardown method.

    Returns:
        True when a teardown method was found and invoked, False when none
        of the three attributes is callable.
    """
    # Lazy generator: attribute lookups stop at the first callable handler.
    candidates = (
        getattr(adapter, method_name, None)
        for method_name in ("aclose", "close", "cleanup")
    )
    teardown = next((fn for fn in candidates if callable(fn)), None)
    if teardown is None:
        return False

    outcome = teardown()
    if inspect.isawaitable(outcome):
        await outcome
    return True
async def cleanup_storage_adapters() -> None:
    """Tear down the storage adapter for the configured default backend.

    The Oneiric ``get_storage_adapter`` is tried first. If ``_cleanup_adapter``
    reports that it invoked a teardown method on that adapter, the function
    returns. Otherwise (import failure, lookup/teardown error, or no
    teardown method found) the ACB ``get_storage_adapter`` is used and the
    same teardown is attempted on its adapter. Every ``Exception`` raised by
    a lookup or teardown is swallowed.
    """
    # Try Oneiric implementation first
    with suppress(ImportError):
        from session_buddy.adapters.storage_oneiric import (
            get_storage_adapter as get_oneiric_adapter,
        )

        oneiric_settings = get_storage_settings()
        with suppress(Exception):
            oneiric_adapter = get_oneiric_adapter(oneiric_settings.default_backend)
            cleaned = await _cleanup_adapter(oneiric_adapter)
            if cleaned:
                return

    # Fallback to ACB implementation
    from session_buddy.adapters.storage_registry import (
        get_storage_adapter as get_acb_adapter,
    )

    acb_settings = get_storage_settings()
    with suppress(Exception):
        acb_adapter = get_acb_adapter(acb_settings.default_backend)
        await _cleanup_adapter(acb_adapter)
def init_cache_adapters() -> None:
    """Do nothing; kept as the cache counterpart of the other init hooks.

    Oneiric-only: cache functionality is handled by the Oneiric adapters, so
    there is nothing to initialize here.
    """
    return None
def health_cache_adapters() -> bool:
    """Report cache adapter health; the result is unconditionally True.

    Oneiric-only: cache health is handled by the Oneiric adapters, so no
    probe is performed here.
    """
    cache_is_healthy = True  # Oneiric adapters are always healthy
    return cache_is_healthy
async def cleanup_cache_adapters() -> None:
    """Do nothing; kept as the cache counterpart of the other cleanup hooks.

    Oneiric-only: cache cleanup is handled by the Oneiric adapters and no
    ACB cache cleanup is needed.
    """
    return None