mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-11 20:05:19 +00:00
parent
20187f9a2d
commit
c307d87262
1 changed files with 120 additions and 58 deletions
|
|
@ -13,11 +13,12 @@ from abc import ABC, abstractmethod
|
|||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import select, text
|
||||
|
||||
from open_webui.utils.auth import get_admin_user
|
||||
from open_webui.models.users import Users
|
||||
from open_webui.models.chats import Chat, ChatModel, Chats
|
||||
from open_webui.models.messages import Message
|
||||
from open_webui.models.files import Files
|
||||
from open_webui.models.notes import Notes
|
||||
from open_webui.models.prompts import Prompts
|
||||
|
|
@ -25,7 +26,7 @@ from open_webui.models.models import Models
|
|||
from open_webui.models.knowledge import Knowledges
|
||||
from open_webui.models.functions import Functions
|
||||
from open_webui.models.tools import Tools
|
||||
from open_webui.models.folders import Folders
|
||||
from open_webui.models.folders import Folder, Folders
|
||||
from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT, VECTOR_DB
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
from open_webui.env import SRC_LOG_LEVELS
|
||||
|
|
@ -181,6 +182,65 @@ class JSONFileIDExtractor:
|
|||
return validated_ids
|
||||
|
||||
|
||||
# UUID pattern for direct dict traversal (Phase 1.5 optimization)
|
||||
UUID_PATTERN = re.compile(
|
||||
r'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$'
|
||||
)
|
||||
|
||||
|
||||
def collect_file_ids_from_dict(obj, out: Set[str], valid_ids: Set[str], _depth: int = 0) -> None:
|
||||
"""
|
||||
Recursively traverse dict/list structures and collect file IDs.
|
||||
|
||||
This function replaces json.dumps() + regex approach with direct dict traversal,
|
||||
reducing memory usage by ~75% on large chat databases.
|
||||
|
||||
Args:
|
||||
obj: Dict, list, or any value to traverse
|
||||
out: Set to accumulate found file IDs into
|
||||
valid_ids: Set of known valid file IDs (for O(1) validation)
|
||||
_depth: Current recursion depth (safety limit)
|
||||
|
||||
Patterns detected:
|
||||
- {"id": "uuid"}
|
||||
- {"file_id": "uuid"}
|
||||
- {"fileId": "uuid"}
|
||||
- {"file_ids": ["uuid1", "uuid2"]}
|
||||
- {"fileIds": ["uuid1", "uuid2"]}
|
||||
"""
|
||||
# Safety: Prevent excessive recursion
|
||||
if _depth > 100:
|
||||
return
|
||||
|
||||
if isinstance(obj, dict):
|
||||
# Check individual file ID fields
|
||||
for field_name in ['id', 'file_id', 'fileId']:
|
||||
fid = obj.get(field_name)
|
||||
if isinstance(fid, str) and UUID_PATTERN.fullmatch(fid):
|
||||
if fid in valid_ids:
|
||||
out.add(fid)
|
||||
|
||||
# Check file ID array fields
|
||||
for field_name in ['file_ids', 'fileIds']:
|
||||
fid_array = obj.get(field_name)
|
||||
if isinstance(fid_array, list):
|
||||
for fid in fid_array:
|
||||
if isinstance(fid, str) and UUID_PATTERN.fullmatch(fid):
|
||||
if fid in valid_ids:
|
||||
out.add(fid)
|
||||
|
||||
# Recurse into all dict values
|
||||
for value in obj.values():
|
||||
collect_file_ids_from_dict(value, out, valid_ids, _depth + 1)
|
||||
|
||||
elif isinstance(obj, list):
|
||||
# Recurse into all list items
|
||||
for item in obj:
|
||||
collect_file_ids_from_dict(item, out, valid_ids, _depth + 1)
|
||||
|
||||
# Primitives (str, int, None, etc.) - do nothing
|
||||
|
||||
|
||||
class VectorDatabaseCleaner(ABC):
|
||||
"""
|
||||
Abstract base class for vector database cleanup operations.
|
||||
|
|
@ -1122,82 +1182,84 @@ def get_active_file_ids() -> Set[str]:
|
|||
active_file_ids.add(stripped_id)
|
||||
|
||||
# Scan chats for file references
|
||||
# Stream chats to avoid loading all into memory
|
||||
# Stream chats using Core SELECT to avoid ORM overhead
|
||||
chat_count = 0
|
||||
with get_db() as db:
|
||||
for chat_orm in db.query(Chat).yield_per(1000):
|
||||
chat_count += 1
|
||||
chat = ChatModel.model_validate(chat_orm)
|
||||
stmt = select(Chat.id, Chat.chat)
|
||||
result = db.execution_options(stream_results=True).execute(stmt)
|
||||
|
||||
if not chat.chat or not isinstance(chat.chat, dict):
|
||||
continue
|
||||
while True:
|
||||
rows = result.fetchmany(1000)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
try:
|
||||
chat_json_str = json.dumps(chat.chat)
|
||||
# Extract file IDs without DB queries
|
||||
extracted_ids = JSONFileIDExtractor.extract_file_ids(chat_json_str)
|
||||
# Validate against preloaded set (O(1) per ID)
|
||||
validated_ids = extracted_ids & all_file_ids
|
||||
active_file_ids.update(validated_ids)
|
||||
for chat_id, chat_dict in rows:
|
||||
chat_count += 1
|
||||
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing chat {chat.id} for file references: {e}")
|
||||
# Skip if no chat data or not a dict
|
||||
if not chat_dict or not isinstance(chat_dict, dict):
|
||||
continue
|
||||
|
||||
try:
|
||||
# Direct dict traversal (no json.dumps needed)
|
||||
collect_file_ids_from_dict(chat_dict, active_file_ids, all_file_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing chat {chat_id} for file references: {e}")
|
||||
|
||||
log.debug(f"Scanned {chat_count} chats for file references")
|
||||
|
||||
# Scan folders for file references
|
||||
# Stream folders using Core SELECT to avoid ORM overhead
|
||||
try:
|
||||
folders = Folders.get_all_folders()
|
||||
with get_db() as db:
|
||||
stmt = select(Folder.id, Folder.items, Folder.data)
|
||||
result = db.execution_options(stream_results=True).execute(stmt)
|
||||
|
||||
for folder in folders:
|
||||
if folder.items:
|
||||
try:
|
||||
items_str = json.dumps(folder.items)
|
||||
# Extract file IDs without DB queries
|
||||
extracted_ids = JSONFileIDExtractor.extract_file_ids(items_str)
|
||||
# Validate against preloaded set (O(1) per ID)
|
||||
validated_ids = extracted_ids & all_file_ids
|
||||
active_file_ids.update(validated_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing folder {folder.id} items: {e}")
|
||||
while True:
|
||||
rows = result.fetchmany(100)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
if hasattr(folder, "data") and folder.data:
|
||||
try:
|
||||
data_str = json.dumps(folder.data)
|
||||
# Extract file IDs without DB queries
|
||||
extracted_ids = JSONFileIDExtractor.extract_file_ids(data_str)
|
||||
# Validate against preloaded set (O(1) per ID)
|
||||
validated_ids = extracted_ids & all_file_ids
|
||||
active_file_ids.update(validated_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing folder {folder.id} data: {e}")
|
||||
for folder_id, items_dict, data_dict in rows:
|
||||
# Process folder.items
|
||||
if items_dict:
|
||||
try:
|
||||
# Direct dict traversal (no json.dumps needed)
|
||||
collect_file_ids_from_dict(items_dict, active_file_ids, all_file_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing folder {folder_id} items: {e}")
|
||||
|
||||
# Process folder.data
|
||||
if data_dict:
|
||||
try:
|
||||
# Direct dict traversal (no json.dumps needed)
|
||||
collect_file_ids_from_dict(data_dict, active_file_ids, all_file_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing folder {folder_id} data: {e}")
|
||||
|
||||
except Exception as e:
|
||||
log.debug(f"Error scanning folders for file references: {e}")
|
||||
|
||||
# Scan standalone messages for file references
|
||||
# Stream messages using Core SELECT to avoid text() and yield_per issues
|
||||
try:
|
||||
with get_db() as db:
|
||||
stmt = text("SELECT id, data FROM message WHERE data IS NOT NULL")
|
||||
stmt = select(Message.id, Message.data).where(Message.data.isnot(None))
|
||||
result = db.execution_options(stream_results=True).execute(stmt)
|
||||
|
||||
while True:
|
||||
rows = result.fetchmany(1000)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
for message_id, message_data_dict in rows:
|
||||
if message_data_dict:
|
||||
try:
|
||||
# Direct dict traversal (no json.dumps needed)
|
||||
collect_file_ids_from_dict(message_data_dict, active_file_ids, all_file_ids)
|
||||
except Exception as e:
|
||||
log.debug(f"Error processing message {message_id} data: {e}")
|
||||
|
||||
for row in db.execute(stmt).yield_per(1000):
|
||||
message_id, message_data_json = row
|
||||
if message_data_json:
|
||||
try:
|
||||
data_str = (
|
||||
json.dumps(message_data_json)
|
||||
if isinstance(message_data_json, dict)
|
||||
else str(message_data_json)
|
||||
)
|
||||
# Extract file IDs without DB queries
|
||||
extracted_ids = JSONFileIDExtractor.extract_file_ids(data_str)
|
||||
# Validate against preloaded set (O(1) per ID)
|
||||
validated_ids = extracted_ids & all_file_ids
|
||||
active_file_ids.update(validated_ids)
|
||||
except Exception as e:
|
||||
log.debug(
|
||||
f"Error processing message {message_id} data: {e}"
|
||||
)
|
||||
except Exception as e:
|
||||
log.debug(f"Error scanning messages for file references: {e}")
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue