Skip to content

pgVectorDB Core Class

pgvectordb.core.pgVectorDB

Bases: SearchMixin, DocumentsMixin, IndexingMixin, AnalyticsMixin, StorageMixin, MultimodalMixin, IntegrationsMixin

Production-ready RAG system with multi-index support.

Index Types: - HNSW: Fast queries, high recall, in-memory (best for <1M vectors) - IVFFlat: Balanced performance, configurable (best for 100K-10M vectors) - DiskANN: Scalable, disk-based, label filtering (best for >10M vectors)

Search Methods: 1. keyword_search - Pure FTS 2. universal_keyword_search - FTS + metadata fields 3. semantic_search - Vector similarity 4. metadata_filter - Pure metadata filtering (no query) 5. metadata_keyword_search - Filtered FTS 6. metadata_semantic_search - Filtered vector search 7. hybrid_search - Combined keyword + semantic (with optional RRF) 8. ensemble_search - Filtered hybrid search (with optional RRF) 9. trigram_search - Fuzzy text matching (typo-tolerant) 10. metadata_trigram_search - Filtered fuzzy search

Filter Operators (13): Comparison: $eq, $ne, $lt, $lte, $gt, $gte Range: $between Set: $in, $nin Existence: $exists Pattern: $like, $ilike Logical: $and, $or

Example: >>> from langchain_huggingface import HuggingFaceEmbeddings >>> >>> embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") >>> rag = pgVectorDB( ... collection_name="my_documents", ... embedding_model=embeddings, ... connection_string="postgresql+asyncpg://user:pass@localhost/db", ... index_type=IndexType.DISKANN ... ) >>> await rag.initialize() >>> await rag.add_documents(documents, labels=doc_labels) >>> await rag.build_index(include_labels=True) >>> results = await rag.semantic_search("AI applications", k=5, label_filter=[1, 2])

Source code in pgvectordb\core.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
class pgVectorDB(
    SearchMixin,
    DocumentsMixin,
    IndexingMixin,
    AnalyticsMixin,
    StorageMixin,
    MultimodalMixin,
    IntegrationsMixin,
):
    """
    Production-ready RAG system with multi-index support.

    **Index Types:**
    - HNSW: Fast queries, high recall, in-memory (best for <1M vectors)
    - IVFFlat: Balanced performance, configurable (best for 100K-10M vectors)
    - DiskANN: Scalable, disk-based, label filtering (best for >10M vectors)

    **Search Methods:**
    1. keyword_search - Pure FTS
    2. universal_keyword_search - FTS + metadata fields
    3. semantic_search - Vector similarity
    4. metadata_filter - Pure metadata filtering (no query)
    5. metadata_keyword_search - Filtered FTS
    6. metadata_semantic_search - Filtered vector search
    7. hybrid_search - Combined keyword + semantic (with optional RRF)
    8. ensemble_search - Filtered hybrid search (with optional RRF)
    9. trigram_search - Fuzzy text matching (typo-tolerant)
    10. metadata_trigram_search - Filtered fuzzy search

    **Filter Operators (13):**
    Comparison: $eq, $ne, $lt, $lte, $gt, $gte
    Range: $between
    Set: $in, $nin
    Existence: $exists
    Pattern: $like, $ilike
    Logical: $and, $or

    **Example:**
        >>> from langchain_huggingface import HuggingFaceEmbeddings
        >>>
        >>> embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        >>> rag = pgVectorDB(
        ...     collection_name="my_documents",
        ...     embedding_model=embeddings,
        ...     connection_string="postgresql+asyncpg://user:pass@localhost/db",
        ...     index_type=IndexType.DISKANN
        ... )
        >>> await rag.initialize()
        >>> await rag.add_documents(documents, labels=doc_labels)
        >>> await rag.build_index(include_labels=True)
        >>> results = await rag.semantic_search("AI applications", k=5, label_filter=[1, 2])
    """

    def __init__(
        self,
        collection_name: str,
        embedding_model: Embeddings,
        connection_string: str,
        schema_name: str = "public",
        index_type: IndexType = IndexType.HNSW,
        pool_size: int = 5,
        max_overflow: int = 10,
    ):
        """
        Initialize production RAG system.

        Args:
            collection_name: Name of the document collection/table
            embedding_model: LangChain embeddings model
            connection_string: PostgreSQL connection string (asyncpg format)
            schema_name: Database schema name (default: "public")
            index_type: Type of vector index (HNSW, IVFFlat, or DiskANN)
            pool_size: Connection pool size (default: 5)
            max_overflow: Max overflow connections (default: 10)

        **Security Note:** `collection_name` must be alphanumeric (plus underscores). Special characters are not allowed to prevent SQL injection in index names.

        Raises:
            ValidationError: If inputs are invalid
            DatabaseError: If connection fails
        """
        self._validate_init_params(collection_name, connection_string)

        self.table_name = collection_name
        self.embedding_model = embedding_model
        self.connection_string = connection_string
        self.schema_name = schema_name
        self._validate_schema_name(schema_name)  # Security: validate before SQL use
        self.index_type = (
            IndexType(index_type) if isinstance(index_type, str) else index_type
        )

        # Create engine with connection pooling
        try:
            self.sqlalchemy_engine: AsyncEngine = create_async_engine(
                self.connection_string,
                pool_size=pool_size,
                max_overflow=max_overflow,
                pool_pre_ping=True,
                echo=False,
            )
        except Exception as e:
            raise DatabaseError(f"Failed to create database engine: {e}") from e

        self.engine: PGEngine = PGEngine.from_engine(self.sqlalchemy_engine)
        self._vector_store: Optional[PGVectorStore] = None
        self.vector_size = self._get_embedding_dimension()
        self._index_built = False
        self._query_params: Dict[str, Any] = {}  # Store tuning params (search)
        self._diskann_build_params: Dict[str, Any] = {}  # Store tuning params (build)

        # Load default query params from Config
        self._query_params["ivfflat.probes"] = Config.DEFAULT_IVFFLAT_PROBES
        self._query_params["hnsw.ef_search"] = Config.DEFAULT_HNSW_EF_SEARCH

        # Extension manager for graceful degradation (v2.2.0)
        self._extensions: Optional[Any] = None
        if ExtensionManager is not None:
            self._extensions = ExtensionManager(self.sqlalchemy_engine)

        logger.info(
            f"pgVectorDB initialized: '{collection_name}' with {self.index_type.value} "
            f"(vector_size={self.vector_size})"
        )

    def _validate_init_params(
        self, collection_name: str, connection_string: str
    ) -> None:
        """Validate initialization parameters."""
        if not collection_name or not isinstance(collection_name, str):
            raise ValidationError("collection_name must be a non-empty string")
        if not re.match(r"^[a-zA-Z0-9_]+$", collection_name):
            raise ValidationError(
                "collection_name must contain only alphanumeric characters and underscores"
            )
        if not connection_string or not isinstance(connection_string, str):
            raise ValidationError("connection_string must be a non-empty string")
        if not connection_string.startswith("postgresql"):
            raise ValidationError(
                "connection_string must be a valid PostgreSQL connection string"
            )

    def _validate_schema_name(self, schema_name: str) -> None:
        """Validate schema name to prevent SQL injection."""
        if not schema_name or not isinstance(schema_name, str):
            raise ValidationError("schema_name must be a non-empty string")
        if not re.match(r"^[a-zA-Z0-9_]+$", schema_name):
            raise ValidationError(
                "schema_name must contain only alphanumeric characters and underscores"
            )

    def _get_embedding_dimension(self) -> int:
        """Automatically detect embedding dimension."""
        try:
            dimension = len(self.embedding_model.embed_query("test"))
            if dimension <= 0:
                raise ValidationError("Embedding dimension must be positive")
            return dimension
        except Exception as e:
            raise ValidationError(
                f"Could not determine embedding dimension: {e}"
            ) from e

    async def _ensure_extensions(self) -> None:
        """
        Ensure all required PostgreSQL extensions are installed.

        Extensions managed:
        - vector: Core pgvector extension for vector operations (REQUIRED)
        - pg_trgm: Trigram similarity for fuzzy text matching (REQUIRED)
        - vectorscale: DiskANN index support (OPTIONAL - required for DiskANN)
        - pg_textsearch: BM25 search ranking (OPTIONAL)

        Raises:
            InitializationError: If DiskANN requested but vectorscale not available
            DatabaseError: If extension creation fails
        """
        try:
            # Use ExtensionManager if available
            if self._extensions is not None:
                await self._extensions.check_extensions()

            async with self.sqlalchemy_engine.connect() as conn:
                # Core vector extension (required for all index types)
                await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
                logger.info("✓ Extension 'vector' enabled")

                # Trigram extension for fuzzy matching
                await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
                logger.info("✓ Extension 'pg_trgm' enabled")

                # pg_textsearch extension for native BM25 (optional)
                if self._extensions is not None:
                    await self._extensions.ensure_pg_textsearch()
                else:
                    result = await conn.execute(
                        text(
                            "SELECT * FROM pg_available_extensions WHERE name = 'pg_textsearch';"
                        )
                    )
                    if result.fetchone() is not None:
                        await conn.execute(
                            text("CREATE EXTENSION IF NOT EXISTS pg_textsearch;")
                        )
                        logger.info(
                            "✓ Extension 'pg_textsearch' enabled (BM25 support)"
                        )
                    else:
                        logger.warning(
                            "pg_textsearch extension not available. BM25 search will use FTS fallback."
                        )

                # DiskANN extension (only if needed)
                if self.index_type == IndexType.DISKANN:
                    if self._extensions is not None:
                        self._extensions.require_vectorscale("initialize DiskANN index")
                        await self._extensions.ensure_vectorscale()
                    else:
                        result = await conn.execute(
                            text(
                                "SELECT * FROM pg_available_extensions WHERE name = 'vectorscale';"
                            )
                        )
                        if result.fetchone() is None:
                            raise InitializationError(
                                "Cannot use DiskANN index: vectorscale extension is not installed.\n\n"
                                "To install vectorscale:\n"
                                "1. Follow installation at https://github.com/timescale/pgvectorscale\n"
                                "2. Run: CREATE EXTENSION vectorscale CASCADE;\n\n"
                                "Alternative: Use IndexType.HNSW or IndexType.IVFFLAT instead."
                            )
                        await conn.execute(
                            text("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")
                        )
                        logger.info("✓ Extension 'vectorscale' enabled")

                # Compare versions
                await self._check_extension_versions(conn)

                await conn.commit()
        except InitializationError:
            raise
        except Exception as e:
            raise DatabaseError(f"Failed to ensure extensions: {e}") from e

    async def _check_extension_versions(self, conn: Any) -> None:
        """Check installed extension versions against minimum requirements."""
        try:
            result = await conn.execute(
                text(
                    "SELECT extname, extversion FROM pg_extension WHERE extname IN ('vector', 'vectorscale')"
                )
            )
            for row in result.fetchall():
                ext_name, ext_ver = row[0], row[1]
                if ext_name == "vector":
                    if version.parse(ext_ver) < version.parse(
                        Config.MIN_VECTOR_VERSION
                    ):
                        logger.warning(
                            f"Extension 'vector' version {ext_ver} is older than recommended {Config.MIN_VECTOR_VERSION}"
                        )
                elif ext_name == "vectorscale":
                    if version.parse(ext_ver) < version.parse(
                        Config.MIN_VECTORSCALE_VERSION
                    ):
                        logger.warning(
                            f"Extension 'vectorscale' version {ext_ver} is older than recommended {Config.MIN_VECTORSCALE_VERSION}"
                        )
        except Exception as e:
            logger.warning(f"Could not verify extension versions: {e}")

    async def _setup_full_text_search(self) -> None:
        """Creates tsvector column, trigger, and GIN indexes for full-text and trigram search."""
        qualified_table = build_qualified_name(self.schema_name, self.table_name)

        try:
            async with self.sqlalchemy_engine.connect() as conn:
                # Add tsvector column
                await conn.execute(
                    text(
                        f"ALTER TABLE {qualified_table} "
                        f"ADD COLUMN IF NOT EXISTS content_tsvector tsvector"
                    )
                )

                # Create trigger function
                function_ddl = """
                CREATE OR REPLACE FUNCTION update_content_tsvector() RETURNS TRIGGER AS $$
                BEGIN
                    NEW.content_tsvector := to_tsvector('english', COALESCE(NEW.content, ''));
                    RETURN NEW;
                END;
                $$ LANGUAGE plpgsql;
                """
                await conn.execute(text(function_ddl))

                # Create trigger
                trigger_name = quote_identifier(f"tsvector_update_on_{self.table_name}")
                await conn.execute(
                    text(f"DROP TRIGGER IF EXISTS {trigger_name} ON {qualified_table};")
                )
                create_trigger_ddl = f"""
                CREATE TRIGGER {trigger_name} 
                BEFORE INSERT OR UPDATE ON {qualified_table}
                FOR EACH ROW EXECUTE FUNCTION update_content_tsvector();
                """
                await conn.execute(text(create_trigger_ddl))

                # Create GIN index on tsvector
                index_name = quote_identifier(f"idx_{self.table_name}_content_tsvector")
                await conn.execute(
                    text(
                        f"CREATE INDEX IF NOT EXISTS {index_name} "
                        f"ON {qualified_table} USING GIN(content_tsvector);"
                    )
                )
                logger.info("✓ Full-text search index created")

                # Create trigram GIN index for similarity search
                trigram_index_name = quote_identifier(
                    f"idx_{self.table_name}_content_trgm"
                )
                await conn.execute(
                    text(
                        f"CREATE INDEX IF NOT EXISTS {trigram_index_name} "
                        f"ON {qualified_table} USING GIN(content gin_trgm_ops);"
                    )
                )
                logger.info("✓ Trigram similarity index created")

                await conn.commit()
        except Exception as e:
            raise DatabaseError(f"Failed to setup full-text search: {e}") from e

    async def initialize(self, overwrite_existing: bool = False) -> None:
        """
        Complete system initialization - NO manual SQL required!

        This method handles everything automatically:
        1. Creates all required PostgreSQL extensions (vector, pg_trgm, vectorscale)
        2. Creates the main table with proper schema
        3. Sets up full-text search (tsvector, trigger, GIN index)
        4. Sets up trigram similarity index
        5. Initializes vector store

        Args:
            overwrite_existing: If True, drops existing table (default: False)

        Raises:
            DatabaseError: If initialization fails

        Examples:
            >>> rag = pgVectorDB(...)
            >>> await rag.initialize()  # That's it! No SQL needed
        """
        try:
            # Step 1: Ensure all extensions are installed
            logger.info("Step 1/5: Ensuring PostgreSQL extensions...")
            await self._ensure_extensions()

            # Step 2: Create table
            logger.info("Step 2/5: Creating table...")
            await self.engine.ainit_vectorstore_table(
                table_name=self.table_name,
                vector_size=self.vector_size,
                schema_name=self.schema_name,
                overwrite_existing=overwrite_existing,
            )
            logger.info(f"✓ Table '{self.table_name}' created")

            # Step 3: Setup full-text search and trigram indexes
            logger.info("Step 3/5: Setting up search indexes...")
            await self._setup_full_text_search()

            # Step 4: Initialize vector store
            logger.info("Step 4/5: Initializing vector store...")
            self._vector_store = await PGVectorStore.create(
                engine=self.engine,
                embedding_service=self.embedding_model,
                table_name=self.table_name,
                schema_name=self.schema_name,
            )
            logger.info("✓ Vector store initialized")

            # Step 5: Done!
            logger.info(
                f"✓ Step 5/5: System ready with {self.index_type.value} index (vector_size={self.vector_size})"
            )
            logger.info("=" * 80)
            logger.info("🚀 Production RAG System initialized successfully!")
            logger.info(
                "   - Extensions: vector, pg_trgm"
                + (", vectorscale" if self.index_type == IndexType.DISKANN else "")
            )
            logger.info(f"   - Table: {self.schema_name}.{self.table_name}")
            logger.info("   - Indexes: Full-text search, Trigram similarity")
            logger.info("   - Ready for: 10 search methods")
            logger.info("=" * 80)
        except Exception as e:
            raise DatabaseError(f"Failed to initialize system: {e}") from e

    def _ensure_initialized(self) -> None:
        """Ensure the system is initialized before operations."""
        if not self._vector_store:
            raise InitializationError(
                "System not initialized. Call initialize() first."
            )

    async def close(self) -> None:
        """Close database connections and cleanup resources."""
        try:
            await self.sqlalchemy_engine.dispose()
            logger.info("Database connections closed")
        except Exception as e:
            logger.error(f"Error closing connections: {e}")
            raise DatabaseError(f"Failed to close connections: {e}") from e

