diff --git a/backend/open_webui/routers/knowledge.py b/backend/open_webui/routers/knowledge.py
index f67390518b..3a06ebfb18 100644
--- a/backend/open_webui/routers/knowledge.py
+++ b/backend/open_webui/routers/knowledge.py
@@ -20,6 +20,7 @@ from open_webui.routers.retrieval import (
     BatchProcessFilesForm,
 )
 from open_webui.storage.provider import Storage
+from open_webui.utils.knowledge_sync import sync_files_to_knowledge
 
 from open_webui.constants import ERROR_MESSAGES
 from open_webui.utils.auth import get_verified_user
@@ -327,6 +328,10 @@ class KnowledgeFileIdForm(BaseModel):
     file_id: str
 
 
+class KnowledgeFileIdsForm(BaseModel):
+    file_ids: List[str]
+
+
 @router.post("/{id}/file/add", response_model=Optional[KnowledgeFilesResponse])
 def add_file_to_knowledge_by_id(
     request: Request,
@@ -394,6 +399,62 @@ def add_file_to_knowledge_by_id(
         )
 
 
+@router.post("/{id}/file/sync/batch", response_model=Optional[KnowledgeFilesResponse])
+def sync_files_to_knowledge_batch(
+    request: Request,
+    id: str,
+    form_data: KnowledgeFileIdsForm,
+    user=Depends(get_verified_user),
+):
+    """
+    Batch sync multiple uploaded files into a knowledge base.
+
+    Same-named files are skipped (identical hash) or replaced (different
+    hash); knowledge.data.file_ids is then updated once for the whole batch.
+    Per-file failures are returned in the response's ``warnings`` field.
+    """
+    knowledge = Knowledges.get_knowledge_by_id(id=id)
+    if not knowledge:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ERROR_MESSAGES.NOT_FOUND,
+        )
+
+    if (
+        knowledge.user_id != user.id
+        and not has_access(user.id, "write", knowledge.access_control)
+        and user.role != "admin"
+    ):
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
+        )
+
+    try:
+        updated_knowledge, files_meta, warnings = sync_files_to_knowledge(
+            request=request,
+            knowledge_id=id,
+            new_file_ids=form_data.file_ids,
+            user=user,
+        )
+    except HTTPException:
+        # Keep the status/detail chosen by the sync helper.
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=str(e),
+        ) from e
+
+    # Attach `warnings` only when some per-file operation failed.
+    extra = {"warnings": warnings} if warnings else {}
+    return KnowledgeFilesResponse(
+        **updated_knowledge.model_dump(),
+        files=files_meta,
+        **extra,
+    )
+
+
 @router.post("/{id}/file/update", response_model=Optional[KnowledgeFilesResponse])
 def update_file_from_knowledge_by_id(
     request: Request,
diff --git a/backend/open_webui/utils/knowledge_sync.py b/backend/open_webui/utils/knowledge_sync.py
new file mode 100644
index 0000000000..3dcfa51e63
--- /dev/null
+++ b/backend/open_webui/utils/knowledge_sync.py
@@ -0,0 +1,251 @@
+import logging
+import time
+from typing import Optional
+
+from fastapi import HTTPException, Request, status
+
+from open_webui.internal.db import get_db
+from open_webui.models.knowledge import Knowledge, Knowledges, KnowledgeModel
+from open_webui.models.files import FileModel, FileMetadataResponse, Files
+from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT
+from open_webui.routers.retrieval import (
+    process_file,
+    ProcessFileForm,
+    process_files_batch,
+    BatchProcessFilesForm,
+)
+from open_webui.storage.provider import Storage
+
+log = logging.getLogger(__name__)
+
+
+def _update_knowledge_file_ids_atomic(
+    knowledge_id: str, remove_ids: set[str], add_ids: set[str]
+) -> KnowledgeModel:
+    """
+    Lock the knowledge row and update file_ids in a single transaction by
+    removing and adding the provided sets. Prevents lost updates under
+    concurrency.
+
+    Ids that stay keep their stored order; new ids are appended in sorted
+    order so the resulting list is deterministic.
+
+    Raises HTTPException (400) when the knowledge row does not exist.
+    """
+    remove_ids = remove_ids or set()
+    add_ids = add_ids or set()
+
+    with get_db() as db:
+        row = (
+            db.query(Knowledge)
+            .with_for_update()  # row-level lock
+            .filter_by(id=knowledge_id)
+            .first()
+        )
+        if not row:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST, detail="Knowledge not found"
+            )
+
+        data = dict(row.data or {})
+        # dict.fromkeys de-duplicates while keeping the stored order; a plain
+        # set round-trip would reshuffle file_ids on every sync.
+        kept_ids = [
+            fid
+            for fid in dict.fromkeys(data.get("file_ids") or [])
+            if fid not in remove_ids
+        ]
+        kept = set(kept_ids)
+        data["file_ids"] = kept_ids + sorted(set(add_ids) - kept)
+
+        db.query(Knowledge).filter_by(id=knowledge_id).update(
+            {"data": data, "updated_at": int(time.time())}
+        )
+        db.commit()
+
+    # Return fresh model after commit
+    return Knowledges.get_knowledge_by_id(knowledge_id)
+
+
+def _purge_file(file_id: str, knowledge_id: Optional[str] = None) -> None:
+    """
+    Best-effort removal of a file's vectors, stored blob and DB record.
+
+    When knowledge_id is given, the file's embeddings are also deleted from
+    that knowledge collection. Vector and storage failures are logged and
+    skipped so that the DB record is still removed.
+    """
+    if knowledge_id is not None:
+        try:
+            VECTOR_DB_CLIENT.delete(
+                collection_name=knowledge_id, filter={"file_id": file_id}
+            )
+        except Exception as e:
+            log.warning(f"Failed to delete vectors of file {file_id}: {e}")
+
+    try:
+        if VECTOR_DB_CLIENT.has_collection(collection_name=f"file-{file_id}"):
+            VECTOR_DB_CLIENT.delete_collection(collection_name=f"file-{file_id}")
+    except Exception as e:
+        log.warning(f"Failed to delete collection file-{file_id}: {e}")
+
+    file = Files.get_file_by_id(file_id)
+    if file and file.path:
+        try:
+            Storage.delete_file(file.path)
+        except Exception as e:
+            log.warning(f"Failed to delete stored file {file.path}: {e}")
+    Files.delete_file_by_id(file_id)
+
+
+def sync_files_to_knowledge(
+    request: Request, knowledge_id: str, new_file_ids: list[str], user
+) -> tuple[KnowledgeModel, list[FileMetadataResponse], Optional[dict]]:
+    """
+    Batch sync a list of uploaded files into a knowledge base, handling:
+    - skip if same-named file with identical hash already present
+    - replace if same-named file with different hash exists
+    - add if no same-named file exists
+
+    Steps:
+    1) Ensure each incoming file is processed to compute hash/content.
+    2) Compute skip/replace/add sets based on filename + hash comparison.
+    3) Batch process embeddings for new additions (add + replace targets).
+    4) Atomically update knowledge.data.file_ids under a row lock.
+    5) Cleanup (vectors, storage, db) for skipped new files and replaced old
+       files.
+
+    Existing data is only removed after the new embeddings were stored and
+    file_ids was updated, so a failed batch leaves the knowledge base as is.
+    Per-file problems are collected and returned as warnings, not raised.
+
+    Returns: (updated_knowledge_model, files_metadata, optional_warnings)
+    Raises: HTTPException (400) if the knowledge base does not exist.
+    """
+    knowledge = Knowledges.get_knowledge_by_id(id=knowledge_id)
+    if not knowledge:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST, detail="Knowledge not found"
+        )
+
+    existing_ids = (knowledge.data or {}).get("file_ids") or []
+    existing_id_set = set(existing_ids)
+    existing_files: list[FileModel] = (
+        Files.get_files_by_ids(existing_ids) if existing_ids else []
+    )
+
+    # Build lookup by filename for existing KB files
+    existing_by_name: dict[str, FileModel] = {
+        f.filename: f for f in existing_files if f and f.filename
+    }
+
+    skip_ids: list[str] = []  # redundant uploads -> delete
+    replacements: dict[str, str] = {}  # old_id -> new_id
+    add_ids: list[str] = []
+    errors: list[str] = []
+
+    # NOTE(review): incoming ids are not checked against the caller's file
+    # ownership here - confirm the upload route already guarantees that.
+    # dict.fromkeys de-duplicates the incoming list while preserving order.
+    for fid in dict.fromkeys(new_file_ids):
+        if fid in existing_id_set:
+            # Already part of this knowledge base. Treating it as an upload
+            # would match it against itself and delete it as a duplicate.
+            continue
+
+        new_file = Files.get_file_by_id(fid)
+        if not new_file:
+            errors.append(f"File {fid} not found")
+            continue
+
+        # Ensure the file is processed enough to have hash/content
+        if not (new_file.hash and new_file.data and new_file.data.get("content")):
+            try:
+                # Process without specifying collection to generate content/hash
+                process_file(request, ProcessFileForm(file_id=fid), user=user)
+            except Exception as e:
+                log.warning(f"Failed to process file {fid}: {e}")
+                errors.append(f"Failed to process file {fid}: {e}")
+                continue
+            new_file = Files.get_file_by_id(fid)  # refresh
+            if not new_file:
+                errors.append(f"File {fid} not found")
+                continue
+
+        same_name_file = existing_by_name.get(new_file.filename)
+        if not same_name_file:
+            # No existing file with same name -> add
+            add_ids.append(fid)
+        elif (
+            same_name_file.hash
+            and new_file.hash
+            and same_name_file.hash == new_file.hash
+        ):
+            # If hashes match, skip (discard the new upload)
+            skip_ids.append(fid)
+        else:
+            # Hash differs -> replace old with new. If an earlier upload in
+            # this batch already targeted the same old file, the later one
+            # wins and the earlier upload is discarded instead of orphaned.
+            superseded_id = replacements.get(same_name_file.id)
+            if superseded_id:
+                skip_ids.append(superseded_id)
+            replacements[same_name_file.id] = fid
+
+    # Process embeddings for additions (to_add + replace targets) into the KB
+    # collection before anything is deleted.
+    add_targets = add_ids + list(replacements.values())
+    if add_targets:
+        try:
+            process_files_batch(
+                request=request,
+                form_data=BatchProcessFilesForm(
+                    files=Files.get_files_by_ids(add_targets),
+                    collection_name=knowledge_id,
+                ),
+                user=user,
+            )
+        except Exception as e:
+            log.error(f"Batch processing failed: {e}")
+            errors.append(f"Batch processing failed: {e}")
+            # The batch failed: keep the old files and do not reference the
+            # new ones from the knowledge base.
+            add_targets = []
+            replacements = {}
+
+    # Atomically update knowledge.data.file_ids under lock
+    updated_knowledge = _update_knowledge_file_ids_atomic(
+        knowledge_id=knowledge_id,
+        remove_ids=set(replacements),
+        add_ids=set(add_targets),
+    )
+
+    # Clean up skipped new files (their own vectors/collections, storage, db)
+    for new_id in skip_ids:
+        try:
+            _purge_file(new_id)
+        except Exception as e:
+            log.warning(f"Failed cleanup for skipped file {new_id}: {e}")
+            errors.append(f"Failed cleanup for skipped file {new_id}: {e}")
+
+    # For replacements, remove the old file's KB embeddings, collection,
+    # storage and db record now that nothing references it anymore.
+    for old_id in replacements:
+        try:
+            _purge_file(old_id, knowledge_id=knowledge_id)
+        except Exception as e:
+            log.warning(f"Failed replace cleanup for old file {old_id}: {e}")
+            errors.append(f"Failed replace cleanup for old file {old_id}: {e}")
+
+    # Prepare response files
+    final_ids = (updated_knowledge.data or {}).get("file_ids", [])
+    files_meta: list[FileMetadataResponse] = Files.get_file_metadatas_by_ids(final_ids)
+
+    warnings = None
+    if errors:
+        warnings = {
+            "message": "Some sync operations encountered errors",
+            "errors": errors,
+        }
+
+    return updated_knowledge, files_meta, warnings
diff --git a/src/lib/apis/knowledge/index.ts b/src/lib/apis/knowledge/index.ts
index 98b2c1e5ec..a1826f6fd7 100644
--- a/src/lib/apis/knowledge/index.ts
+++ b/src/lib/apis/knowledge/index.ts
@@ -262,6 +262,8 @@ export const addFileToKnowledgeById = async (token: string, id: string, fileId:
 	return res;
 };
 
