import logging
import time
from typing import Optional

from fastapi import HTTPException, Request, status

from open_webui.internal.db import get_db
from open_webui.models.knowledge import Knowledge, Knowledges, KnowledgeModel
from open_webui.models.files import FileModel, FileMetadataResponse, Files
from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT
from open_webui.routers.retrieval import (
    process_file,
    ProcessFileForm,
    process_files_batch,
    BatchProcessFilesForm,
)
from open_webui.storage.provider import Storage

log = logging.getLogger(__name__)


def _update_knowledge_file_ids_atomic(
    knowledge_id: str, remove_ids: set[str], add_ids: set[str]
) -> KnowledgeModel:
    """
    Lock the knowledge row and atomically update file_ids by removing and adding
    the provided sets. Prevents lost updates under concurrency.
    """
    with get_db() as db:
        row = (
            db.query(Knowledge)
            .with_for_update()  # row-level lock
            .filter_by(id=knowledge_id)
            .first()
        )
        if not row:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Knowledge not found"
            )

        data = dict(row.data or {})
        current_ids = list(data.get("file_ids", []))
        new_set = set(current_ids)
        if remove_ids:
            new_set.difference_update(remove_ids)
        if add_ids:
            new_set.update(add_ids)

        data["file_ids"] = list(new_set)

        db.query(Knowledge).filter_by(id=knowledge_id).update(
            {"data": data, "updated_at": int(time.time())}
        )
        db.commit()

    # Return fresh model after commit
    return Knowledges.get_knowledge_by_id(knowledge_id)
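
# A minimal usage sketch for the helper above (the ids below are hypothetical,
# not values used anywhere in this module):
#
#     updated = _update_knowledge_file_ids_atomic(
#         knowledge_id="kb-123",
#         remove_ids={"old-file-id"},
#         add_ids={"new-file-id"},
#     )
#     # updated.data["file_ids"] now reflects both changes; the row lock keeps
#     # concurrent syncs from overwriting each other's edits.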


def sync_files_to_knowledge(
    request: Request, knowledge_id: str, new_file_ids: list[str], user
) -> tuple[KnowledgeModel, list[FileMetadataResponse], Optional[dict]]:
    """
    Batch sync a list of uploaded files into a knowledge base, handling:
    - skip if same-named file with identical hash already present
    - replace if same-named file with different hash exists
    - add if no same-named file exists

    Steps:
    1) Ensure each incoming file is processed to compute hash/content.
    2) Compute skip/replace/add sets based on filename + hash comparison.
    3) Cleanup (vectors, storage, db) for skipped new files and replaced old files.
    4) Batch process embeddings for new additions (add + replace targets).
    5) Atomically update knowledge.data.file_ids under a row lock.

    Returns: (updated_knowledge_model, files_metadata, optional_warnings)
    """
    knowledge = Knowledges.get_knowledge_by_id(id=knowledge_id)
    if not knowledge:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Knowledge not found"
        )

    # Deduplicate the incoming list while preserving order
    seen: set[str] = set()
    incoming_ids: list[str] = []
    for fid in new_file_ids:
        if fid not in seen:
            seen.add(fid)
            incoming_ids.append(fid)

    existing_ids = (knowledge.data or {}).get("file_ids", [])
    existing_files: list[FileModel] = (
        Files.get_files_by_ids(existing_ids) if existing_ids else []
    )

    # Build lookup by filename for existing KB files
    existing_by_name: dict[str, FileModel] = {}
    for f in existing_files:
        if f and f.filename:
            existing_by_name[f.filename] = f
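    # NOTE: if the KB already contains several files sharing a filename, the
    # last one returned wins in this lookup; only that entry is considered for
    # the skip/replace decision below.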
    to_skip_new_ids: set[str] = set()  # identical by hash -> delete uploaded
    to_replace_old_to_new: dict[str, str] = {}  # old_id -> new_id
    to_add_ids: set[str] = set()

    errors: list[str] = []

    # Ensure each incoming file is processed enough to have hash/content
    for fid in incoming_ids:
        new_file = Files.get_file_by_id(fid)
        if not new_file:
            errors.append(f"File {fid} not found")
            continue

        if not (new_file.hash and new_file.data and new_file.data.get("content")):
            try:
                # Process without specifying collection to generate content/hash
                process_file(request, ProcessFileForm(file_id=new_file.id), user=user)
                new_file = Files.get_file_by_id(new_file.id)  # refresh
            except Exception as e:
                log.debug(e)
                errors.append(f"Failed to process file {new_file.id}: {e}")
                continue

        same_name_file = existing_by_name.get(new_file.filename)

        if same_name_file:
            # If hashes match, skip (discard the new upload)
            if (
                same_name_file.hash
                and new_file.hash
                and same_name_file.hash == new_file.hash
            ):
                to_skip_new_ids.add(new_file.id)
            else:
                # Hash differs -> replace old with new
                to_replace_old_to_new[same_name_file.id] = new_file.id
        else:
            # No existing file with same name -> add
            to_add_ids.add(new_file.id)
    # Clean up skipped new files (remove their own vectors/collections, storage, db)
    for new_id in list(to_skip_new_ids):
        try:
            try:
                VECTOR_DB_CLIENT.delete_collection(collection_name=f"file-{new_id}")
            except Exception as ve:
                log.debug(ve)
            new_file = Files.get_file_by_id(new_id)
            if new_file and new_file.path:
                try:
                    Storage.delete_file(new_file.path)
                except Exception as se:
                    log.debug(se)
            Files.delete_file_by_id(new_id)
        except Exception as e:
            log.debug(e)
            errors.append(f"Failed cleanup for skipped file {new_id}: {e}")
    # For replacements, remove old file's embeddings, collections, storage, and db record
    for old_id, new_id in list(to_replace_old_to_new.items()):
        try:
            try:
                VECTOR_DB_CLIENT.delete(
                    collection_name=knowledge_id, filter={"file_id": old_id}
                )
            except Exception as ve:
                log.debug(ve)
            try:
                if VECTOR_DB_CLIENT.has_collection(
                    collection_name=f"file-{old_id}"
                ):
                    VECTOR_DB_CLIENT.delete_collection(
                        collection_name=f"file-{old_id}"
                    )
            except Exception as ce:
                log.debug(ce)

            old_file = Files.get_file_by_id(old_id)
            if old_file and old_file.path:
                try:
                    Storage.delete_file(old_file.path)
                except Exception as se:
                    log.debug(se)
            Files.delete_file_by_id(old_id)
        except Exception as e:
            log.debug(e)
            errors.append(f"Failed replace cleanup for old file {old_id}: {e}")
    # Process embeddings for additions (to_add + replace targets) into KB collection
    add_targets: set[str] = set(to_add_ids) | set(to_replace_old_to_new.values())
    if add_targets:
        add_files: list[FileModel] = Files.get_files_by_ids(list(add_targets))
        try:
            process_files_batch(
                request=request,
                form_data=BatchProcessFilesForm(
                    files=add_files, collection_name=knowledge_id
                ),
                user=user,
            )
        except Exception as e:
            log.error(f"Batch processing failed: {e}")
            errors.append(f"Batch processing failed: {e}")
    # Atomically update knowledge.data.file_ids under lock
    updated_knowledge = _update_knowledge_file_ids_atomic(
        knowledge_id=knowledge_id,
        remove_ids=set(to_replace_old_to_new.keys()),
        add_ids=add_targets,
    )
    # Prepare response files
    final_ids = (updated_knowledge.data or {}).get("file_ids", [])
    files_meta: list[FileMetadataResponse] = Files.get_file_metadatas_by_ids(final_ids)

    warnings = None
    if errors:
        warnings = {
            "message": "Some sync operations encountered errors",
            "errors": errors,
        }

    return updated_knowledge, files_meta, warnings
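
# Minimal usage sketch (illustrative only; the knowledge id and file ids are
# hypothetical, and `request`/`user` are assumed to come from a FastAPI route
# handler elsewhere in the app):
#
#     knowledge, files_meta, warnings = sync_files_to_knowledge(
#         request=request,
#         knowledge_id="kb-123",
#         new_file_ids=["file-a", "file-b"],
#         user=user,
#     )
#     if warnings:
#         log.warning(warnings["message"])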