From 85775a772c30ac10219716dd7ef4537f76817e2c Mon Sep 17 00:00:00 2001
From: LoiTra
Date: Mon, 28 Jul 2025 17:09:37 +0700
Subject: [PATCH] fix hybrid searching

---
 Dockerfile                            |   3 +
 backend/open_webui/retrieval/utils.py | 101 ++++++-
 .../vector/dbs/qdrant_multitenancy.py | 283 ++++++++++++++++--
 backend/requirements.txt              |   1 +
 4 files changed, 369 insertions(+), 19 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index e59c671558..0d2362de1e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -94,6 +94,9 @@ ENV TIKTOKEN_ENCODING_NAME="cl100k_base" \
 ## Hugging Face download cache ##
 ENV HF_HOME="/app/backend/data/cache/embedding/models"
 
+## FastEmbed cache directory ##
+ENV FASTEMBED_CACHE_PATH="/app/backend/data/cache/fastembed"
+
 ## Torch Extensions ##
 # ENV TORCH_EXTENSIONS_DIR="/.cache/torch_extensions"
 
diff --git a/backend/open_webui/retrieval/utils.py b/backend/open_webui/retrieval/utils.py
index 7d6fdc2b56..11f092dcd2 100644
--- a/backend/open_webui/retrieval/utils.py
+++ b/backend/open_webui/retrieval/utils.py
@@ -242,6 +242,103 @@ async def query_doc_with_hybrid_search(
         ):
             log.warning(f"query_doc_with_hybrid_search:no_docs {collection_name}")
             return {"documents": [], "metadatas": [], "distances": []}
+
+        # Use Qdrant's integrated search
+        if VECTOR_DB == "qdrant":
+            log.info("Using Qdrant search (hybrid if enabled)")
+
+            # Generate query embedding
+            query_embedding = embedding_function(query, RAG_EMBEDDING_QUERY_PREFIX)
+
+            # Use Qdrant's search method with query_text
+            result = VECTOR_DB_CLIENT.search(
+                collection_name=collection_name,
+                vectors=[query_embedding],
+                limit=k_reranker,  # Get more results for reranking
+                query_text=query,  # type: ignore # Enables hybrid search internally
+            )
+
+            if result and result.documents and result.documents[0]:
+                # Convert SearchResult to the expected format
+                documents = result.documents[0]
+                metadatas = result.metadatas[0] if result.metadatas else [{}] * len(documents)
+                distances = result.distances[0] if result.distances else [0.0] * len(documents)
+
+                # Apply reranking if available
+                if reranking_function and len(documents) > 1:
+                    log.debug("Applying reranking to Qdrant hybrid search results")
+
+                    # Create documents for reranking
+                    docs_for_reranking = [
+                        Document(page_content=doc, metadata=meta)
+                        for doc, meta in zip(documents, metadatas)
+                    ]
+
+                    # Apply reranking - call with correct signature (single argument: list of query-doc pairs)
+                    query_doc_pairs = [(query, doc.page_content) for doc in docs_for_reranking]
+                    scores = reranking_function(query_doc_pairs)
+
+                    # Process scores into ranked documents (similar to RerankCompressor)
+                    docs_with_scores = list(zip(docs_for_reranking, scores.tolist() if not isinstance(scores, list) else scores))
+
+                    # Filter by relevance threshold
+                    if r > 0.0:
+                        docs_with_scores = [(d, s) for d, s in docs_with_scores if s >= r]
+
+                    # Sort by score (highest first) and limit to k
+                    docs_with_scores = sorted(docs_with_scores, key=lambda x: x[1], reverse=True)[:k]
+
+                    if docs_with_scores:
+                        # Extract final results
+                        final_documents = [doc.page_content for doc, score in docs_with_scores]
+                        final_metadatas = []
+                        final_distances = []
+
+                        for doc, score in docs_with_scores:
+                            metadata = doc.metadata.copy()
+                            metadata["score"] = score
+                            final_metadatas.append(metadata)
+                            final_distances.append(score)
+
+                        result_dict = {
+                            "distances": [final_distances],
+                            "documents": [final_documents],
+                            "metadatas": [final_metadatas],
+                        }
+                    else:
+                        # No documents passed relevance threshold, return empty result
+                        result_dict = {
"distances": [[]], + "documents": [[]], + "metadatas": [[]], + } + else: + # No reranking, just apply relevance threshold and limit + if r > 0.0: + # Filter by relevance threshold + filtered_indices = [i for i, score in enumerate(distances) if score >= r][:k] + filtered_documents = [documents[i] for i in filtered_indices] + filtered_metadatas = [metadatas[i] for i in filtered_indices] + filtered_distances = [distances[i] for i in filtered_indices] + else: + # No threshold, just limit to k + filtered_documents = documents[:k] + filtered_metadatas = metadatas[:k] + filtered_distances = distances[:k] + + result_dict = { + "distances": [filtered_distances], + "documents": [filtered_documents], + "metadatas": [filtered_metadatas], + } + + log.info( + f"query_doc_with_hybrid_search:qdrant_native_result {len(result_dict['documents'][0])} documents" + ) + return result_dict + else: + log.warning("Qdrant hybrid search returned no results, falling back to LangChain approach") + log.debug(f"query_doc_with_hybrid_search:doc {collection_name}") @@ -313,7 +410,7 @@ async def query_doc_with_hybrid_search( } log.info( - "query_doc_with_hybrid_search:result " + "query_doc_with_hybrid_search:langchain_result " + f'{result["metadatas"]} {result["distances"]}' ) return result @@ -1290,6 +1387,8 @@ class RerankCompressor(BaseDocumentCompressor): scores = None if reranking: scores = self.reranking_function(query, documents) + # query_doc_pairs = [(query, doc.page_content) for doc in documents] + # scores = self.reranking_function(query_doc_pairs) else: if not SENTENCE_TRANSFORMERS_AVAILABLE: raise ImportError("sentence_transformers is not available. Please install it to use reranking functionality.") diff --git a/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py b/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py index e9fa03d459..749a373c58 100644 --- a/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py +++ b/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py @@ -2,7 +2,6 @@ import logging from typing import Optional, Tuple, List, Dict, Any from urllib.parse import urlparse -import grpc from open_webui.config import ( QDRANT_API_KEY, QDRANT_GRPC_PORT, @@ -12,6 +11,7 @@ from open_webui.config import ( QDRANT_COLLECTION_PREFIX, QDRANT_TIMEOUT, QDRANT_HNSW_M, + ENABLE_RAG_HYBRID_SEARCH, ) from open_webui.env import SRC_LOG_LEVELS from open_webui.retrieval.vector.main import ( @@ -21,7 +21,6 @@ from open_webui.retrieval.vector.main import ( VectorItem, ) from qdrant_client import QdrantClient as Qclient -from qdrant_client.http.exceptions import UnexpectedResponse from qdrant_client.http.models import PointStruct from qdrant_client.models import models @@ -141,23 +140,33 @@ class QdrantClient(VectorDBBase): ): """ Creates a collection with multi-tenancy configuration and payload indexes for tenant_id and metadata fields. + Also creates sparse vector configuration for hybrid search support. 
""" + # Create collection with both dense and sparse vector support self.client.create_collection( collection_name=mt_collection_name, - vectors_config=models.VectorParams( - size=dimension, - distance=models.Distance.COSINE, - on_disk=self.QDRANT_ON_DISK, - ), + vectors_config={ + "dense": models.VectorParams( + size=dimension, + distance=models.Distance.COSINE, + on_disk=self.QDRANT_ON_DISK, + ) + }, # Disable global index building due to multitenancy # For more details https://qdrant.tech/documentation/guides/multiple-partitions/#calibrate-performance hnsw_config=models.HnswConfigDiff( payload_m=self.QDRANT_HNSW_M, m=0, ), + # Add sparse vectors configuration for BM25-like hybrid search + sparse_vectors_config={ + "bm25": models.SparseVectorParams( + modifier=models.Modifier.IDF, + ) + }, ) log.info( - f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!" + f"Multi-tenant collection {mt_collection_name} created with dimension {dimension} and sparse vector support!" ) self.client.create_payload_index( @@ -208,6 +217,190 @@ class QdrantClient(VectorDBBase): if not self.client.collection_exists(collection_name=mt_collection_name): self._create_multi_tenant_collection(mt_collection_name, dimension) + def _hybrid_search( + self, + collection_name: str, + query_vector: List[float], + query_text: str, + limit: int, + ) -> Optional[SearchResult]: + """ + Perform Qdrant native hybrid search using prefetch + RRF fusion. + + This method uses Qdrant's named vectors (dense + sparse) with prefetch queries + and applies RRF (Reciprocal Rank Fusion) for optimal result combination. + + Args: + collection_name: Name of the collection to search + query_vector: Dense vector representation of the query + query_text: Text query for sparse vector generation + limit: Maximum number of results to return + + Returns: + SearchResult with RRF-fused hybrid results + """ + if not self.client or not query_vector: + return None + + mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, hybrid search returns None") + return None + + tenant_filter = _tenant_filter(tenant_id) + + try: + # Use Qdrant's native hybrid search with prefetch + RRF fusion (like user's example) + # Generate sparse vector from query text using FastEmbed or fallback + sparse_vector = self._query_to_sparse_vector(query_text) + + # Create prefetch queries for both dense and sparse vectors + prefetch_queries = [ + # Dense vector prefetch + models.Prefetch( + query=query_vector, + using="dense", + limit=limit * 2, # Get more candidates for better fusion + filter=models.Filter(must=[tenant_filter]), + ) + ] + + # Add sparse vector prefetch if we have terms + if sparse_vector["indices"]: + prefetch_queries.append( + models.Prefetch( + query=models.SparseVector( + indices=sparse_vector["indices"], + values=sparse_vector["values"], + ), + using="bm25", + limit=limit * 2, + filter=models.Filter(must=[tenant_filter]), + ) + ) + + # Use Qdrant's native fusion - RRF is currently the most robust option + # For custom weighting, Qdrant supports score formulas (see alternative implementation below) + query_response = self.client.query_points( + collection_name=mt_collection, + prefetch=prefetch_queries, + query=models.FusionQuery( + fusion=models.Fusion.RRF, + ), + limit=limit, + with_payload=True, + ) + + get_result = self._result_to_get_result(query_response.points) + return 
+                ids=get_result.ids,
+                documents=get_result.documents,
+                metadatas=get_result.metadatas,
+                distances=[[(point.score + 1.0) / 2.0 for point in query_response.points]],
+            )
+
+        except Exception as e:
+            log.warning(f"Qdrant native hybrid search failed, trying fallback: {e}")
+
+            # Fallback to client-side hybrid scoring if native approach fails
+            try:
+                candidates_limit = max(limit * 3, 100)
+                dense_results = self.client.query_points(
+                    collection_name=mt_collection,
+                    query=query_vector,
+                    using="dense",  # Use named dense vector
+                    limit=candidates_limit,
+                    query_filter=models.Filter(must=[tenant_filter]),
+                )
+
+                # Apply simple score normalization for fallback
+                get_result = self._result_to_get_result(dense_results.points)
+                return SearchResult(
+                    ids=get_result.ids,
+                    documents=get_result.documents,
+                    metadatas=get_result.metadatas,
+                    distances=[[(point.score + 1.0) / 2.0 for point in dense_results.points]],
+                )
+
+            except Exception as e2:
+                log.warning(f"Fallback hybrid search failed: {e2}")
+                # Final fallback to regular dense search
+                return self.search(collection_name, [query_vector], limit)
+
+    def _get_bm25_embedding_model(self):
+        """
+        Get or create the BM25 embedding model using FastEmbed.
+        """
+        if not hasattr(self, '_bm25_model') or self._bm25_model is None:
+            try:
+                from fastembed import SparseTextEmbedding  # type: ignore
+                self._bm25_model = SparseTextEmbedding("Qdrant/bm25")
+                log.info("Initialized FastEmbed BM25 sparse embedding model")
+            except ImportError:
+                log.warning("FastEmbed not available, sparse vector generation will be skipped")
+                self._bm25_model = None
+            except Exception as e:
+                log.warning(f"Failed to initialize FastEmbed BM25 model: {e}")
+                self._bm25_model = None
+        return self._bm25_model
+
+    def _text_to_sparse_vector(self, text: str) -> Dict[str, List]:
+        """
+        Convert text to sparse vector representation using FastEmbed's BM25 model.
+        Returns an empty sparse vector if FastEmbed is unavailable or embedding fails.
+        """
+        if not text:
+            return {"indices": [], "values": []}
+
+        # Try to use FastEmbed's BM25 model first
+        bm25_model = self._get_bm25_embedding_model()
+        if bm25_model is not None:
+            try:
+                # Use FastEmbed to generate proper BM25 sparse embedding
+                sparse_embeddings = list(bm25_model.passage_embed([text]))
+                if sparse_embeddings and len(sparse_embeddings) > 0:
+                    sparse_embedding = sparse_embeddings[0]
+                    # Convert to the format expected by Qdrant
+                    return {
+                        "indices": sparse_embedding.indices.tolist(),
+                        "values": sparse_embedding.values.tolist()
+                    }
+            except Exception as e:
+                log.warning(f"FastEmbed BM25 embedding failed: {e}, returning empty sparse vector")
+
+        # FastEmbed unavailable or embedding failed, return empty sparse vector
+        return {"indices": [], "values": []}
+
+    def _query_to_sparse_vector(self, query_text: str) -> Dict[str, List]:
+        """
+        Convert query text to sparse vector representation using FastEmbed's BM25 model.
+        Uses query_embed (rather than passage_embed) for better query representation.
+        """
+        if not query_text:
+            return {"indices": [], "values": []}
+
+        # Try to use FastEmbed's BM25 model for query embedding
+        bm25_model = self._get_bm25_embedding_model()
+        if bm25_model is not None:
+            try:
+                # Use query_embed for queries (optimized for query representation)
+                sparse_embeddings = list(bm25_model.query_embed([query_text]))
+                if sparse_embeddings and len(sparse_embeddings) > 0:
+                    sparse_embedding = sparse_embeddings[0]
+                    # Convert to the format expected by Qdrant
+                    return {
+                        "indices": sparse_embedding.indices.tolist(),
+                        "values": sparse_embedding.values.tolist()
+                    }
+            except Exception as e:
+                log.warning(f"FastEmbed BM25 query embedding failed: {e}, returning empty sparse vector")
+
+        # FastEmbed unavailable or query embedding failed, return empty sparse vector
+        return {"indices": [], "values": []}
+
     def has_collection(self, collection_name: str) -> bool:
         """
         Check if a logical collection exists by checking for any points with the tenant ID.
@@ -256,25 +449,49 @@ class QdrantClient(VectorDBBase):
         )
 
     def search(
-        self, collection_name: str, vectors: List[List[float | int]], limit: int
+        self, collection_name: str, vectors: List[List[float | int]], limit: int, query_text: Optional[str] = None
     ) -> Optional[SearchResult]:
         """
         Search for the nearest neighbor items based on the vectors with tenant isolation.
