Mirror of https://github.com/open-webui/open-webui.git
Update prune.py
commit 8156d0a30e
parent 8231588eb4
1 changed file with 355 additions and 243 deletions
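
Every hunk in this commit is a formatting-only rewrap: long signatures and call sites are split across lines, trailing commas are added to multi-line argument lists, one single-quoted string switches to double quotes, and multi-line f-string log calls are re-indented. The style matches what a formatter such as Black produces at its default 88-character line length; that attribution is an inference from the diff, not something the commit message states. A representative before/after from the first hunk:

    # before: one line, past the line-length limit
    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int: ...

    # after: wrapped signature, identical behavior
    def count_orphaned_collections(
        self, active_file_ids: Set[str], active_kb_ids: Set[str]
    ) -> int: ...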
@@ -100,7 +100,9 @@ class VectorDatabaseCleaner(ABC):
         """

     @abstractmethod
-    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def count_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """
         Count how many orphaned vector collections would be deleted.

@@ -114,7 +116,9 @@ class VectorDatabaseCleaner(ABC):
         pass

     @abstractmethod
-    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def cleanup_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """
         Actually delete orphaned vector collections.

@@ -156,12 +160,16 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
         self.vector_dir = Path(CACHE_DIR).parent / "vector_db"
         self.chroma_db_path = self.vector_dir / "chroma.sqlite3"

-    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def count_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """Count orphaned ChromaDB collections for preview."""
         if not self.chroma_db_path.exists():
             return 0

-        expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
+        expected_collections = self._build_expected_collections(
+            active_file_ids, active_kb_ids
+        )
         uuid_to_collection = self._get_collection_mappings()

         count = 0
@@ -173,19 +181,26 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
                 dir_uuid = collection_dir.name
                 collection_name = uuid_to_collection.get(dir_uuid)

-                if collection_name is None or collection_name not in expected_collections:
+                if (
+                    collection_name is None
+                    or collection_name not in expected_collections
+                ):
                     count += 1
         except Exception as e:
             log.debug(f"Error counting orphaned ChromaDB collections: {e}")

         return count

-    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def cleanup_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """Actually delete orphaned ChromaDB collections and database records."""
         if not self.chroma_db_path.exists():
             return 0

-        expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
+        expected_collections = self._build_expected_collections(
+            active_file_ids, active_kb_ids
+        )
         uuid_to_collection = self._get_collection_mappings()

         deleted_count = 0
@@ -212,15 +227,21 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
                         deleted_count += 1
                         log.debug(f"Deleted orphaned ChromaDB directory: {dir_uuid}")
                     except Exception as e:
-                        log.error(f"Failed to delete orphaned directory {dir_uuid}: {e}")
+                        log.error(
+                            f"Failed to delete orphaned directory {dir_uuid}: {e}"
+                        )

                 elif collection_name not in expected_collections:
                     try:
                         shutil.rmtree(collection_dir)
                         deleted_count += 1
-                        log.debug(f"Deleted orphaned ChromaDB collection: {collection_name}")
+                        log.debug(
+                            f"Deleted orphaned ChromaDB collection: {collection_name}"
+                        )
                     except Exception as e:
-                        log.error(f"Failed to delete collection directory {dir_uuid}: {e}")
+                        log.error(
+                            f"Failed to delete collection directory {dir_uuid}: {e}"
+                        )

         except Exception as e:
             log.error(f"Error cleaning ChromaDB collections: {e}")
@@ -238,7 +259,9 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
             VECTOR_DB_CLIENT.delete_collection(collection_name=collection_name)
             log.debug(f"Deleted ChromaDB collection via client: {collection_name}")
         except Exception as e:
-            log.debug(f"Collection {collection_name} may not exist in ChromaDB: {e}")
+            log.debug(
+                f"Collection {collection_name} may not exist in ChromaDB: {e}"
+            )

         # Also clean up physical directory if it exists
         # Note: ChromaDB uses UUID directories, so we'd need to map collection name to UUID
@@ -249,7 +272,9 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Error deleting ChromaDB collection {collection_name}: {e}")
             return False

-    def _build_expected_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
+    def _build_expected_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> Set[str]:
         """Build set of collection names that should exist."""
         expected_collections = set()

@@ -276,7 +301,9 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
                     collection_id_to_name[collection_id] = collection_name

                 # Then, get segment ID to collection mapping (segments are the directory UUIDs)
-                cursor = conn.execute("SELECT id, collection FROM segments WHERE scope = 'VECTOR'")
+                cursor = conn.execute(
+                    "SELECT id, collection FROM segments WHERE scope = 'VECTOR'"
+                )
                 for segment_id, collection_id in cursor.fetchall():
                     if collection_id in collection_id_to_name:
                         collection_name = collection_id_to_name[collection_id]
@@ -305,34 +332,42 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
         try:
             with sqlite3.connect(str(self.chroma_db_path)) as conn:
                 # Count orphaned records before cleanup
-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     SELECT COUNT(*) FROM embeddings
                     WHERE segment_id NOT IN (SELECT id FROM segments)
-                """)
+                """
+                )
                 orphaned_embeddings = cursor.fetchone()[0]

                 if orphaned_embeddings == 0:
                     log.debug("No orphaned ChromaDB embeddings found")
                     return 0

-                log.info(f"Cleaning up {orphaned_embeddings} orphaned ChromaDB embeddings and related data")
+                log.info(
+                    f"Cleaning up {orphaned_embeddings} orphaned ChromaDB embeddings and related data"
+                )

                 # Delete orphaned embedding_metadata first (child records)
-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     DELETE FROM embedding_metadata
                     WHERE id IN (
                         SELECT id FROM embeddings
                         WHERE segment_id NOT IN (SELECT id FROM segments)
                     )
-                """)
+                """
+                )
                 metadata_deleted = cursor.rowcount
                 cleaned_records += metadata_deleted

                 # Delete orphaned embeddings
-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     DELETE FROM embeddings
                     WHERE segment_id NOT IN (SELECT id FROM segments)
-                """)
+                """
+                )
                 embeddings_deleted = cursor.rowcount
                 cleaned_records += embeddings_deleted

@@ -341,37 +376,47 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
                     log.info(f"FTS cleanup: preserved {fts_cleaned} valid text entries")

                 # Clean up orphaned collection and segment metadata
-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     DELETE FROM collection_metadata
                     WHERE collection_id NOT IN (SELECT id FROM collections)
-                """)
+                """
+                )
                 collection_meta_deleted = cursor.rowcount
                 cleaned_records += collection_meta_deleted

-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     DELETE FROM segment_metadata
                     WHERE segment_id NOT IN (SELECT id FROM segments)
-                """)
+                """
+                )
                 segment_meta_deleted = cursor.rowcount
                 cleaned_records += segment_meta_deleted

                 # Clean up orphaned max_seq_id records
-                cursor = conn.execute("""
+                cursor = conn.execute(
+                    """
                     DELETE FROM max_seq_id
                     WHERE segment_id NOT IN (SELECT id FROM segments)
-                """)
+                """
+                )
                 seq_id_deleted = cursor.rowcount
                 cleaned_records += seq_id_deleted

                 # Force FTS index rebuild - this is crucial for VACUUM to work properly
-                conn.execute("INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')")
+                conn.execute(
+                    "INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')"
+                )

                 # Commit changes
                 conn.commit()

-                log.info(f"ChromaDB cleanup: {embeddings_deleted} embeddings, {metadata_deleted} metadata, "
-                         f"{collection_meta_deleted} collection metadata, {segment_meta_deleted} segment metadata, "
-                         f"{seq_id_deleted} sequence IDs")
+                log.info(
+                    f"ChromaDB cleanup: {embeddings_deleted} embeddings, {metadata_deleted} metadata, "
+                    f"{collection_meta_deleted} collection metadata, {segment_meta_deleted} segment metadata, "
+                    f"{seq_id_deleted} sequence IDs"
+                )

         except Exception as e:
             log.error(f"Error cleaning orphaned ChromaDB database records: {e}")
@@ -394,7 +439,8 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
         """
         try:
             # Step 1: Create temporary table with valid content
-            conn.execute("""
+            conn.execute(
+                """
                 CREATE TEMPORARY TABLE temp_valid_fts AS
                 SELECT DISTINCT em.string_value
                 FROM embedding_metadata em
@@ -402,7 +448,8 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
                 JOIN segments s ON e.segment_id = s.id
                 WHERE em.string_value IS NOT NULL
                 AND em.string_value != ''
-            """)
+            """
+            )

             # Step 2: Validate temp table creation and count records
             cursor = conn.execute("SELECT COUNT(*) FROM temp_valid_fts")
@@ -417,7 +464,9 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):

             # Step 4: Only proceed if validation passed
             if not temp_table_ok:
-                log.warning("FTS temp table validation failed, skipping FTS cleanup for safety")
+                log.warning(
+                    "FTS temp table validation failed, skipping FTS cleanup for safety"
+                )
                 conn.execute("DROP TABLE IF EXISTS temp_valid_fts")
                 return -1  # Signal FTS cleanup was skipped

@@ -428,16 +477,20 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):

             # Re-insert only valid content if any exists
             if valid_count > 0:
-                conn.execute("""
+                conn.execute(
+                    """
                     INSERT INTO embedding_fulltext_search(string_value)
                     SELECT string_value FROM temp_valid_fts
-                """)
+                """
+                )
                 log.debug(f"Preserved {valid_count} valid FTS entries")
             else:
                 log.debug("No valid FTS content found, cleared all entries")

             # Rebuild FTS index
-            conn.execute("INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')")
+            conn.execute(
+                "INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')"
+            )

         except Exception as e:
             log.error(f"FTS cleanup failed: {e}")
@@ -474,7 +527,7 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             if VECTOR_DB_CLIENT is None:
                 raise Exception("VECTOR_DB_CLIENT is not available")
             # Test if we can access the session
-            if hasattr(VECTOR_DB_CLIENT, 'session') and VECTOR_DB_CLIENT.session:
+            if hasattr(VECTOR_DB_CLIENT, "session") and VECTOR_DB_CLIENT.session:
                 self.session = VECTOR_DB_CLIENT.session
                 log.debug("PGVector cleaner initialized successfully")
             else:
@@ -483,14 +536,20 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Failed to initialize PGVector client for cleanup: {e}")
             self.session = None

-    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def count_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """Count orphaned PGVector collections for preview."""
         if not self.session:
-            log.warning("PGVector session not available for counting orphaned collections")
+            log.warning(
+                "PGVector session not available for counting orphaned collections"
+            )
             return 0

         try:
-            orphaned_collections = self._get_orphaned_collections(active_file_ids, active_kb_ids)
+            orphaned_collections = self._get_orphaned_collections(
+                active_file_ids, active_kb_ids
+            )
             self.session.rollback()  # Read-only transaction
             return len(orphaned_collections)

@@ -500,7 +559,9 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Error counting orphaned PGVector collections: {e}")
             return 0

-    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def cleanup_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """
         Delete orphaned PGVector collections using the existing client's delete method.

@@ -512,14 +573,18 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             return 0

         try:
-            orphaned_collections = self._get_orphaned_collections(active_file_ids, active_kb_ids)
+            orphaned_collections = self._get_orphaned_collections(
+                active_file_ids, active_kb_ids
+            )

             if not orphaned_collections:
                 log.debug("No orphaned PGVector collections found")
                 return 0

             deleted_count = 0
-            log.info(f"Deleting {len(orphaned_collections)} orphaned PGVector collections")
+            log.info(
+                f"Deleting {len(orphaned_collections)} orphaned PGVector collections"
+            )

             # SIMPLIFIED DELETION: Use existing PGVector client delete method
             for collection_name in orphaned_collections:
@@ -531,7 +596,9 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
                     log.debug(f"Deleted PGVector collection: {collection_name}")

                 except Exception as e:
-                    log.error(f"Failed to delete PGVector collection '{collection_name}': {e}")
+                    log.error(
+                        f"Failed to delete PGVector collection '{collection_name}': {e}"
+                    )
                     # Continue with other collections even if one fails
                     continue

@@ -545,7 +612,9 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
                 log.warning(f"Failed to VACUUM PGVector table: {e}")

             if deleted_count > 0:
-                log.info(f"Successfully deleted {deleted_count} orphaned PGVector collections")
+                log.info(
+                    f"Successfully deleted {deleted_count} orphaned PGVector collections"
+                )

             return deleted_count

@@ -571,14 +640,18 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Error deleting PGVector collection '{collection_name}': {e}")
             return False

-    def _get_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
+    def _get_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> Set[str]:
         """
         Find collections that exist in PGVector but are no longer referenced.

         This is the only "complex" part - discovery. The actual deletion is simple!
         """
         try:
-            expected_collections = self._build_expected_collections(active_file_ids, active_kb_ids)
+            expected_collections = self._build_expected_collections(
+                active_file_ids, active_kb_ids
+            )

             # Query distinct collection names from document_chunk table
             result = self.session.execute(
@@ -588,9 +661,11 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             existing_collections = {row[0] for row in result}
             orphaned_collections = existing_collections - expected_collections

-            log.debug(f"Found {len(existing_collections)} existing collections, "
-                      f"{len(expected_collections)} expected, "
-                      f"{len(orphaned_collections)} orphaned")
+            log.debug(
+                f"Found {len(existing_collections)} existing collections, "
+                f"{len(expected_collections)} expected, "
+                f"{len(orphaned_collections)} orphaned"
+            )

             return orphaned_collections

@@ -598,7 +673,9 @@ class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Error finding orphaned PGVector collections: {e}")
             return set()

-    def _build_expected_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> Set[str]:
+    def _build_expected_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> Set[str]:
         """Build set of collection names that should exist."""
         expected_collections = set()

@@ -621,11 +698,15 @@ class NoOpVectorDatabaseCleaner(VectorDatabaseCleaner):
     vector database is not supported by the cleanup system.
     """

-    def count_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def count_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """No orphaned collections to count for unsupported databases."""
         return 0

-    def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
+    def cleanup_orphaned_collections(
+        self, active_file_ids: Set[str], active_kb_ids: Set[str]
+    ) -> int:
         """No collections to cleanup for unsupported databases."""
         return 0

@@ -654,7 +735,9 @@ def get_vector_database_cleaner() -> VectorDatabaseCleaner:
         log.debug("Using PGVector cleaner")
         return PGVectorDatabaseCleaner()
     else:
-        log.debug(f"No specific cleaner for vector database type: {VECTOR_DB}, using no-op cleaner")
+        log.debug(
+            f"No specific cleaner for vector database type: {VECTOR_DB}, using no-op cleaner"
+        )
         return NoOpVectorDatabaseCleaner()


@@ -695,7 +778,9 @@ class PrunePreviewResult(BaseModel):


 # Counting helper functions for dry-run preview
-def count_inactive_users(inactive_days: Optional[int], exempt_admin: bool, exempt_pending: bool) -> int:
+def count_inactive_users(
+    inactive_days: Optional[int], exempt_admin: bool, exempt_pending: bool
+) -> int:
     """Count users that would be deleted for inactivity."""
     if inactive_days is None:
         return 0
@@ -718,7 +803,9 @@ def count_inactive_users(inactive_days: Optional[int], exempt_admin: bool, exemp
     return count


-def count_old_chats(days: Optional[int], exempt_archived: bool, exempt_in_folders: bool) -> int:
+def count_old_chats(
+    days: Optional[int], exempt_archived: bool, exempt_in_folders: bool
+) -> int:
     """Count chats that would be deleted by age."""
     if days is None:
         return 0
@@ -754,7 +841,7 @@ def count_orphaned_records(form_data: PruneDataForm) -> dict:
         "knowledge_bases": 0,
         "models": 0,
         "notes": 0,
-        "folders": 0
+        "folders": 0,
     }

     try:
@@ -929,7 +1016,9 @@ def get_active_file_ids() -> Set[str]:
         try:
             chat_json_str = json.dumps(chat.chat)
             # Use utility to extract and validate file IDs
-            validated_ids = JSONFileIDExtractor.extract_and_validate_file_ids(chat_json_str)
+            validated_ids = JSONFileIDExtractor.extract_and_validate_file_ids(
+                chat_json_str
+            )
             active_file_ids.update(validated_ids)

         except Exception as e:
@@ -944,7 +1033,9 @@ def get_active_file_ids() -> Set[str]:
             try:
                 items_str = json.dumps(folder.items)
                 # Use utility to extract and validate file IDs
-                validated_ids = JSONFileIDExtractor.extract_and_validate_file_ids(items_str)
+                validated_ids = (
+                    JSONFileIDExtractor.extract_and_validate_file_ids(items_str)
+                )
                 active_file_ids.update(validated_ids)
             except Exception as e:
                 log.debug(f"Error processing folder {folder.id} items: {e}")
@@ -953,7 +1044,9 @@ def get_active_file_ids() -> Set[str]:
             try:
                 data_str = json.dumps(folder.data)
                 # Use utility to extract and validate file IDs
-                validated_ids = JSONFileIDExtractor.extract_and_validate_file_ids(data_str)
+                validated_ids = (
+                    JSONFileIDExtractor.extract_and_validate_file_ids(data_str)
+                )
                 active_file_ids.update(validated_ids)
             except Exception as e:
                 log.debug(f"Error processing folder {folder.id} data: {e}")
@@ -977,7 +1070,11 @@ def get_active_file_ids() -> Set[str]:
                         else str(message_data_json)
                     )
                     # Use utility to extract and validate file IDs
-                    validated_ids = JSONFileIDExtractor.extract_and_validate_file_ids(data_str)
+                    validated_ids = (
+                        JSONFileIDExtractor.extract_and_validate_file_ids(
+                            data_str
+                        )
+                    )
                     active_file_ids.update(validated_ids)
                 except Exception as e:
                     log.debug(
@@ -1064,9 +1161,7 @@ def cleanup_orphaned_uploads(active_file_ids: Set[str]) -> None:


 def delete_inactive_users(
-    inactive_days: int,
-    exempt_admin: bool = True,
-    exempt_pending: bool = True
+    inactive_days: int, exempt_admin: bool = True, exempt_pending: bool = True
 ) -> int:
     """
     Delete users who have been inactive for the specified number of days.

@@ -1102,7 +1197,9 @@ def delete_inactive_users(
             # Delete the user - this will cascade to all their data
             Users.delete_user_by_id(user.id)
             deleted_count += 1
-            log.info(f"Deleted inactive user: {user.email} (last active: {user.last_active_at})")
+            log.info(
+                f"Deleted inactive user: {user.email} (last active: {user.last_active_at})"
+            )
         except Exception as e:
             log.error(f"Failed to delete user {user.id}: {e}")

@@ -1176,7 +1273,11 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
         # Get counts for all enabled operations
         active_file_ids = get_active_file_ids()
         active_user_ids = {user.id for user in Users.get_users()["users"]}
-        active_kb_ids = {kb.id for kb in Knowledges.get_knowledge_bases() if kb.user_id in active_user_ids}
+        active_kb_ids = {
+            kb.id
+            for kb in Knowledges.get_knowledge_bases()
+            if kb.user_id in active_user_ids
+        }

         orphaned_counts = count_orphaned_records(form_data)

@@ -1184,12 +1285,12 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
             inactive_users=count_inactive_users(
                 form_data.delete_inactive_users_days,
                 form_data.exempt_admin_users,
-                form_data.exempt_pending_users
+                form_data.exempt_pending_users,
             ),
             old_chats=count_old_chats(
                 form_data.days,
                 form_data.exempt_archived_chats,
-                form_data.exempt_chats_in_folders
+                form_data.exempt_chats_in_folders,
             ),
             orphaned_chats=orphaned_counts["chats"],
             orphaned_files=orphaned_counts["files"],
@@ -1201,8 +1302,12 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
             orphaned_notes=orphaned_counts["notes"],
             orphaned_folders=orphaned_counts["folders"],
             orphaned_uploads=count_orphaned_uploads(active_file_ids),
-            orphaned_vector_collections=vector_cleaner.count_orphaned_collections(active_file_ids, active_kb_ids),
-            audio_cache_files=count_audio_cache_files(form_data.audio_cache_max_age_days)
+            orphaned_vector_collections=vector_cleaner.count_orphaned_collections(
+                active_file_ids, active_kb_ids
+            ),
+            audio_cache_files=count_audio_cache_files(
+                form_data.audio_cache_max_age_days
+            ),
         )

         log.info("Data pruning preview completed")
@@ -1214,11 +1319,13 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
     # Stage 0: Delete inactive users (if enabled)
     deleted_users = 0
     if form_data.delete_inactive_users_days is not None:
-        log.info(f"Deleting users inactive for more than {form_data.delete_inactive_users_days} days")
+        log.info(
+            f"Deleting users inactive for more than {form_data.delete_inactive_users_days} days"
+        )
         deleted_users = delete_inactive_users(
             form_data.delete_inactive_users_days,
             form_data.exempt_admin_users,
-            form_data.exempt_pending_users
+            form_data.exempt_pending_users,
         )
         if deleted_users > 0:
             log.info(f"Deleted {deleted_users} inactive users")
@@ -1401,7 +1508,9 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
     cleanup_orphaned_uploads(final_active_file_ids)

     # Use modular vector database cleanup
-    vector_cleaner.cleanup_orphaned_collections(final_active_file_ids, final_active_kb_ids)
+    vector_cleaner.cleanup_orphaned_collections(
+        final_active_file_ids, final_active_kb_ids
+    )

     # Stage 5: Audio cache cleanup
     log.info("Cleaning audio cache")
@@ -1424,7 +1533,10 @@ async def prune_data(form_data: PruneDataForm, user=Depends(get_admin_user)):
             log.info("Vacuumed ChromaDB database")
         except Exception as e:
             log.error(f"Failed to vacuum ChromaDB database: {e}")
-    elif isinstance(vector_cleaner, PGVectorDatabaseCleaner) and vector_cleaner.session:
+    elif (
+        isinstance(vector_cleaner, PGVectorDatabaseCleaner)
+        and vector_cleaner.session
+    ):
         try:
             vector_cleaner.session.execute(text("VACUUM ANALYZE"))
             vector_cleaner.session.commit()
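
Because the hunks touch only whitespace, string quoting, and optional trailing commas, the module's abstract syntax tree is unchanged. A minimal sketch (illustrative only, not part of the commit) of how one could confirm that a rewrap like the ones above is behavior-preserving:

    import ast

    # Two spellings of the same function: one flat, one wrapped Black-style.
    flat = "def f(a, b): return (a, b)"
    wrapped = "def f(\n    a, b\n): return (a, b)"

    # ast.dump omits line and column info by default, so purely cosmetic
    # differences vanish and the two dumps compare equal.
    assert ast.dump(ast.parse(flat)) == ast.dump(ast.parse(wrapped))
    print("formatting-only change: ASTs match")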