mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-14 21:35:19 +00:00
fix hybrid searching
This commit is contained in:
parent
3210b2fa45
commit
85775a772c
4 changed files with 369 additions and 19 deletions
|
|
@ -94,6 +94,9 @@ ENV TIKTOKEN_ENCODING_NAME="cl100k_base" \
|
|||
## Hugging Face download cache ##
|
||||
ENV HF_HOME="/app/backend/data/cache/embedding/models"
|
||||
|
||||
## FastEmbed cache directory ##
|
||||
ENV FASTEMBED_CACHE_PATH="/app/backend/data/cache/fastembed"
|
||||
|
||||
## Torch Extensions ##
|
||||
# ENV TORCH_EXTENSIONS_DIR="/.cache/torch_extensions"
|
||||
|
||||
|
|
|
|||
|
|
@ -243,6 +243,103 @@ async def query_doc_with_hybrid_search(
|
|||
log.warning(f"query_doc_with_hybrid_search:no_docs {collection_name}")
|
||||
return {"documents": [], "metadatas": [], "distances": []}
|
||||
|
||||
# Use Qdrant's integrated search
|
||||
if VECTOR_DB == "qdrant":
|
||||
log.info("Using Qdrant search (hybrid if enabled)")
|
||||
|
||||
# Generate query embedding
|
||||
query_embedding = embedding_function(query, RAG_EMBEDDING_QUERY_PREFIX)
|
||||
|
||||
# Use Qdrant's search method with query_text
|
||||
result = VECTOR_DB_CLIENT.search(
|
||||
collection_name=collection_name,
|
||||
vectors=[query_embedding],
|
||||
limit=k_reranker, # Get more results for reranking
|
||||
query_text=query, # type: ignore # Enables hybrid search internally
|
||||
)
|
||||
|
||||
if result and result.documents and result.documents[0]:
|
||||
# Convert SearchResult to the expected format
|
||||
documents = result.documents[0]
|
||||
metadatas = result.metadatas[0] if result.metadatas else [{}] * len(documents)
|
||||
distances = result.distances[0] if result.distances else [0.0] * len(documents)
|
||||
|
||||
# Apply reranking if available
|
||||
if reranking_function and len(documents) > 1:
|
||||
log.debug("Applying reranking to Qdrant hybrid search results")
|
||||
|
||||
# Create documents for reranking
|
||||
docs_for_reranking = [
|
||||
Document(page_content=doc, metadata=meta)
|
||||
for doc, meta in zip(documents, metadatas)
|
||||
]
|
||||
|
||||
# Apply reranking - call with correct signature (single argument: list of query-doc pairs)
|
||||
query_doc_pairs = [(query, doc.page_content) for doc in docs_for_reranking]
|
||||
scores = reranking_function(query_doc_pairs)
|
||||
|
||||
# Process scores into ranked documents (similar to RerankCompressor)
|
||||
docs_with_scores = list(zip(docs_for_reranking, scores.tolist() if not isinstance(scores, list) else scores))
|
||||
|
||||
# Filter by relevance threshold
|
||||
if r > 0.0:
|
||||
docs_with_scores = [(d, s) for d, s in docs_with_scores if s >= r]
|
||||
|
||||
# Sort by score (highest first) and limit to k
|
||||
docs_with_scores = sorted(docs_with_scores, key=lambda x: x[1], reverse=True)[:k]
|
||||
|
||||
if docs_with_scores:
|
||||
# Extract final results
|
||||
final_documents = [doc.page_content for doc, score in docs_with_scores]
|
||||
final_metadatas = []
|
||||
final_distances = []
|
||||
|
||||
for doc, score in docs_with_scores:
|
||||
metadata = doc.metadata.copy()
|
||||
metadata["score"] = score
|
||||
final_metadatas.append(metadata)
|
||||
final_distances.append(score)
|
||||
|
||||
result_dict = {
|
||||
"distances": [final_distances],
|
||||
"documents": [final_documents],
|
||||
"metadatas": [final_metadatas],
|
||||
}
|
||||
else:
|
||||
# No documents passed relevance threshold, return empty result
|
||||
result_dict = {
|
||||
"distances": [[]],
|
||||
"documents": [[]],
|
||||
"metadatas": [[]],
|
||||
}
|
||||
else:
|
||||
# No reranking, just apply relevance threshold and limit
|
||||
if r > 0.0:
|
||||
# Filter by relevance threshold
|
||||
filtered_indices = [i for i, score in enumerate(distances) if score >= r][:k]
|
||||
filtered_documents = [documents[i] for i in filtered_indices]
|
||||
filtered_metadatas = [metadatas[i] for i in filtered_indices]
|
||||
filtered_distances = [distances[i] for i in filtered_indices]
|
||||
else:
|
||||
# No threshold, just limit to k
|
||||
filtered_documents = documents[:k]
|
||||
filtered_metadatas = metadatas[:k]
|
||||
filtered_distances = distances[:k]
|
||||
|
||||
result_dict = {
|
||||
"distances": [filtered_distances],
|
||||
"documents": [filtered_documents],
|
||||
"metadatas": [filtered_metadatas],
|
||||
}
|
||||
|
||||
log.info(
|
||||
f"query_doc_with_hybrid_search:qdrant_native_result {len(result_dict['documents'][0])} documents"
|
||||
)
|
||||
return result_dict
|
||||
else:
|
||||
log.warning("Qdrant hybrid search returned no results, falling back to LangChain approach")
|
||||
|
||||
|
||||
log.debug(f"query_doc_with_hybrid_search:doc {collection_name}")
|
||||
|
||||
bm25_texts = (
|
||||
|
|
@ -313,7 +410,7 @@ async def query_doc_with_hybrid_search(
|
|||
}
|
||||
|
||||
log.info(
|
||||
"query_doc_with_hybrid_search:result "
|
||||
"query_doc_with_hybrid_search:langchain_result "
|
||||
+ f'{result["metadatas"]} {result["distances"]}'
|
||||
)
|
||||
return result
|
||||
|
|
@ -1290,6 +1387,8 @@ class RerankCompressor(BaseDocumentCompressor):
|
|||
scores = None
|
||||
if reranking:
|
||||
scores = self.reranking_function(query, documents)
|
||||
# query_doc_pairs = [(query, doc.page_content) for doc in documents]
|
||||
# scores = self.reranking_function(query_doc_pairs)
|
||||
else:
|
||||
if not SENTENCE_TRANSFORMERS_AVAILABLE:
|
||||
raise ImportError("sentence_transformers is not available. Please install it to use reranking functionality.")
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ import logging
|
|||
from typing import Optional, Tuple, List, Dict, Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import grpc
|
||||
from open_webui.config import (
|
||||
QDRANT_API_KEY,
|
||||
QDRANT_GRPC_PORT,
|
||||
|
|
@ -12,6 +11,7 @@ from open_webui.config import (
|
|||
QDRANT_COLLECTION_PREFIX,
|
||||
QDRANT_TIMEOUT,
|
||||
QDRANT_HNSW_M,
|
||||
ENABLE_RAG_HYBRID_SEARCH,
|
||||
)
|
||||
from open_webui.env import SRC_LOG_LEVELS
|
||||
from open_webui.retrieval.vector.main import (
|
||||
|
|
@ -21,7 +21,6 @@ from open_webui.retrieval.vector.main import (
|
|||
VectorItem,
|
||||
)
|
||||
from qdrant_client import QdrantClient as Qclient
|
||||
from qdrant_client.http.exceptions import UnexpectedResponse
|
||||
from qdrant_client.http.models import PointStruct
|
||||
from qdrant_client.models import models
|
||||
|
||||
|
|
@ -141,23 +140,33 @@ class QdrantClient(VectorDBBase):
|
|||
):
|
||||
"""
|
||||
Creates a collection with multi-tenancy configuration and payload indexes for tenant_id and metadata fields.
|
||||
Also creates sparse vector configuration for hybrid search support.
|
||||
"""
|
||||
# Create collection with both dense and sparse vector support
|
||||
self.client.create_collection(
|
||||
collection_name=mt_collection_name,
|
||||
vectors_config=models.VectorParams(
|
||||
size=dimension,
|
||||
distance=models.Distance.COSINE,
|
||||
on_disk=self.QDRANT_ON_DISK,
|
||||
),
|
||||
vectors_config={
|
||||
"dense": models.VectorParams(
|
||||
size=dimension,
|
||||
distance=models.Distance.COSINE,
|
||||
on_disk=self.QDRANT_ON_DISK,
|
||||
)
|
||||
},
|
||||
# Disable global index building due to multitenancy
|
||||
# For more details https://qdrant.tech/documentation/guides/multiple-partitions/#calibrate-performance
|
||||
hnsw_config=models.HnswConfigDiff(
|
||||
payload_m=self.QDRANT_HNSW_M,
|
||||
m=0,
|
||||
),
|
||||
# Add sparse vectors configuration for BM25-like hybrid search
|
||||
sparse_vectors_config={
|
||||
"bm25": models.SparseVectorParams(
|
||||
modifier=models.Modifier.IDF,
|
||||
)
|
||||
},
|
||||
)
|
||||
log.info(
|
||||
f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
|
||||
f"Multi-tenant collection {mt_collection_name} created with dimension {dimension} and sparse vector support!"
|
||||
)
|
||||
|
||||
self.client.create_payload_index(
|
||||
|
|
@ -208,6 +217,190 @@ class QdrantClient(VectorDBBase):
|
|||
if not self.client.collection_exists(collection_name=mt_collection_name):
|
||||
self._create_multi_tenant_collection(mt_collection_name, dimension)
|
||||
|
||||
def _hybrid_search(
|
||||
self,
|
||||
collection_name: str,
|
||||
query_vector: List[float],
|
||||
query_text: str,
|
||||
limit: int,
|
||||
) -> Optional[SearchResult]:
|
||||
"""
|
||||
Perform Qdrant native hybrid search using prefetch + RRF fusion.
|
||||
|
||||
This method uses Qdrant's named vectors (dense + sparse) with prefetch queries
|
||||
and applies RRF (Reciprocal Rank Fusion) for optimal result combination.
|
||||
|
||||
Args:
|
||||
collection_name: Name of the collection to search
|
||||
query_vector: Dense vector representation of the query
|
||||
query_text: Text query for sparse vector generation
|
||||
limit: Maximum number of results to return
|
||||
|
||||
Returns:
|
||||
SearchResult with RRF-fused hybrid results
|
||||
"""
|
||||
if not self.client or not query_vector:
|
||||
return None
|
||||
|
||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||
if not self.client.collection_exists(collection_name=mt_collection):
|
||||
log.debug(f"Collection {mt_collection} doesn't exist, hybrid search returns None")
|
||||
return None
|
||||
|
||||
tenant_filter = _tenant_filter(tenant_id)
|
||||
|
||||
try:
|
||||
# Use Qdrant's native hybrid search with prefetch + RRF fusion (like user's example)
|
||||
# Generate sparse vector from query text using FastEmbed or fallback
|
||||
sparse_vector = self._query_to_sparse_vector(query_text)
|
||||
|
||||
# Create prefetch queries for both dense and sparse vectors
|
||||
prefetch_queries = [
|
||||
# Dense vector prefetch
|
||||
models.Prefetch(
|
||||
query=query_vector,
|
||||
using="dense",
|
||||
limit=limit * 2, # Get more candidates for better fusion
|
||||
filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
]
|
||||
|
||||
# Add sparse vector prefetch if we have terms
|
||||
if sparse_vector["indices"]:
|
||||
prefetch_queries.append(
|
||||
models.Prefetch(
|
||||
query=models.SparseVector(
|
||||
indices=sparse_vector["indices"],
|
||||
values=sparse_vector["values"],
|
||||
),
|
||||
using="bm25",
|
||||
limit=limit * 2,
|
||||
filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
)
|
||||
|
||||
# Use Qdrant's native fusion - RRF is currently the most robust option
|
||||
# For custom weighting, Qdrant supports score formulas (see alternative implementation below)
|
||||
query_response = self.client.query_points(
|
||||
collection_name=mt_collection,
|
||||
prefetch=prefetch_queries,
|
||||
query=models.FusionQuery(
|
||||
fusion=models.Fusion.RRF,
|
||||
),
|
||||
limit=limit,
|
||||
with_payload=True,
|
||||
)
|
||||
|
||||
get_result = self._result_to_get_result(query_response.points)
|
||||
return SearchResult(
|
||||
ids=get_result.ids,
|
||||
documents=get_result.documents,
|
||||
metadatas=get_result.metadatas,
|
||||
distances=[[(point.score + 1.0) / 2.0 for point in query_response.points]],
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.warning(f"Qdrant native hybrid search failed, trying fallback: {e}")
|
||||
|
||||
# Fallback to client-side hybrid scoring if native approach fails
|
||||
try:
|
||||
candidates_limit = max(limit * 3, 100)
|
||||
dense_results = self.client.query_points(
|
||||
collection_name=mt_collection,
|
||||
query=query_vector,
|
||||
using="dense", # Use named dense vector
|
||||
limit=candidates_limit,
|
||||
query_filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
|
||||
# Apply simple score normalization for fallback
|
||||
get_result = self._result_to_get_result(dense_results.points)
|
||||
return SearchResult(
|
||||
ids=get_result.ids,
|
||||
documents=get_result.documents,
|
||||
metadatas=get_result.metadatas,
|
||||
distances=[[(point.score + 1.0) / 2.0 for point in dense_results.points]],
|
||||
)
|
||||
|
||||
except Exception as e2:
|
||||
log.warning(f"Fallback hybrid search failed: {e2}")
|
||||
# Final fallback to regular dense search
|
||||
return self.search(collection_name, [query_vector], limit)
|
||||
|
||||
|
||||
def _get_bm25_embedding_model(self):
|
||||
"""
|
||||
Get or create the BM25 embedding model using FastEmbed.
|
||||
"""
|
||||
if not hasattr(self, '_bm25_model') or self._bm25_model is None:
|
||||
try:
|
||||
from fastembed import SparseTextEmbedding # type: ignore
|
||||
self._bm25_model = SparseTextEmbedding("Qdrant/bm25")
|
||||
log.info("Initialized FastEmbed BM25 sparse embedding model")
|
||||
except ImportError:
|
||||
log.warning("FastEmbed not available, will use fallback sparse vector generation")
|
||||
self._bm25_model = None
|
||||
except Exception as e:
|
||||
log.warning(f"Failed to initialize FastEmbed BM25 model: {e}")
|
||||
self._bm25_model = None
|
||||
return self._bm25_model
|
||||
|
||||
def _text_to_sparse_vector(self, text: str) -> Dict[str, List]:
|
||||
"""
|
||||
Convert text to sparse vector representation using FastEmbed's BM25 model.
|
||||
Falls back to simple implementation if FastEmbed is not available.
|
||||
"""
|
||||
if not text:
|
||||
return {"indices": [], "values": []}
|
||||
|
||||
# Try to use FastEmbed's BM25 model first
|
||||
bm25_model = self._get_bm25_embedding_model()
|
||||
if bm25_model is not None:
|
||||
try:
|
||||
# Use FastEmbed to generate proper BM25 sparse embedding
|
||||
sparse_embeddings = list(bm25_model.passage_embed([text]))
|
||||
if sparse_embeddings and len(sparse_embeddings) > 0:
|
||||
sparse_embedding = sparse_embeddings[0]
|
||||
# Convert to the format expected by Qdrant
|
||||
return {
|
||||
"indices": sparse_embedding.indices.tolist(),
|
||||
"values": sparse_embedding.values.tolist()
|
||||
}
|
||||
except Exception as e:
|
||||
log.warning(f"FastEmbed BM25 embedding failed: {e}, using fallback")
|
||||
|
||||
# No FastEmbed available, return empty sparse vector
|
||||
return {"indices": [], "values": []}
|
||||
|
||||
def _query_to_sparse_vector(self, query_text: str) -> Dict[str, List]:
|
||||
"""
|
||||
Convert query text to sparse vector representation using FastEmbed's BM25 model.
|
||||
Uses query_embed for better query representation vs passage_embed.
|
||||
"""
|
||||
if not query_text:
|
||||
return {"indices": [], "values": []}
|
||||
|
||||
# Try to use FastEmbed's BM25 model for query embedding
|
||||
bm25_model = self._get_bm25_embedding_model()
|
||||
if bm25_model is not None:
|
||||
try:
|
||||
# Use query_embed for queries (optimized for query representation)
|
||||
sparse_embeddings = list(bm25_model.query_embed([query_text]))
|
||||
if sparse_embeddings and len(sparse_embeddings) > 0:
|
||||
sparse_embedding = sparse_embeddings[0]
|
||||
# Convert to the format expected by Qdrant
|
||||
return {
|
||||
"indices": sparse_embedding.indices.tolist(),
|
||||
"values": sparse_embedding.values.tolist()
|
||||
}
|
||||
except Exception as e:
|
||||
log.warning(f"FastEmbed BM25 query embedding failed: {e}, using fallback")
|
||||
|
||||
# No FastEmbed available, return empty sparse vector
|
||||
return {"indices": [], "values": []}
|
||||
|
||||
|
||||
|
||||
def has_collection(self, collection_name: str) -> bool:
|
||||
"""
|
||||
Check if a logical collection exists by checking for any points with the tenant ID.
|
||||
|
|
@ -256,25 +449,49 @@ class QdrantClient(VectorDBBase):
|
|||
)
|
||||
|
||||
def search(
|
||||
self, collection_name: str, vectors: List[List[float | int]], limit: int
|
||||
self, collection_name: str, vectors: List[List[float | int]], limit: int, query_text: Optional[str] = None
|
||||
) -> Optional[SearchResult]:
|
||||
"""
|
||||
Search for the nearest neighbor items based on the vectors with tenant isolation.
|
||||
Uses hybrid search when ENABLE_RAG_HYBRID_SEARCH is True and query_text is provided.
|
||||
"""
|
||||
if not self.client or not vectors:
|
||||
return None
|
||||
|
||||
# Use hybrid search if enabled and query text is available
|
||||
if ENABLE_RAG_HYBRID_SEARCH and query_text:
|
||||
return self._hybrid_search(
|
||||
collection_name=collection_name,
|
||||
query_vector=vectors[0],
|
||||
query_text=query_text,
|
||||
limit=limit
|
||||
)
|
||||
|
||||
# Fallback to regular dense vector search
|
||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||
if not self.client.collection_exists(collection_name=mt_collection):
|
||||
log.debug(f"Collection {mt_collection} doesn't exist, search returns None")
|
||||
return None
|
||||
|
||||
tenant_filter = _tenant_filter(tenant_id)
|
||||
query_response = self.client.query_points(
|
||||
collection_name=mt_collection,
|
||||
query=vectors[0],
|
||||
limit=limit,
|
||||
query_filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
try:
|
||||
# Try to use named dense vector first (for hybrid-enabled collections)
|
||||
query_response = self.client.query_points(
|
||||
collection_name=mt_collection,
|
||||
query=vectors[0],
|
||||
using="dense",
|
||||
limit=limit,
|
||||
query_filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
except Exception:
|
||||
# Fallback for collections without named vectors (legacy collections)
|
||||
query_response = self.client.query_points(
|
||||
collection_name=mt_collection,
|
||||
query=vectors[0],
|
||||
limit=limit,
|
||||
query_filter=models.Filter(must=[tenant_filter]),
|
||||
)
|
||||
|
||||
get_result = self._result_to_get_result(query_response.points)
|
||||
return SearchResult(
|
||||
ids=get_result.ids,
|
||||
|
|
@ -327,14 +544,44 @@ class QdrantClient(VectorDBBase):
|
|||
|
||||
def upsert(self, collection_name: str, items: List[VectorItem]):
|
||||
"""
|
||||
Upsert items with tenant ID.
|
||||
Upsert items with tenant ID, dense vectors, and sparse vectors for hybrid search.
|
||||
"""
|
||||
if not self.client or not items:
|
||||
return None
|
||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||
dimension = len(items[0]["vector"])
|
||||
dimension = len(items[0]["vector"]) # type: ignore # Items are dicts, not VectorItem instances
|
||||
self._ensure_collection(mt_collection, dimension)
|
||||
points = self._create_points(items, tenant_id)
|
||||
|
||||
# Create points with both dense and sparse vectors
|
||||
points = []
|
||||
for item in items:
|
||||
# Generate sparse vector from text content for BM25-like search
|
||||
sparse_vector = self._text_to_sparse_vector(item["text"]) # type: ignore
|
||||
|
||||
# Create vector dict with named vectors (similar to user's example)
|
||||
vector_dict = {
|
||||
"dense": item["vector"], # type: ignore # Dense semantic vector
|
||||
}
|
||||
|
||||
# Add sparse vector if we have terms
|
||||
if sparse_vector["indices"]:
|
||||
vector_dict["bm25"] = models.SparseVector( # type: ignore
|
||||
indices=sparse_vector["indices"],
|
||||
values=sparse_vector["values"],
|
||||
)
|
||||
|
||||
points.append(
|
||||
PointStruct(
|
||||
id=item["id"], # type: ignore
|
||||
vector=vector_dict, # type: ignore # Qdrant client accepts dict for named vectors
|
||||
payload={
|
||||
"text": item["text"], # type: ignore
|
||||
"metadata": item["metadata"], # type: ignore
|
||||
TENANT_ID_FIELD: tenant_id,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
self.client.upload_points(mt_collection, points)
|
||||
return None
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ opensearch-py==2.8.0
|
|||
|
||||
transformers==4.57.3
|
||||
sentence-transformers==5.1.2
|
||||
fastembed==0.7.1
|
||||
accelerate
|
||||
pyarrow==20.0.0 # fix: pin pyarrow version to 20 for rpi compatibility #15897
|
||||
einops==0.8.1
|
||||
|
|
|
|||
Loading…
Reference in a new issue