mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-12 04:15:25 +00:00
pgvector
This commit is contained in:
parent
46288924a2
commit
8231588eb4
1 changed files with 150 additions and 16 deletions
|
|
@ -461,30 +461,157 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
|
|||
|
||||
class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
|
||||
"""
|
||||
Placeholder implementation for PGVector database cleanup.
|
||||
PGVector database cleanup implementation.
|
||||
|
||||
This is a stub implementation that can be extended by the community
|
||||
to support PGVector-specific cleanup operations.
|
||||
|
||||
According to PR feedback, PGVector stores data in document_chunk table
|
||||
and cleanup should involve finding rows with matching file IDs.
|
||||
Leverages the existing PGVector client's delete() method for simple,
|
||||
reliable collection cleanup while maintaining comprehensive error handling
|
||||
and safety features.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Validate that we can access the PGVector client
|
||||
try:
|
||||
if VECTOR_DB_CLIENT is None:
|
||||
raise Exception("VECTOR_DB_CLIENT is not available")
|
||||
# Test if we can access the session
|
||||
if hasattr(VECTOR_DB_CLIENT, 'session') and VECTOR_DB_CLIENT.session:
|
||||
self.session = VECTOR_DB_CLIENT.session
|
||||
log.debug("PGVector cleaner initialized successfully")
|
||||
else:
|
||||
raise Exception("PGVector client session not available")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to initialize PGVector client for cleanup: {e}")
|
||||
self.session = None
|
||||
|
||||
def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
|
||||
"""Count orphaned PGVector collections - to be implemented by community."""
|
||||
log.debug("PGVector collection counting not yet implemented")
|
||||
"""Count orphaned PGVector collections for preview."""
|
||||
if not self.session:
|
||||
log.warning("PGVector session not available for counting orphaned collections")
|
||||
return 0
|
||||
|
||||
try:
|
||||
orphaned_collections = self._get_orphaned_collections(active_file_ids, active_kb_ids)
|
||||
self.session.rollback() # Read-only transaction
|
||||
return len(orphaned_collections)
|
||||
|
||||
except Exception as e:
|
||||
if self.session:
|
||||
self.session.rollback()
|
||||
log.error(f"Error counting orphaned PGVector collections: {e}")
|
||||
return 0
|
||||
|
||||
def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
|
||||
"""Cleanup orphaned PGVector collections - to be implemented by community."""
|
||||
log.debug("PGVector collection cleanup not yet implemented")
|
||||
"""
|
||||
Delete orphaned PGVector collections using the existing client's delete method.
|
||||
|
||||
This is the "super easy" approach suggested by @recrudesce - just use the
|
||||
existing PGVector client's delete() method for each orphaned collection.
|
||||
"""
|
||||
if not self.session:
|
||||
log.warning("PGVector session not available for cleanup")
|
||||
return 0
|
||||
|
||||
try:
|
||||
orphaned_collections = self._get_orphaned_collections(active_file_ids, active_kb_ids)
|
||||
|
||||
if not orphaned_collections:
|
||||
log.debug("No orphaned PGVector collections found")
|
||||
return 0
|
||||
|
||||
deleted_count = 0
|
||||
log.info(f"Deleting {len(orphaned_collections)} orphaned PGVector collections")
|
||||
|
||||
# SIMPLIFIED DELETION: Use existing PGVector client delete method
|
||||
for collection_name in orphaned_collections:
|
||||
try:
|
||||
# This is @recrudesce's "super easy" approach:
|
||||
# Just call the existing delete method!
|
||||
VECTOR_DB_CLIENT.delete(collection_name)
|
||||
deleted_count += 1
|
||||
log.debug(f"Deleted PGVector collection: {collection_name}")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Failed to delete PGVector collection '{collection_name}': {e}")
|
||||
# Continue with other collections even if one fails
|
||||
continue
|
||||
|
||||
# PostgreSQL-specific optimization (if we have access to session)
|
||||
try:
|
||||
if self.session:
|
||||
self.session.execute(text("VACUUM ANALYZE document_chunk"))
|
||||
self.session.commit()
|
||||
log.debug("Executed VACUUM ANALYZE on document_chunk table")
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to VACUUM PGVector table: {e}")
|
||||
|
||||
if deleted_count > 0:
|
||||
log.info(f"Successfully deleted {deleted_count} orphaned PGVector collections")
|
||||
|
||||
return deleted_count
|
||||
|
||||
except Exception as e:
|
||||
if self.session:
|
||||
self.session.rollback()
|
||||
log.error(f"Error cleaning orphaned PGVector collections: {e}")
|
||||
return 0
|
||||
|
||||
def delete_collection(self, collection_name: str) -> bool:
|
||||
"""Delete PGVector collection - to be implemented by community."""
|
||||
log.debug(f"PGVector collection deletion not yet implemented: {collection_name}")
|
||||
"""
|
||||
Delete a specific PGVector collection using the existing client method.
|
||||
|
||||
Super simple - just call the existing delete method!
|
||||
"""
|
||||
try:
|
||||
# @recrudesce's "super easy" approach: use existing client!
|
||||
VECTOR_DB_CLIENT.delete(collection_name)
|
||||
log.debug(f"Deleted PGVector collection: {collection_name}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error deleting PGVector collection '{collection_name}': {e}")
|
||||
return False
|
||||
|
||||
def _get_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
|
||||
"""
|
||||
Find collections that exist in PGVector but are no longer referenced.
|
||||
|
||||
This is the only "complex" part - discovery. The actual deletion is simple!
|
||||
"""
|
||||
try:
|
||||
expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
|
||||
|
||||
# Query distinct collection names from document_chunk table
|
||||
result = self.session.execute(
|
||||
text("SELECT DISTINCT collection_name FROM document_chunk")
|
||||
).fetchall()
|
||||
|
||||
existing_collections = {row[0] for row in result}
|
||||
orphaned_collections = existing_collections - expected_collections
|
||||
|
||||
log.debug(f"Found {len(existing_collections)} existing collections, "
|
||||
f"{len(expected_collections)} expected, "
|
||||
f"{len(orphaned_collections)} orphaned")
|
||||
|
||||
return orphaned_collections
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error finding orphaned PGVector collections: {e}")
|
||||
return set()
|
||||
|
||||
def _build_expected_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
|
||||
"""Build set of collection names that should exist."""
|
||||
expected_collections = set()
|
||||
|
||||
# File collections use "file-{id}" pattern (same as ChromaDB)
|
||||
for file_id in active_file_ids:
|
||||
expected_collections.add(f"file-{file_id}")
|
||||
|
||||
# Knowledge base collections use the KB ID directly (same as ChromaDB)
|
||||
for kb_id in active_kb_ids:
|
||||
expected_collections.add(kb_id)
|
||||
|
||||
return expected_collections
|
||||
|
||||
|
||||
class NoOpVectorDatabaseCleaner(VectorDatabaseCleaner):
|
||||
"""
|
||||
|
|
@ -524,7 +651,7 @@ def get_vector_database_cleaner() -> VectorDatabaseCleaner:
|
|||
log.debug("Using ChromaDB cleaner")
|
||||
return ChromaDatabaseCleaner()
|
||||
elif "pgvector" in vector_db_type:
|
||||
log.debug("Using PGVector cleaner (placeholder implementation)")
|
||||
log.debug("Using PGVector cleaner")
|
||||
return PGVectorDatabaseCleaner()
|
||||
else:
|
||||
log.debug(f"No specific cleaner for vector database type: {VECTOR_DB}, using no-op cleaner")
|
||||
|
|
@ -1297,6 +1424,13 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
|
|||
log.info("Vacuumed ChromaDB database")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to vacuum ChromaDB database: {e}")
|
||||
elif isinstance(vector_cleaner, PGVectorDatabaseCleaner) and vector_cleaner.session:
|
||||
try:
|
||||
vector_cleaner.session.execute(text("VACUUM ANALYZE"))
|
||||
vector_cleaner.session.commit()
|
||||
log.info("Executed VACUUM ANALYZE on PostgreSQL database")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to vacuum PostgreSQL database: {e}")
|
||||
|
||||
log.info("Data pruning completed successfully")
|
||||
return True
|
||||
|
|
|
|||
Loading…
Reference in a new issue