+
+
 export const updateFileFromKnowledgeById = async (token: string, id: string, fileId: string) => {
 	let error = null;
 
@@ -297,10 +299,17 @@ export const updateFileFromKnowledgeById = async (token: string, id: string, fil
 	return res;
 };
 
-export const removeFileFromKnowledgeById = async (token: string, id: string, fileId: string) => {
+export const removeFileFromKnowledgeById = async (
+	token: string,
+	id: string,
+	fileId: string,
+	deleteFile: boolean = true
+) => {
 	let error = null;
 
-	const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/file/remove`, {
+	const res = await fetch(
+		`${WEBUI_API_BASE_URL}/knowledge/${id}/file/remove?delete_file=${deleteFile}`,
+		{
 		method: 'POST',
 		headers: {
 			Accept: 'application/json',
@@ -396,6 +405,37 @@ export const deleteKnowledgeById = async (token: string, id: string) => {
 	return res;
 };
 
+export const syncFilesToKnowledgeByIdBatch = async (token: string, id: string, fileIds: string[]) => {
+	let error = null;
+
+	const res = await fetch(`${WEBUI_API_BASE_URL}/knowledge/${id}/file/sync/batch`, {
+		method: 'POST',
+		headers: {
+			Accept: 'application/json',
+			'Content-Type': 'application/json',
+
authorization: `Bearer ${token}` + }, + body: JSON.stringify({ + file_ids: fileIds + }) + }) + .then(async (res) => { + if (!res.ok) throw await res.json(); + return res.json(); + }) + .catch((err) => { + error = err.detail; + console.error(err); + return null; + }); + + if (error) { + throw error; + } + + return res; +}; + export const reindexKnowledgeFiles = async (token: string) => { let error = null; diff --git a/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte b/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte index 26690b3c33..0c99e400e7 100644 --- a/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte +++ b/src/lib/components/workspace/Knowledge/KnowledgeBase.svelte @@ -29,10 +29,9 @@ getKnowledgeById, getKnowledgeBases, removeFileFromKnowledgeById, - resetKnowledgeById, updateFileFromKnowledgeById, updateKnowledgeById, - searchKnowledgeFilesById + syncFilesToKnowledgeByIdBatch } from '$lib/apis/knowledge'; import { blobToFile } from '$lib/utils'; @@ -82,6 +81,9 @@ let selectedFileContent = ''; let inputFiles = null; + let syncMode = false; + let syncCollectedNames: Set = new Set(); + let syncCollectedIds: string[] = []; let query = ''; let viewOption = null; @@ -163,12 +165,20 @@ const blob = new Blob([content], { type: 'text/plain' }); const file = blobToFile(blob, `${name}.txt`); - console.log(file); return file; }; const uploadFileHandler = async (file) => { - console.log(file); + // When syncing a directory, remember each file's relative name used on upload. + if (syncMode) { + try { + // Track only base names to match server-side storage + const baseName = file.name?.split(/[\\/]/).pop() ?? file.name; + syncCollectedNames.add(baseName); + } catch (_) { + // no-op + } + } const tempItemId = uuidv4(); const fileItem = { @@ -192,10 +202,6 @@ ($config?.file?.max_size ?? null) !== null && file.size > ($config?.file?.max_size ?? 
0) * 1024 * 1024 ) { - console.log('File exceeds max size limit:', { - fileSize: file.size, - maxSize: ($config?.file?.max_size ?? 0) * 1024 * 1024 - }); toast.error( $i18n.t(`File size should not exceed {{maxSize}} MB.`, { maxSize: $config?.file?.max_size @@ -223,8 +229,7 @@ }); if (uploadedFile) { - console.log(uploadedFile); - fileItems = fileItems.map((item) => { + knowledge.files = knowledge.files.map((item) => { if (item.itemId === tempItemId) { item.id = uploadedFile.id; } @@ -233,14 +238,17 @@ delete item.itemId; return item; }); - if (uploadedFile.error) { console.warn('File upload warning:', uploadedFile.error); toast.warning(uploadedFile.error); fileItems = fileItems.filter((file) => file.id !== uploadedFile.id); } else { - await addFileHandler(uploadedFile.id); - } + if (syncMode) { + syncCollectedIds.push(uploadedFile.id); + } else { + await addFileHandler(uploadedFile.id); + } + } } else { toast.error($i18n.t('Failed to upload file.')); } @@ -338,8 +346,6 @@ if (totalFiles > 0) { await processDirectory(dirHandle); - } else { - console.log('No files to upload.'); } }; @@ -423,20 +429,52 @@ // Helper function to maintain file paths within zip const syncDirectoryHandler = async () => { - if ((knowledge?.files ?? 
[]).length > 0) { - const res = await resetKnowledgeById(localStorage.token, id).catch((e) => { + syncMode = true; + syncCollectedNames = new Set(); + syncCollectedIds = []; + try { + await uploadDirectoryHandler(); + + // After uploading, sync all new/updated files in one batch + const batchRes = await syncFilesToKnowledgeByIdBatch(localStorage.token, id, syncCollectedIds).catch((e) => { toast.error(`${e}`); + return null; }); - - if (res) { - knowledge = res; - toast.success($i18n.t('Knowledge reset successfully.')); - - // Upload directory - uploadDirectoryHandler(); + if (batchRes) { + knowledge = batchRes; } - } else { - uploadDirectoryHandler(); + + // After batch sync, remove KB files that are not present in the directory + const dirNames = new Set(Array.from(syncCollectedNames)); + const currentFiles = knowledge?.files ?? []; + const toRemove = currentFiles.filter((f) => !dirNames.has(f?.meta?.name ?? f?.filename)); + + await Promise.all( + toRemove.map(async (f) => { + // First remove from knowledge (and KB vectors) but keep file record + await removeFileFromKnowledgeById(localStorage.token, id, f.id, false).catch((e) => { + toast.error(`${e}`); + return null; + }); + // Then delete the actual file (removes per-file vectors and storage) + await deleteFileById(localStorage.token, f.id).catch((e) => { + console.error(e); + }); + }) + ); + + // Refresh knowledge to ensure consistent state after concurrent operations + const refreshed = await getKnowledgeById(localStorage.token, id).catch((e) => { + toast.error(`${e}`); + return null; + }); + if (refreshed) { + knowledge = refreshed; + } + + toast.success($i18n.t('Directory sync completed.')); + } finally { + syncMode = false; } }; @@ -457,14 +495,29 @@ } }; + const syncFileHandler = async (fileId) => { + const updatedKnowledge = await syncFilesToKnowledgeByIdBatch(localStorage.token, id, [fileId]).catch( + (e) => { + toast.error(`${e}`); + return null; + } + ); + + if (updatedKnowledge) { + knowledge = 
updatedKnowledge; + toast.success($i18n.t('File synced successfully.')); + } else { + toast.error($i18n.t('Failed to sync file.')); + knowledge.files = knowledge.files.filter((file) => file.id !== fileId); + } + }; + const deleteFileHandler = async (fileId) => { try { - console.log('Starting file deletion process for:', fileId); // Remove from knowledge base only const updatedKnowledge = await removeFileFromKnowledgeById(localStorage.token, id, fileId); - console.log('Knowledge base updated:', updatedKnowledge); if (updatedKnowledge) { knowledge = updatedKnowledge; @@ -484,7 +537,6 @@ const updateFileContentHandler = async () => { if (isSaving) { - console.log('Save operation already in progress, skipping...'); return; } @@ -515,7 +567,6 @@ }; const changeDebounceHandler = () => { - console.log('debounce'); if (debounceTimeout) { clearTimeout(debounceTimeout); } @@ -689,7 +740,7 @@ { syncDirectoryHandler(); @@ -980,6 +1031,26 @@ + {#if filteredItems.length > 0} +
+ { + selectedFileId = selectedFileId === e.detail ? null : e.detail; + }} + on:delete={(e) => { + + selectedFileId = null; + deleteFileHandler(e.detail); + }} + /> +
+ {:else} +
+
+ {$i18n.t('No content found')} {#key selectedFile.id}