diff --git a/backend/open_webui/routers/prune.py b/backend/open_webui/routers/prune.py
index 530cb754d0..a4d6fc588f 100644
--- a/backend/open_webui/routers/prune.py
+++ b/backend/open_webui/routers/prune.py
@@ -7,6 +7,7 @@ import re
 import sqlite3
 from typing import Optional, Set, Union
 from pathlib import Path
+from abc import ABC, abstractmethod
 
 from fastapi import APIRouter, Depends, HTTPException, status
 from pydantic import BaseModel
@@ -84,6 +85,276 @@ class JSONFileIDExtractor:
         return validated_ids
 
 
+class VectorDatabaseCleaner(ABC):
+    """
+    Abstract base class for vector database cleanup operations.
+
+    This interface defines the contract that all vector database implementations
+    must follow. Community contributors can implement support for new vector
+    databases by extending this class.
+
+    Supported operations:
+    - Count orphaned collections (for dry-run preview)
+    - Clean up orphaned collections (actual deletion)
+    - Delete individual collections by name
+    """
+
+    @abstractmethod
+    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """
+        Count how many orphaned vector collections would be deleted.
+
+        Args:
+            active_file_ids: Set of file IDs that are still referenced
+            active_kb_ids: Set of knowledge base IDs that are still active
+
+        Returns:
+            Number of orphaned collections that would be deleted
+        """
+        pass
+
+    @abstractmethod
+    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """
+        Actually delete orphaned vector collections.
+
+        Args:
+            active_file_ids: Set of file IDs that are still referenced
+            active_kb_ids: Set of knowledge base IDs that are still active
+
+        Returns:
+            Number of collections that were actually deleted
+        """
+        pass
+
+    @abstractmethod
+    def delete_collection(self, collection_name: str) -> bool:
+        """
+        Delete a specific vector collection by name.
+
+        Args:
+            collection_name: Name of the collection to delete
+
+        Returns:
+            True if the deletion succeeded, False otherwise
+        """
+        pass
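The docstring above invites community implementations. As a purely illustrative sketch of what one would look like (not part of this PR: the class name `QdrantDatabaseCleaner`, the injected `client`, and its `list_collection_names`/`delete_collection` methods are all assumptions, not a real API), a new backend only needs the three methods, reusing the same `file-{id}`/KB-id naming convention the Chroma cleaner below expects:

```python
# Hypothetical sketch of a community-contributed cleaner -- not part of
# this PR. The injected `client` and its methods are assumed, not a real API.
from typing import Set


class QdrantDatabaseCleaner(VectorDatabaseCleaner):
    def __init__(self, client):
        self.client = client  # assumed vector-DB client with list/delete calls

    def _orphaned(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
        # Same naming convention as ChromaDatabaseCleaner:
        # "file-{id}" for file collections, the bare KB id for knowledge bases.
        expected = {f"file-{fid}" for fid in active_file_ids} | set(active_kb_ids)
        existing = set(self.client.list_collection_names())  # assumed call
        return existing - expected

    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
        return len(self._orphaned(active_file_ids, active_kb_ids))

    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
        return sum(
            1
            for name in self._orphaned(active_file_ids, active_kb_ids)
            if self.delete_collection(name)
        )

    def delete_collection(self, collection_name: str) -> bool:
        try:
            self.client.delete_collection(collection_name)  # assumed call
            return True
        except Exception:
            return False
```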
+
+
+class ChromaDatabaseCleaner(VectorDatabaseCleaner):
+    """
+    ChromaDB-specific implementation of vector database cleanup.
+
+    Handles ChromaDB's specific storage structure, including:
+    - SQLite metadata database (chroma.sqlite3)
+    - Physical vector storage directories
+    - Collection name to UUID mapping
+    - Segment-based storage architecture
+    """
+
+    def __init__(self):
+        self.vector_dir = Path(CACHE_DIR).parent / "vector_db"
+        self.chroma_db_path = self.vector_dir / "chroma.sqlite3"
+
+    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """Count orphaned ChromaDB collections for the dry-run preview."""
+        if not self.chroma_db_path.exists():
+            return 0
+
+        expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
+        uuid_to_collection = self._get_collection_mappings()
+
+        count = 0
+        try:
+            for collection_dir in self.vector_dir.iterdir():
+                if not collection_dir.is_dir() or collection_dir.name.startswith("."):
+                    continue
+
+                dir_uuid = collection_dir.name
+                collection_name = uuid_to_collection.get(dir_uuid)
+
+                if collection_name is None or collection_name not in expected_collections:
+                    count += 1
+        except Exception as e:
+            log.debug(f"Error counting orphaned ChromaDB collections: {e}")
+
+        return count
+
+    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """Actually delete orphaned ChromaDB collections."""
+        if not self.chroma_db_path.exists():
+            return 0
+
+        expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
+        uuid_to_collection = self._get_collection_mappings()
+
+        deleted_count = 0
+
+        try:
+            for collection_dir in self.vector_dir.iterdir():
+                if not collection_dir.is_dir() or collection_dir.name.startswith("."):
+                    continue
+
+                dir_uuid = collection_dir.name
+                collection_name = uuid_to_collection.get(dir_uuid)
+
+                # Delete if the directory has no corresponding collection name,
+                # or the collection is not expected to exist
+                if collection_name is None:
+                    try:
+                        shutil.rmtree(collection_dir)
+                        deleted_count += 1
+                        log.debug(f"Deleted orphaned ChromaDB directory: {dir_uuid}")
+                    except Exception as e:
+                        log.error(f"Failed to delete orphaned directory {dir_uuid}: {e}")
+
+                elif collection_name not in expected_collections:
+                    try:
+                        shutil.rmtree(collection_dir)
+                        deleted_count += 1
+                        log.debug(f"Deleted orphaned ChromaDB collection: {collection_name}")
+                    except Exception as e:
+                        log.error(f"Failed to delete collection directory {dir_uuid}: {e}")
+
+        except Exception as e:
+            log.error(f"Error cleaning ChromaDB collections: {e}")
+
+        if deleted_count > 0:
+            log.info(f"Deleted {deleted_count} orphaned ChromaDB collections")
+
+        return deleted_count
+
+    def delete_collection(self, collection_name: str) -> bool:
+        """Delete a specific ChromaDB collection by name."""
+        try:
+            # Attempt to delete via the ChromaDB client first
+            try:
+                VECTOR_DB_CLIENT.delete_collection(collection_name=collection_name)
+                log.debug(f"Deleted ChromaDB collection via client: {collection_name}")
+            except Exception as e:
+                log.debug(f"Collection {collection_name} may not exist in ChromaDB: {e}")
+
+            # Also clean up the physical directory if it exists.
+            # Note: ChromaDB uses UUID directories, so we'd need to map the
+            # collection name to a UUID. For now, let the
+            # cleanup_orphaned_collections method handle physical cleanup.
+            return True
+
+        except Exception as e:
+            log.error(f"Error deleting ChromaDB collection {collection_name}: {e}")
+            return False
+
+    def _build_expected_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
+        """Build the set of collection names that should exist."""
+        expected_collections = set()
+
+        # File collections use the "file-{id}" pattern
+        for file_id in active_file_ids:
+            expected_collections.add(f"file-{file_id}")
+
+        # Knowledge base collections use the KB ID directly
+        for kb_id in active_kb_ids:
+            expected_collections.add(kb_id)
+
+        return expected_collections
+
+    def _get_collection_mappings(self) -> dict:
+        """Get the mapping from ChromaDB directory UUID to collection name."""
+        uuid_to_collection = {}
+
+        try:
+            with sqlite3.connect(str(self.chroma_db_path)) as conn:
+                # First, get the collection ID to name mapping
+                collection_id_to_name = {}
+                cursor = conn.execute("SELECT id, name FROM collections")
+                for collection_id, collection_name in cursor.fetchall():
+                    collection_id_to_name[collection_id] = collection_name
+
+                # Then, get the segment ID to collection mapping
+                # (segment IDs are the on-disk directory UUIDs)
+                cursor = conn.execute("SELECT id, collection FROM segments WHERE scope = 'VECTOR'")
+                for segment_id, collection_id in cursor.fetchall():
+                    if collection_id in collection_id_to_name:
+                        collection_name = collection_id_to_name[collection_id]
+                        uuid_to_collection[segment_id] = collection_name
+
+            log.debug(f"Found {len(uuid_to_collection)} ChromaDB vector segments")
+
+        except Exception as e:
+            log.error(f"Error reading ChromaDB metadata: {e}")
+
+        return uuid_to_collection
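`delete_collection` above punts on physical cleanup because it only has a collection name, while the on-disk directories are keyed by segment UUID. A minimal sketch of the missing name-to-UUID lookup, inverting `_get_collection_mappings` over the same two `chroma.sqlite3` tables (the helper name is illustrative, not part of this PR):

```python
# Illustrative helper, not part of this PR: resolve a collection name to
# its physical segment UUID using the same chroma.sqlite3 tables that
# _get_collection_mappings reads.
import sqlite3
from pathlib import Path
from typing import Optional


def resolve_segment_uuid(chroma_db_path: Path, collection_name: str) -> Optional[str]:
    with sqlite3.connect(str(chroma_db_path)) as conn:
        row = conn.execute(
            """
            SELECT segments.id
            FROM segments
            JOIN collections ON collections.id = segments.collection
            WHERE segments.scope = 'VECTOR' AND collections.name = ?
            """,
            (collection_name,),
        ).fetchone()
    return row[0] if row else None
```

With the UUID in hand, the physical directory would be `vector_dir / uuid`, the same path that `cleanup_orphaned_collections` removes.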
+
+
+class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
+    """
+    Placeholder implementation for PGVector database cleanup.
+
+    This is a stub implementation that can be extended by the community
+    to support PGVector-specific cleanup operations.
+
+    According to PR feedback, PGVector stores data in the document_chunk
+    table, and cleanup should involve finding rows with matching file IDs.
+    """
+
+    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """Count orphaned PGVector collections - to be implemented by the community."""
+        log.debug("PGVector collection counting not yet implemented")
+        return 0
+
+    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """Clean up orphaned PGVector collections - to be implemented by the community."""
+        log.debug("PGVector collection cleanup not yet implemented")
+        return 0
+
+    def delete_collection(self, collection_name: str) -> bool:
+        """Delete a PGVector collection - to be implemented by the community."""
+        log.debug(f"PGVector collection deletion not yet implemented: {collection_name}")
+        return True
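For contributors picking up this stub, one possible shape, assuming (per the PR feedback cited in the docstring) a `document_chunk` table with a `collection_name` column that follows the same `file-{id}`/KB-id convention. The table and column names, and the DSN handling, are unverified assumptions to check against the real pgvector schema:

```python
# Unverified sketch for the PGVector stub above -- verify the actual
# document_chunk schema before using. Assumes a collection_name column.
from typing import Set

from sqlalchemy import create_engine, text


def pgvector_orphaned_collections(dsn: str, expected_collections: Set[str]) -> Set[str]:
    # Read the distinct collection names present in the table and diff them
    # against the expected set, mirroring the Chroma cleaner's logic.
    engine = create_engine(dsn)
    with engine.connect() as conn:
        names = conn.execute(
            text("SELECT DISTINCT collection_name FROM document_chunk")
        ).scalars().all()
    return {name for name in names if name not in expected_collections}


def pgvector_delete_collection(dsn: str, collection_name: str) -> int:
    # Delete all chunk rows belonging to one collection, transactionally.
    engine = create_engine(dsn)
    with engine.begin() as conn:  # commits on success, rolls back on error
        result = conn.execute(
            text("DELETE FROM document_chunk WHERE collection_name = :name"),
            {"name": collection_name},
        )
        deleted = result.rowcount
    return deleted
```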
+
+
+class NoOpVectorDatabaseCleaner(VectorDatabaseCleaner):
+    """
+    No-operation implementation for unsupported vector databases.
+
+    This implementation does nothing and is used when the configured
+    vector database is not supported by the cleanup system.
+    """
+
+    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """No orphaned collections to count for unsupported databases."""
+        return 0
+
+    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+        """No collections to clean up for unsupported databases."""
+        return 0
+
+    def delete_collection(self, collection_name: str) -> bool:
+        """No collection to delete for unsupported databases."""
+        return True
+
+
+def get_vector_database_cleaner() -> VectorDatabaseCleaner:
+    """
+    Factory function to get the appropriate vector database cleaner.
+
+    This function detects the configured vector database type and returns
+    the appropriate cleaner implementation. Community contributors can
+    extend this function to support additional vector databases.
+
+    Returns:
+        VectorDatabaseCleaner: Appropriate implementation for the configured database
+    """
+    vector_db_type = VECTOR_DB.lower()
+
+    if "chroma" in vector_db_type:
+        log.debug("Using ChromaDB cleaner")
+        return ChromaDatabaseCleaner()
+    elif "pgvector" in vector_db_type:
+        log.debug("Using PGVector cleaner (placeholder implementation)")
+        return PGVectorDatabaseCleaner()
+    else:
+        log.debug(f"No specific cleaner for vector database type: {VECTOR_DB}, using no-op cleaner")
+        return NoOpVectorDatabaseCleaner()
+
+
 class PruneDataForm(BaseModel):
     days: Optional[int] = None
     exempt_archived_chats: bool = False
@@ -284,55 +555,6 @@ def count_orphaned_uploads(active_file_ids: Set[str]) -> int:
     return count
 
 
-def count_orphaned_vector_collections(active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
-    """Count orphaned vector collections."""
-    if "chroma" not in VECTOR_DB.lower():
-        return 0
-
-    vector_dir = Path(CACHE_DIR).parent / "vector_db"
-    if not vector_dir.exists():
-        return 0
-
-    chroma_db_path = vector_dir / "chroma.sqlite3"
-    if not chroma_db_path.exists():
-        return 0
-
-    expected_collections = set()
-    for file_id in active_file_ids:
-        expected_collections.add(f"file-{file_id}")
-    for kb_id in active_kb_ids:
-        expected_collections.add(kb_id)
-
-    count = 0
-    try:
-        uuid_to_collection = {}
-        with sqlite3.connect(str(chroma_db_path)) as conn:
-            collection_id_to_name = {}
-            cursor = conn.execute("SELECT id, name FROM collections")
-            for collection_id, collection_name in cursor.fetchall():
-                collection_id_to_name[collection_id] = collection_name
-
-            cursor = conn.execute("SELECT id, collection FROM segments WHERE scope = 'VECTOR'")
-            for segment_id, collection_id in cursor.fetchall():
-                if collection_id in collection_id_to_name:
-                    collection_name = collection_id_to_name[collection_id]
-                    uuid_to_collection[segment_id] = collection_name
-
-        for collection_dir in vector_dir.iterdir():
-            if not collection_dir.is_dir() or collection_dir.name.startswith("."):
-                continue
-
-            dir_uuid = collection_dir.name
-            collection_name = uuid_to_collection.get(dir_uuid)
-
-            if collection_name is None or collection_name not in expected_collections:
-                count += 1
-    except Exception as e:
-        log.debug(f"Error counting orphaned vector collections: {e}")
-
-    return count
-
-
 def count_audio_cache_files(max_age_days: Optional[int]) -> int:
     """Count audio cache files that would be deleted."""
     if max_age_days is None:
@@ -469,29 +691,6 @@ def get_active_file_ids() -> Set[str]:
     return active_file_ids
 
 
-def safe_delete_vector_collection(collection_name: str) -> bool:
-    """
-    Safely delete a vector collection, handling both logical and physical cleanup.
-    """
-    try:
-        try:
-            VECTOR_DB_CLIENT.delete_collection(collection_name=collection_name)
-        except Exception as e:
-            log.debug(f"Collection {collection_name} may not exist in DB: {e}")
-
-        if "chroma" in VECTOR_DB.lower():
-            vector_dir = Path(CACHE_DIR).parent / "vector_db" / collection_name
-            if vector_dir.exists() and vector_dir.is_dir():
-                shutil.rmtree(vector_dir)
-                return True
-
-        return True
-
-    except Exception as e:
-        log.error(f"Error deleting vector collection {collection_name}: {e}")
-        return False
-
-
 def safe_delete_file_by_id(file_id: str) -> bool:
     """
     Safely delete a file record and its associated vector collection.
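The two removal hunks above take out the Chroma-only helpers; the hunks below rewire their call sites through the factory. The before/after call pattern, condensed for review (the wrapper name is illustrative, not verbatim from the file):

```python
# Illustrative summary of the call-site migration in the hunks below.
def delete_file_vectors(file_id: str) -> bool:
    # Before this PR: safe_delete_vector_collection(f"file-{file_id}")
    # After this PR: dispatch through the backend-agnostic factory.
    vector_cleaner = get_vector_database_cleaner()
    return vector_cleaner.delete_collection(f"file-{file_id}")
```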
@@ -501,11 +700,12 @@ def safe_delete_file_by_id(file_id: str) -> bool:
         if not file_record:
             return True
 
+        # Use modular vector database cleaner
+        vector_cleaner = get_vector_database_cleaner()
         collection_name = f"file-{file_id}"
-        safe_delete_vector_collection(collection_name)
+        vector_cleaner.delete_collection(collection_name)
 
         Files.delete_file_by_id(file_id)
-
         return True
 
     except Exception as e:
@@ -560,97 +760,6 @@ def cleanup_orphaned_uploads(active_file_ids: Set[str]) -> None:
     log.info(f"Deleted {deleted_count} orphaned upload files")
 
 
-def cleanup_orphaned_vector_collections(
-    active_file_ids: Set[str], active_kb_ids: Set[str]
-) -> None:
-    """
-    Clean up orphaned vector collections by querying ChromaDB metadata.
-    """
-    if "chroma" not in VECTOR_DB.lower():
-        return
-
-    vector_dir = Path(CACHE_DIR).parent / "vector_db"
-    if not vector_dir.exists():
-        return
-
-    chroma_db_path = vector_dir / "chroma.sqlite3"
-    if not chroma_db_path.exists():
-        return
-
-    expected_collections = set()
-
-    for file_id in active_file_ids:
-        expected_collections.add(f"file-{file_id}")
-
-    for kb_id in active_kb_ids:
-        expected_collections.add(kb_id)
-
-    uuid_to_collection = {}
-    try:
-
-        with sqlite3.connect(str(chroma_db_path)) as conn:
-            collection_id_to_name = {}
-            cursor = conn.execute("SELECT id, name FROM collections")
-            rows = cursor.fetchall()
-
-            for row in rows:
-                collection_id, collection_name = row
-                collection_id_to_name[collection_id] = collection_name
-
-            cursor = conn.execute(
-                "SELECT id, collection FROM segments WHERE scope = 'VECTOR'"
-            )
-            segment_rows = cursor.fetchall()
-
-            for row in segment_rows:
-                segment_id, collection_id = row
-                if collection_id in collection_id_to_name:
-                    collection_name = collection_id_to_name[collection_id]
-                    uuid_to_collection[segment_id] = collection_name
-
-        log.info(
-            f"Found {len(uuid_to_collection)} vector segments in ChromaDB metadata"
-        )
-
-    except Exception as e:
-        log.error(f"Error reading ChromaDB metadata: {e}")
-        return
-
-    deleted_count = 0
-
-    try:
-        for collection_dir in vector_dir.iterdir():
-            if not collection_dir.is_dir():
-                continue
-
-            dir_uuid = collection_dir.name
-
-            if dir_uuid.startswith("."):
-                continue
-
-            collection_name = uuid_to_collection.get(dir_uuid)
-
-            if collection_name is None:
-                try:
-                    shutil.rmtree(collection_dir)
-                    deleted_count += 1
-                except Exception as e:
-                    log.error(f"Failed to delete orphaned directory {dir_uuid}: {e}")
-
-            elif collection_name not in expected_collections:
-                try:
-                    shutil.rmtree(collection_dir)
-                    deleted_count += 1
-                except Exception as e:
-                    log.error(f"Failed to delete collection directory {dir_uuid}: {e}")
-
-    except Exception as e:
-        log.error(f"Error cleaning vector collections: {e}")
-
-    if deleted_count > 0:
-        log.info(f"Deleted {deleted_count} orphaned vector collections")
-
-
 def delete_inactive_users(
     inactive_days: int,
     exempt_admin: bool = True,
""" try: + # Get vector database cleaner based on configuration + vector_cleaner = get_vector_database_cleaner() + if form_data.dry_run: log.info("Starting data pruning preview (dry run)") @@ -786,7 +898,7 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)): orphaned_notes=orphaned_counts["notes"], orphaned_folders=orphaned_counts["folders"], orphaned_uploads=count_orphaned_uploads(active_file_ids), - orphaned_vector_collections=count_orphaned_vector_collections(active_file_ids, active_kb_ids), + orphaned_vector_collections=vector_cleaner.count_orphaned_collections(active_file_ids, active_kb_ids), audio_cache_files=count_audio_cache_files(form_data.audio_cache_max_age_days) ) @@ -877,7 +989,7 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)): if form_data.delete_orphaned_knowledge_bases: for kb in knowledge_bases: if kb.user_id not in active_user_ids: - if safe_delete_vector_collection(kb.id): + if vector_cleaner.delete_collection(kb.id): Knowledges.delete_knowledge_by_id(kb.id) deleted_kbs += 1 @@ -984,7 +1096,9 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)): final_active_kb_ids = {kb.id for kb in Knowledges.get_knowledge_bases()} cleanup_orphaned_uploads(final_active_file_ids) - cleanup_orphaned_vector_collections(final_active_file_ids, final_active_kb_ids) + + # Use modular vector database cleanup + vector_cleaner.cleanup_orphaned_collections(final_active_file_ids, final_active_kb_ids) # Stage 5: Audio cache cleanup log.info("Cleaning audio cache") @@ -999,15 +1113,14 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)): except Exception as e: log.error(f"Failed to vacuum main database: {e}") - if "chroma" in VECTOR_DB.lower(): - chroma_db_path = Path(CACHE_DIR).parent / "vector_db" / "chroma.sqlite3" - if chroma_db_path.exists(): - try: - - with sqlite3.connect(str(chroma_db_path)) as conn: - conn.execute("VACUUM") - except Exception as e: - log.error(f"Failed to vacuum ChromaDB database: {e}") + # Vector database-specific optimization + if isinstance(vector_cleaner, ChromaDatabaseCleaner): + try: + with sqlite3.connect(str(vector_cleaner.chroma_db_path)) as conn: + conn.execute("VACUUM") + log.info("Vacuumed ChromaDB database") + except Exception as e: + log.error(f"Failed to vacuum ChromaDB database: {e}") log.info("Data pruning completed successfully") return True