Coverage for src/dataknobs_data/factory.py: 78%

41 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:36 -0700

1"""Backend factory for dynamic database creation.""" 

2 

3import logging 

4from typing import Any 

5 

6from dataknobs_config import FactoryBase 

7 

8from dataknobs_data.backends import async_backends, sync_backends 

9from dataknobs_data.database import SyncDatabase 

10 

11# Import the VectorStoreFactory from vector.stores.factory 

12from dataknobs_data.vector.stores.factory import VectorStoreFactory 

13 

14 

15logger = logging.getLogger(__name__) 

16 

17 

class DatabaseFactory(FactoryBase):
    """Factory for creating database backends dynamically.

    This factory allows creating different database implementations
    based on configuration, supporting all available backends.

    Configuration Options:
        backend (str): Backend type (memory, file, postgres, elasticsearch, s3)
        **kwargs: Backend-specific configuration options

    Example Configuration:
        databases:
          - name: main
            factory: database
            backend: postgres
            host: localhost
            database: myapp

          - name: cache
            factory: database
            backend: memory

          - name: archive
            factory: database
            backend: s3
            bucket: my-archive-bucket
            prefix: archives/
    """

    def create(self, **config: Any) -> SyncDatabase:
        """Create a database instance based on configuration.

        The 'backend' entry is removed from ``config`` and used (lowercased)
        to look up the backend class in the ``sync_backends`` registry. All
        remaining entries, including 'vector_enabled' if present, are passed
        unchanged to the backend's ``from_config``.

        Args:
            **config: Configuration including 'backend' field (defaults to
                "memory" when absent) and backend-specific options

        Returns:
            Instance of appropriate database backend

        Raises:
            ValueError: If the backend value is not a string, or if the
                backend type is not recognized or not available
        """
        raw_backend = config.pop("backend", "memory")

        # A null/non-string backend (e.g. an empty YAML key) would otherwise
        # fail with AttributeError on .lower(); report it as the documented
        # ValueError instead.
        if not isinstance(raw_backend, str):
            available = sync_backends.list_keys()
            raise ValueError(
                f"Backend type must be a string, got {type(raw_backend).__name__}. "
                f"Available backends: {', '.join(sorted(set(available)))}"
            )

        backend_type = raw_backend.lower()

        # Lazy %-style arguments: the message is only formatted if the
        # record is actually emitted.
        logger.info("Creating database with backend: %s", backend_type)

        # 'vector_enabled' is only inspected here (not removed), so it still
        # reaches the backend through from_config below.
        if config.get("vector_enabled", False):
            # All backends now have vector support (some native, some via Python)
            logger.debug("Vector support enabled for backend: %s", backend_type)

        # Get backend class from registry
        try:
            backend_class = sync_backends.get(backend_type)
        except Exception as e:
            # Any registry lookup failure is surfaced as ValueError with the
            # list of registered keys; the original error is chained.
            available = sync_backends.list_keys()
            raise ValueError(
                f"Unknown backend type: {backend_type}. "
                f"Available backends: {', '.join(sorted(set(available)))}"
            ) from e

        # Create and return backend instance
        return backend_class.from_config(config)

    def get_backend_info(self, backend_type: str) -> dict[str, Any]:
        """Get information about a specific backend.

        Args:
            backend_type: Name of the backend (matched case-insensitively)

        Returns:
            Dictionary with backend information from registry metadata.
            For a backend the registry does not have, a dictionary with
            'description' and 'error' entries is returned instead of raising.
        """
        # Normalize to lowercase for case-insensitive lookup
        backend_type = backend_type.lower()

        # Check if backend exists first
        if not sync_backends.has(backend_type):
            return {
                "description": "Unknown backend",
                "error": f"Backend '{backend_type}' not recognized",
            }

        # Get metadata from registry; fall back to an empty dict when the
        # metrics carry no 'metadata' entry.
        metrics = sync_backends.get_metrics(backend_type)
        return metrics.get("metadata", {})

107 

108 

class AsyncDatabaseFactory(FactoryBase):
    """Factory for creating async database backends.

    Note: Currently only some backends support async operations.
    """

    def create(self, **config: Any) -> Any:
        """Create an async database instance.

        The 'backend' entry is removed from ``config`` and used (lowercased)
        to look up the backend class in the ``async_backends`` registry. All
        remaining entries, including 'vector_enabled' if present, are passed
        unchanged to the backend's ``from_config``.

        Args:
            **config: Configuration including 'backend' field (defaults to
                "memory" when absent)

        Returns:
            Instance of appropriate async database backend

        Raises:
            ValueError: If the backend value is not a string, or if the
                backend doesn't support async operations
        """
        raw_backend = config.pop("backend", "memory")

        # A null/non-string backend (e.g. an empty YAML key) would otherwise
        # fail with AttributeError on .lower(); report it as the documented
        # ValueError instead.
        if not isinstance(raw_backend, str):
            available = async_backends.list_keys()
            raise ValueError(
                f"Backend type must be a string, got {type(raw_backend).__name__}. "
                f"Available async backends: {', '.join(sorted(set(available)))}"
            )

        backend_type = raw_backend.lower()

        # 'vector_enabled' is only inspected here (not removed), so it still
        # reaches the backend through from_config below.
        if config.get("vector_enabled", False):
            # All backends now have vector support (some native, some via Python)
            # Lazy %-style arguments: formatted only if the record is emitted.
            logger.debug("Vector support enabled for async backend: %s", backend_type)

        # Get backend class from registry
        try:
            backend_class = async_backends.get(backend_type)
        except Exception as e:
            # Any registry lookup failure is surfaced as ValueError with the
            # list of registered async keys; the original error is chained.
            available = async_backends.list_keys()
            raise ValueError(
                f"Backend '{backend_type}' does not support async operations yet. "
                f"Available async backends: {', '.join(sorted(set(available)))}"
            ) from e

        # Create and return backend instance
        return backend_class.from_config(config)

149 

150 

151# TODO: Add AsyncVectorStoreFactory when async vector stores are implemented 

152# The async vector store implementations (AsyncFaissVectorStore, AsyncChromaVectorStore,  

153# AsyncMemoryVectorStore) and base class (AsyncVectorStore) need to be created first. 

154 

155 

# Create singleton instances for registration.
# These are instantiated once, when this module is imported, with no
# constructor arguments.
database_factory = DatabaseFactory()
async_database_factory = AsyncDatabaseFactory()
vector_store_factory = VectorStoreFactory()
# TODO: add an 'async_vector_store_factory = AsyncVectorStoreFactory()' when async vector stores are implemented