Functions

__init__(collection_name, embedding_model, connection_string, schema_name='public', index_type=IndexType.HNSW, pool_size=5, max_overflow=10)

Initialize production RAG system.

Parameters:

Name Type Description Default
collection_name str

Name of the document collection/table

required
embedding_model Embeddings

LangChain embeddings model

required
connection_string str

PostgreSQL connection string (asyncpg format)

required
schema_name str

Database schema name (default: "public")

'public'
index_type IndexType

Type of vector index (HNSW, IVFFlat, or DiskANN)

HNSW
pool_size int

Connection pool size (default: 5)

5
max_overflow int

Max overflow connections (default: 10)

10

Security Note: collection_name must be alphanumeric (plus underscores). Special characters are not allowed to prevent SQL injection in index names.

Raises:

Type Description
ValidationError

If inputs are invalid

DatabaseError

If connection fails

Source code in pgvectordb\core.py
def __init__(
    self,
    collection_name: str,
    embedding_model: Embeddings,
    connection_string: str,
    schema_name: str = "public",
    index_type: IndexType = IndexType.HNSW,
    pool_size: int = 5,
    max_overflow: int = 10,
):
    """
    Initialize production RAG system.

    Args:
        collection_name: Name of the document collection/table
        embedding_model: LangChain embeddings model
        connection_string: PostgreSQL connection string (asyncpg format)
        schema_name: Database schema name (default: "public")
        index_type: Type of vector index (HNSW, IVFFlat, or DiskANN)
        pool_size: Connection pool size (default: 5)
        max_overflow: Max overflow connections (default: 10)

    **Security Note:** `collection_name` must be alphanumeric (plus underscores). Special characters are not allowed to prevent SQL injection in index names.

    Raises:
        ValidationError: If inputs are invalid
        DatabaseError: If connection fails
    """
    self._validate_init_params(collection_name, connection_string)

    self.table_name = collection_name
    self.embedding_model = embedding_model
    self.connection_string = connection_string
    self.schema_name = schema_name
    self._validate_schema_name(schema_name)  # Security: validate before SQL use
    self.index_type = (
        IndexType(index_type) if isinstance(index_type, str) else index_type
    )

    # Create engine with connection pooling
    try:
        self.sqlalchemy_engine: AsyncEngine = create_async_engine(
            self.connection_string,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,
            echo=False,
        )
    except Exception as e:
        raise DatabaseError(f"Failed to create database engine: {e}") from e

    self.engine: PGEngine = PGEngine.from_engine(self.sqlalchemy_engine)
    self._vector_store: Optional[PGVectorStore] = None
    self.vector_size = self._get_embedding_dimension()
    self._index_built = False
    self._query_params: Dict[str, Any] = {}  # Store tuning params (search)
    self._diskann_build_params: Dict[str, Any] = {}  # Store tuning params (build)

    # Load default query params from Config
    self._query_params["ivfflat.probes"] = Config.DEFAULT_IVFFLAT_PROBES
    self._query_params["hnsw.ef_search"] = Config.DEFAULT_HNSW_EF_SEARCH

    # Extension manager for graceful degradation (v2.2.0)
    self._extensions: Optional[Any] = None
    if ExtensionManager is not None:
        self._extensions = ExtensionManager(self.sqlalchemy_engine)

    logger.info(
        f"pgVectorDB initialized: '{collection_name}' with {self.index_type.value} "
        f"(vector_size={self.vector_size})"
    )