+        Uses hybrid search when ENABLE_RAG_HYBRID_SEARCH is True and query_text is provided.
         """
         if not self.client or not vectors:
             return None
+
+        # Use hybrid search if enabled and query text is available
+        if ENABLE_RAG_HYBRID_SEARCH and query_text:
+            return self._hybrid_search(
+                collection_name=collection_name,
+                query_vector=vectors[0],
+                query_text=query_text,
+                limit=limit
+            )
+
+        # Fallback to regular dense vector search
         mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
         if not self.client.collection_exists(collection_name=mt_collection):
             log.debug(f"Collection {mt_collection} doesn't exist, search returns None")
             return None
 
         tenant_filter = _tenant_filter(tenant_id)
-        query_response = self.client.query_points(
-            collection_name=mt_collection,
-            query=vectors[0],
-            limit=limit,
-            query_filter=models.Filter(must=[tenant_filter]),
-        )
+        try:
+            # Try to use named dense vector first (for hybrid-enabled collections)
+            query_response = self.client.query_points(
+                collection_name=mt_collection,
+                query=vectors[0],
+                using="dense",
+                limit=limit,
+                query_filter=models.Filter(must=[tenant_filter]),
+            )
+        except Exception:
+            # Fallback for collections without named vectors (legacy collections)
+            query_response = self.client.query_points(
+                collection_name=mt_collection,
+                query=vectors[0],
+                limit=limit,
+                query_filter=models.Filter(must=[tenant_filter]),
+            )
+
         get_result = self._result_to_get_result(query_response.points)
         return SearchResult(
             ids=get_result.ids,
             documents=get_result.documents,
             metadatas=get_result.metadatas,
@@ -327,14 +544,44 @@ class QdrantClient(VectorDBBase):
 
     def upsert(self, collection_name: str, items: List[VectorItem]):
         """
