Mirror of https://github.com/open-webui/open-webui.git
Update prune.py

This commit is contained in:
parent bfa2eb631d
commit 155f53b867

1 changed file with 183 additions and 1 deletion
```diff
@@ -181,7 +181,7 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
         return count
 
     def cleanup_orphaned_collections(self, active_file_ids: Set[str], active_kb_ids: Set[str]) -> int:
-        """Actually delete orphaned ChromaDB collections."""
+        """Actually delete orphaned ChromaDB collections and database records."""
         if not self.chroma_db_path.exists():
             return 0
 
```
```diff
@@ -190,6 +190,13 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
 
         deleted_count = 0
 
+        # First, clean up orphaned database records
+        try:
+            deleted_count += self._cleanup_orphaned_database_records()
+        except Exception as e:
+            log.error(f"Error cleaning orphaned database records: {e}")
+
+        # Then clean up physical directories
         try:
             for collection_dir in self.vector_dir.iterdir():
                 if not collection_dir.is_dir() or collection_dir.name.startswith("."):
```
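The ordering above matters: orphaned rows are removed from chroma.sqlite3 before the per-collection directories are touched, so a failure in the filesystem pass cannot leave the database half-cleaned. To preview what the new `_cleanup_orphaned_database_records()` pass (added in the next hunk) will find, a read-only count against the same tables works. This is a minimal sketch; the database path is an assumption and depends on where your deployment keeps its vector store:

```python
# Read-only preview of the orphan count this commit's cleanup targets.
# CHROMA_DB is an assumed path; adjust it to your Open WebUI data directory.
import sqlite3

CHROMA_DB = "data/vector_db/chroma.sqlite3"

with sqlite3.connect(f"file:{CHROMA_DB}?mode=ro", uri=True) as conn:
    (orphaned,) = conn.execute(
        "SELECT COUNT(*) FROM embeddings "
        "WHERE segment_id NOT IN (SELECT id FROM segments)"
    ).fetchone()
    print(f"orphaned embeddings: {orphaned}")
```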
```diff
@@ -281,6 +288,181 @@ class ChromaDatabaseCleaner(VectorDatabaseCleaner):
             log.error(f"Error reading ChromaDB metadata: {e}")
 
         return uuid_to_collection
+
+    def _cleanup_orphaned_database_records(self) -> int:
+        """
+        Clean up orphaned database records that ChromaDB's delete_collection() method leaves behind.
+
+        This is the key fix for the file size issue - ChromaDB doesn't properly cascade
+        deletions, leaving orphaned embeddings, metadata, and FTS data that prevent
+        VACUUM from reclaiming space.
+
+        Returns:
+            Number of orphaned records cleaned up
+        """
+        cleaned_records = 0
+
+        try:
+            with sqlite3.connect(str(self.chroma_db_path)) as conn:
+                # Count orphaned records before cleanup
+                cursor = conn.execute("""
+                    SELECT COUNT(*) FROM embeddings
+                    WHERE segment_id NOT IN (SELECT id FROM segments)
+                """)
+                orphaned_embeddings = cursor.fetchone()[0]
+
+                if orphaned_embeddings == 0:
+                    log.debug("No orphaned ChromaDB embeddings found")
+                    return 0
+
+                log.info(f"Cleaning up {orphaned_embeddings} orphaned ChromaDB embeddings and related data")
+
+                # Delete orphaned embedding_metadata first (child records)
+                cursor = conn.execute("""
+                    DELETE FROM embedding_metadata
+                    WHERE id IN (
+                        SELECT id FROM embeddings
+                        WHERE segment_id NOT IN (SELECT id FROM segments)
+                    )
+                """)
+                metadata_deleted = cursor.rowcount
+                cleaned_records += metadata_deleted
+
+                # Delete orphaned embeddings
+                cursor = conn.execute("""
+                    DELETE FROM embeddings
+                    WHERE segment_id NOT IN (SELECT id FROM segments)
+                """)
+                embeddings_deleted = cursor.rowcount
+                cleaned_records += embeddings_deleted
+
+                # Selectively clean FTS while preserving active content
+                fts_cleaned = self._cleanup_fts_selectively(conn)
+                log.info(f"FTS cleanup: preserved {fts_cleaned} valid text entries")
+
+                # Clean up orphaned collection and segment metadata
+                cursor = conn.execute("""
+                    DELETE FROM collection_metadata
+                    WHERE collection_id NOT IN (SELECT id FROM collections)
+                """)
+                collection_meta_deleted = cursor.rowcount
+                cleaned_records += collection_meta_deleted
+
+                cursor = conn.execute("""
+                    DELETE FROM segment_metadata
+                    WHERE segment_id NOT IN (SELECT id FROM segments)
+                """)
+                segment_meta_deleted = cursor.rowcount
+                cleaned_records += segment_meta_deleted
+
+                # Clean up orphaned max_seq_id records
+                cursor = conn.execute("""
+                    DELETE FROM max_seq_id
+                    WHERE segment_id NOT IN (SELECT id FROM segments)
+                """)
+                seq_id_deleted = cursor.rowcount
+                cleaned_records += seq_id_deleted
+
+                # Force FTS index rebuild - this is crucial for VACUUM to work properly
+                conn.execute("INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')")
+
+                # Commit changes
+                conn.commit()
+
+                log.info(f"ChromaDB cleanup: {embeddings_deleted} embeddings, {metadata_deleted} metadata, "
+                         f"{collection_meta_deleted} collection metadata, {segment_meta_deleted} segment metadata, "
+                         f"{seq_id_deleted} sequence IDs")
+
+        except Exception as e:
+            log.error(f"Error cleaning orphaned ChromaDB database records: {e}")
+            raise
+
+        return cleaned_records
+
+    def _cleanup_fts_selectively(self, conn) -> int:
+        """
+        Selectively clean FTS content with atomic operations, preserving only data from active embeddings.
+
+        This method prevents destroying valid search data by:
+        1. Creating and validating a temporary table with valid content
+        2. Using atomic transactions for DELETE/INSERT operations
+        3. Rolling back on failure to preserve existing data
+        4. Conservative fallback: skipping FTS cleanup if validation fails
+
+        Returns:
+            Number of valid FTS entries preserved, or -1 if FTS cleanup was skipped
+        """
+        try:
+            # Step 1: Create temporary table with valid content
+            conn.execute("""
+                CREATE TEMPORARY TABLE temp_valid_fts AS
+                SELECT DISTINCT em.string_value
+                FROM embedding_metadata em
+                JOIN embeddings e ON em.id = e.id
+                JOIN segments s ON e.segment_id = s.id
+                WHERE em.string_value IS NOT NULL
+                  AND em.string_value != ''
+            """)
+
+            # Step 2: Validate temp table creation and count records
+            cursor = conn.execute("SELECT COUNT(*) FROM temp_valid_fts")
+            valid_count = cursor.fetchone()[0]
+
+            # Step 3: Validate temp table is accessible
+            try:
+                conn.execute("SELECT 1 FROM temp_valid_fts LIMIT 1")
+                temp_table_ok = True
+            except Exception:
+                temp_table_ok = False
+
+            # Step 4: Only proceed if validation passed
+            if not temp_table_ok:
+                log.warning("FTS temp table validation failed, skipping FTS cleanup for safety")
+                conn.execute("DROP TABLE IF EXISTS temp_valid_fts")
+                return -1  # Signal FTS cleanup was skipped
+
+            # Step 5: Atomic FTS cleanup operation
+            conn.execute("BEGIN IMMEDIATE")
+            try:
+                # Delete all FTS content
+                conn.execute("DELETE FROM embedding_fulltext_search")
+
+                # Re-insert only valid content if any exists
+                if valid_count > 0:
+                    conn.execute("""
+                        INSERT INTO embedding_fulltext_search(string_value)
+                        SELECT string_value FROM temp_valid_fts
+                    """)
+                    log.debug(f"Preserved {valid_count} valid FTS entries")
+                else:
+                    log.debug("No valid FTS content found, cleared all entries")
+
+                # Rebuild FTS index
+                conn.execute("INSERT INTO embedding_fulltext_search(embedding_fulltext_search) VALUES('rebuild')")
+
+                # Commit the atomic operation
+                conn.execute("COMMIT")
+
+            except Exception as e:
+                # Roll back on any failure to preserve existing FTS data
+                conn.execute("ROLLBACK")
+                log.error(f"FTS cleanup failed, rolled back changes: {e}")
+                conn.execute("DROP TABLE IF EXISTS temp_valid_fts")
+                return -1  # Signal FTS cleanup failed
+
+            # Step 6: Clean up temporary table
+            conn.execute("DROP TABLE IF EXISTS temp_valid_fts")
+
+            return valid_count
+
+        except Exception as e:
+            log.error(f"FTS cleanup validation failed, leaving FTS untouched: {e}")
+            # Conservative approach: don't touch FTS if anything goes wrong
+            try:
+                conn.execute("DROP TABLE IF EXISTS temp_valid_fts")
+            except Exception:
+                pass
+            return -1  # Signal FTS cleanup was skipped
 
 
 class PGVectorDatabaseCleaner(VectorDatabaseCleaner):
```
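Because `_cleanup_fts_selectively()` rewrites the FTS table row by row instead of dropping it, it is worth verifying the index after a run. SQLite's full-text module accepts an 'integrity-check' command in the same special INSERT form as the 'rebuild' used above. A minimal sketch, reusing the assumed database path from the earlier example:

```python
# Check that the rebuilt FTS index agrees with its content table; SQLite
# raises sqlite3.DatabaseError if the 'integrity-check' command fails.
import sqlite3

CHROMA_DB = "data/vector_db/chroma.sqlite3"  # assumed path, as above

with sqlite3.connect(CHROMA_DB) as conn:
    try:
        conn.execute(
            "INSERT INTO embedding_fulltext_search(embedding_fulltext_search) "
            "VALUES('integrity-check')"
        )
        print("FTS index OK")
    except sqlite3.DatabaseError as exc:
        print(f"FTS index inconsistent: {exc}")
```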
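The payoff described in the docstring is that VACUUM can finally reclaim space once the orphaned rows are gone. A before-and-after size check along these lines makes the effect visible (again a sketch with the same assumed path; note that VACUUM cannot run inside a transaction):

```python
# Measure how much file space VACUUM reclaims after the orphan cleanup.
# CHROMA_DB is an assumed path; isolation_level=None keeps the connection
# in autocommit mode so the VACUUM statement is allowed.
import os
import sqlite3

CHROMA_DB = "data/vector_db/chroma.sqlite3"

before = os.path.getsize(CHROMA_DB)
conn = sqlite3.connect(CHROMA_DB, isolation_level=None)
conn.execute("VACUUM")
conn.close()
after = os.path.getsize(CHROMA_DB)
print(f"reclaimed {before - after} bytes ({before} -> {after})")
```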