initialize(overwrite_existing=False) async

Complete system initialization - NO manual SQL required!

This method handles everything automatically: 1. Creates all required PostgreSQL extensions (vector, pg_trgm, vectorscale) 2. Creates the main table with proper schema 3. Sets up full-text search (tsvector, trigger, GIN index) 4. Sets up trigram similarity index 5. Initializes vector store

Parameters:

Name Type Description Default
overwrite_existing bool

If True, drops existing table (default: False)

False

Raises:

Type Description
DatabaseError

If initialization fails

Examples:

>>> rag = pgVectorDB(...)
>>> await rag.initialize()  # That's it! No SQL needed
Source code in pgvectordb\core.py
async def initialize(self, overwrite_existing: bool = False) -> None:
    """
    Complete system initialization - NO manual SQL required!

    This method handles everything automatically:
    1. Creates all required PostgreSQL extensions (vector, pg_trgm, vectorscale)
    2. Creates the main table with proper schema
    3. Sets up full-text search (tsvector, trigger, GIN index)
    4. Sets up trigram similarity index
    5. Initializes vector store

    Args:
        overwrite_existing: If True, drops existing table (default: False)

    Raises:
        DatabaseError: If initialization fails

    Examples:
        >>> rag = pgVectorDB(...)
        >>> await rag.initialize()  # That's it! No SQL needed
    """
    try:
        # Step 1: Ensure all extensions are installed
        logger.info("Step 1/5: Ensuring PostgreSQL extensions...")
        await self._ensure_extensions()

        # Step 2: Create table
        logger.info("Step 2/5: Creating table...")
        await self.engine.ainit_vectorstore_table(
            table_name=self.table_name,
            vector_size=self.vector_size,
            schema_name=self.schema_name,
            overwrite_existing=overwrite_existing,
        )
        logger.info(f"✓ Table '{self.table_name}' created")

        # Step 3: Setup full-text search and trigram indexes
        logger.info("Step 3/5: Setting up search indexes...")
        await self._setup_full_text_search()

        # Step 4: Initialize vector store
        logger.info("Step 4/5: Initializing vector store...")
        self._vector_store = await PGVectorStore.create(
            engine=self.engine,
            embedding_service=self.embedding_model,
            table_name=self.table_name,
            schema_name=self.schema_name,
        )
        logger.info("✓ Vector store initialized")

        # Step 5: Done!
        logger.info(
            f"✓ Step 5/5: System ready with {self.index_type.value} index (vector_size={self.vector_size})"
        )
        logger.info("=" * 80)
        logger.info("🚀 Production RAG System initialized successfully!")
        logger.info(
            "   - Extensions: vector, pg_trgm"
            + (", vectorscale" if self.index_type == IndexType.DISKANN else "")
        )
        logger.info(f"   - Table: {self.schema_name}.{self.table_name}")
        logger.info("   - Indexes: Full-text search, Trigram similarity")
        logger.info("   - Ready for: 10 search methods")
        logger.info("=" * 80)
    except Exception as e:
        raise DatabaseError(f"Failed to initialize system: {e}") from e

close() async

Close database connections and cleanup resources.

Source code in pgvectordb\core.py
async def close(self) -> None:
    """Close database connections and cleanup resources."""
    try:
        await self.sqlalchemy_engine.dispose()
        logger.info("Database connections closed")
    except Exception as e:
        logger.error(f"Error closing connections: {e}")
        raise DatabaseError(f"Failed to close connections: {e}") from e