-        Upsert items with tenant ID.
+        Upsert items with tenant ID, dense vectors, and sparse vectors for hybrid search.
""" if not self.client or not items: return None mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) - dimension = len(items[0]["vector"]) + dimension = len(items[0]["vector"]) # type: ignore # Items are dicts, not VectorItem instances self._ensure_collection(mt_collection, dimension) - points = self._create_points(items, tenant_id) + + # Create points with both dense and sparse vectors + points = [] + for item in items: + # Generate sparse vector from text content for BM25-like search + sparse_vector = self._text_to_sparse_vector(item["text"]) # type: ignore + + # Create vector dict with named vectors (similar to user's example) + vector_dict = { + "dense": item["vector"], # type: ignore # Dense semantic vector + } + + # Add sparse vector if we have terms + if sparse_vector["indices"]: + vector_dict["bm25"] = models.SparseVector( # type: ignore + indices=sparse_vector["indices"], + values=sparse_vector["values"], + ) + + points.append( + PointStruct( + id=item["id"], # type: ignore + vector=vector_dict, # type: ignore # Qdrant client accepts dict for named vectors + payload={ + "text": item["text"], # type: ignore + "metadata": item["metadata"], # type: ignore + TENANT_ID_FIELD: tenant_id, + }, + ) + ) + self.client.upload_points(mt_collection, points) return None diff --git a/backend/requirements.txt b/backend/requirements.txt index e2ae61c11a..220fd3ff16 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -54,6 +54,7 @@ opensearch-py==2.8.0 transformers==4.57.3 sentence-transformers==5.1.2 +fastembed==0.7.1 accelerate pyarrow==20.0.0 # fix: pin pyarrow version to 20 for rpi compatibility #15897 einops==0.8.1