Merge pull request #15289 from Anush008/main

refactor: Updated Qdrant multi-tenancy implementation
This commit is contained in:
Tim Jaeryang Baek 2025-07-04 23:49:11 +04:00 committed by GitHub
commit a3add18fa9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 150 additions and 532 deletions

View file

@ -1840,10 +1840,10 @@ MILVUS_IVF_FLAT_NLIST = int(os.environ.get("MILVUS_IVF_FLAT_NLIST", "128"))
QDRANT_URI = os.environ.get("QDRANT_URI", None) QDRANT_URI = os.environ.get("QDRANT_URI", None)
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None) QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None)
QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true" QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true"
QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "False").lower() == "true" QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "false").lower() == "true"
QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334")) QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334"))
ENABLE_QDRANT_MULTITENANCY_MODE = ( ENABLE_QDRANT_MULTITENANCY_MODE = (
os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "false").lower() == "true" os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "true").lower() == "true"
) )
QDRANT_COLLECTION_PREFIX = os.environ.get("QDRANT_COLLECTION_PREFIX", "open-webui") QDRANT_COLLECTION_PREFIX = os.environ.get("QDRANT_COLLECTION_PREFIX", "open-webui")

View file

@ -1,5 +1,5 @@
import logging import logging
from typing import Optional, Tuple from typing import Optional, Tuple, List, Dict, Any
from urllib.parse import urlparse from urllib.parse import urlparse
import grpc import grpc
@ -24,11 +24,25 @@ from qdrant_client.http.models import PointStruct
from qdrant_client.models import models from qdrant_client.models import models
NO_LIMIT = 999999999 NO_LIMIT = 999999999
TENANT_ID_FIELD = "tenant_id"
DEFAULT_DIMENSION = 384
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"]) log.setLevel(SRC_LOG_LEVELS["RAG"])
def _tenant_filter(tenant_id: str) -> models.FieldCondition:
return models.FieldCondition(
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
)
def _metadata_filter(key: str, value: Any) -> models.FieldCondition:
return models.FieldCondition(
key=f"metadata.{key}", match=models.MatchValue(value=value)
)
class QdrantClient(VectorDBBase): class QdrantClient(VectorDBBase):
def __init__(self): def __init__(self):
self.collection_prefix = QDRANT_COLLECTION_PREFIX self.collection_prefix = QDRANT_COLLECTION_PREFIX
@ -39,24 +53,26 @@ class QdrantClient(VectorDBBase):
self.GRPC_PORT = QDRANT_GRPC_PORT self.GRPC_PORT = QDRANT_GRPC_PORT
if not self.QDRANT_URI: if not self.QDRANT_URI:
self.client = None raise ValueError(
return "QDRANT_URI is not set. Please configure it in the environment variables."
)
# Unified handling for either scheme # Unified handling for either scheme
parsed = urlparse(self.QDRANT_URI) parsed = urlparse(self.QDRANT_URI)
host = parsed.hostname or self.QDRANT_URI host = parsed.hostname or self.QDRANT_URI
http_port = parsed.port or 6333 # default REST port http_port = parsed.port or 6333 # default REST port
if self.PREFER_GRPC: self.client = (
self.client = Qclient( Qclient(
host=host, host=host,
port=http_port, port=http_port,
grpc_port=self.GRPC_PORT, grpc_port=self.GRPC_PORT,
prefer_grpc=self.PREFER_GRPC, prefer_grpc=self.PREFER_GRPC,
api_key=self.QDRANT_API_KEY, api_key=self.QDRANT_API_KEY,
) )
else: if self.PREFER_GRPC
self.client = Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY) else Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY)
)
# Main collection types for multi-tenancy # Main collection types for multi-tenancy
self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories" self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories"
@ -66,23 +82,13 @@ class QdrantClient(VectorDBBase):
self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based" self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based"
def _result_to_get_result(self, points) -> GetResult: def _result_to_get_result(self, points) -> GetResult:
ids = [] ids, documents, metadatas = [], [], []
documents = []
metadatas = []
for point in points: for point in points:
payload = point.payload payload = point.payload
ids.append(point.id) ids.append(point.id)
documents.append(payload["text"]) documents.append(payload["text"])
metadatas.append(payload["metadata"]) metadatas.append(payload["metadata"])
return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])
return GetResult(
**{
"ids": [ids],
"documents": [documents],
"metadatas": [metadatas],
}
)
def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]: def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]:
""" """
@ -114,162 +120,47 @@ class QdrantClient(VectorDBBase):
else: else:
return self.KNOWLEDGE_COLLECTION, tenant_id return self.KNOWLEDGE_COLLECTION, tenant_id
def _extract_error_message(self, exception): def _create_multi_tenant_collection(
""" self, mt_collection_name: str, dimension: int = DEFAULT_DIMENSION
Extract error message from either HTTP or gRPC exceptions
Returns:
tuple: (status_code, error_message)
"""
# Check if it's an HTTP exception
if isinstance(exception, UnexpectedResponse):
try:
error_data = exception.structured()
error_msg = error_data.get("status", {}).get("error", "")
return exception.status_code, error_msg
except Exception as inner_e:
log.error(f"Failed to parse HTTP error: {inner_e}")
return exception.status_code, str(exception)
# Check if it's a gRPC exception
elif isinstance(exception, grpc.RpcError):
# Extract status code from gRPC error
status_code = None
if hasattr(exception, "code") and callable(exception.code):
status_code = exception.code().value[0]
# Extract error message
error_msg = str(exception)
if "details =" in error_msg:
# Parse the details line which contains the actual error message
try:
details_line = [
line.strip()
for line in error_msg.split("\n")
if "details =" in line
][0]
error_msg = details_line.split("details =")[1].strip(' "')
except (IndexError, AttributeError):
# Fall back to full message if parsing fails
pass
return status_code, error_msg
# For any other type of exception
return None, str(exception)
def _is_collection_not_found_error(self, exception):
"""
Check if the exception is due to collection not found, supporting both HTTP and gRPC
"""
status_code, error_msg = self._extract_error_message(exception)
# HTTP error (404)
if (
status_code == 404
and "Collection" in error_msg
and "doesn't exist" in error_msg
):
return True
# gRPC error (NOT_FOUND status)
if (
isinstance(exception, grpc.RpcError)
and exception.code() == grpc.StatusCode.NOT_FOUND
):
return True
return False
def _is_dimension_mismatch_error(self, exception):
"""
Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC
"""
status_code, error_msg = self._extract_error_message(exception)
# Common patterns in both HTTP and gRPC
return (
"Vector dimension error" in error_msg
or "dimensions mismatch" in error_msg
or "invalid vector size" in error_msg
)
def _create_multi_tenant_collection_if_not_exists(
self, mt_collection_name: str, dimension: int = 384
): ):
""" """
Creates a collection with multi-tenancy configuration if it doesn't exist. Creates a collection with multi-tenancy configuration and payload indexes for tenant_id and metadata fields.
Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'.
When creating collections dynamically (insert/upsert), the actual vector dimensions will be used.
""" """
try: self.client.create_collection(
# Try to create the collection directly - will fail if it already exists collection_name=mt_collection_name,
self.client.create_collection( vectors_config=models.VectorParams(
collection_name=mt_collection_name, size=dimension,
vectors_config=models.VectorParams( distance=models.Distance.COSINE,
size=dimension, on_disk=self.QDRANT_ON_DISK,
distance=models.Distance.COSINE, ),
on_disk=self.QDRANT_ON_DISK, )
), log.info(
hnsw_config=models.HnswConfigDiff( f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
payload_m=16, # Enable per-tenant indexing )
m=0,
on_disk=self.QDRANT_ON_DISK,
),
)
# Create tenant ID payload index self.client.create_payload_index(
collection_name=mt_collection_name,
field_name=TENANT_ID_FIELD,
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
on_disk=self.QDRANT_ON_DISK,
),
)
for field in ("metadata.hash", "metadata.file_id"):
self.client.create_payload_index( self.client.create_payload_index(
collection_name=mt_collection_name, collection_name=mt_collection_name,
field_name="tenant_id", field_name=field,
field_schema=models.KeywordIndexParams( field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD, type=models.KeywordIndexType.KEYWORD,
is_tenant=True,
on_disk=self.QDRANT_ON_DISK,
),
wait=True,
)
# Create payload indexes for efficient filtering on metadata.hash and metadata.file_id
self.client.create_payload_index(
collection_name=mt_collection_name,
field_name="metadata.hash",
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=False,
on_disk=self.QDRANT_ON_DISK,
),
)
self.client.create_payload_index(
collection_name=mt_collection_name,
field_name="metadata.file_id",
field_schema=models.KeywordIndexParams(
type=models.KeywordIndexType.KEYWORD,
is_tenant=False,
on_disk=self.QDRANT_ON_DISK, on_disk=self.QDRANT_ON_DISK,
), ),
) )
log.info( def _create_points(
f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!" self, items: List[VectorItem], tenant_id: str
) ) -> List[PointStruct]:
except (UnexpectedResponse, grpc.RpcError) as e:
# Check for the specific error indicating collection already exists
status_code, error_msg = self._extract_error_message(e)
# HTTP status code 409 or gRPC ALREADY_EXISTS
if (isinstance(e, UnexpectedResponse) and status_code == 409) or (
isinstance(e, grpc.RpcError)
and e.code() == grpc.StatusCode.ALREADY_EXISTS
):
if "already exists" in error_msg:
log.debug(f"Collection {mt_collection_name} already exists")
return
# If it's not an already exists error, re-raise
raise e
except Exception as e:
raise e
def _create_points(self, items: list[VectorItem], tenant_id: str):
""" """
Create point structs from vector items with tenant ID. Create point structs from vector items with tenant ID.
""" """
@ -280,56 +171,42 @@ class QdrantClient(VectorDBBase):
payload={ payload={
"text": item["text"], "text": item["text"],
"metadata": item["metadata"], "metadata": item["metadata"],
"tenant_id": tenant_id, TENANT_ID_FIELD: tenant_id,
}, },
) )
for item in items for item in items
] ]
def _ensure_collection(
self, mt_collection_name: str, dimension: int = DEFAULT_DIMENSION
):
"""
Ensure the collection exists and payload indexes are created for tenant_id and metadata fields.
"""
if not self.client.collection_exists(collection_name=mt_collection_name):
self._create_multi_tenant_collection(mt_collection_name, dimension)
def has_collection(self, collection_name: str) -> bool: def has_collection(self, collection_name: str) -> bool:
""" """
Check if a logical collection exists by checking for any points with the tenant ID. Check if a logical collection exists by checking for any points with the tenant ID.
""" """
if not self.client: if not self.client:
return False return False
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
# Create tenant filter
tenant_filter = models.FieldCondition(
key="tenant_id", match=models.MatchValue(value=tenant_id)
)
try:
# Try directly querying - most of the time collection should exist
response = self.client.query_points(
collection_name=mt_collection,
query_filter=models.Filter(must=[tenant_filter]),
limit=1,
)
# Collection exists with this tenant ID if there are points
return len(response.points) > 0
except (UnexpectedResponse, grpc.RpcError) as e:
if self._is_collection_not_found_error(e):
log.debug(f"Collection {mt_collection} doesn't exist")
return False
else:
# For other API errors, log and return False
_, error_msg = self._extract_error_message(e)
log.warning(f"Unexpected Qdrant error: {error_msg}")
return False
except Exception as e:
# For any other errors, log and return False
log.debug(f"Error checking collection {mt_collection}: {e}")
return False return False
tenant_filter = _tenant_filter(tenant_id)
count_result = self.client.count(
collection_name=mt_collection,
count_filter=models.Filter(must=[tenant_filter]),
)
return count_result.count > 0
def delete( def delete(
self, self,
collection_name: str, collection_name: str,
ids: Optional[list[str]] = None, ids: Optional[List[str]] = None,
filter: Optional[dict] = None, filter: Optional[Dict[str, Any]] = None,
): ):
""" """
Delete vectors by ID or filter from a collection with tenant isolation. Delete vectors by ID or filter from a collection with tenant isolation.
@ -337,189 +214,76 @@ class QdrantClient(VectorDBBase):
if not self.client: if not self.client:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete")
return None
# Create tenant filter must_conditions = [_tenant_filter(tenant_id)]
tenant_filter = models.FieldCondition( should_conditions = []
key="tenant_id", match=models.MatchValue(value=tenant_id) if ids:
should_conditions = [_metadata_filter("id", id_value) for id_value in ids]
elif filter:
must_conditions += [_metadata_filter(k, v) for k, v in filter.items()]
return self.client.delete(
collection_name=mt_collection,
points_selector=models.FilterSelector(
filter=models.Filter(must=must_conditions, should=should_conditions)
),
) )
must_conditions = [tenant_filter]
should_conditions = []
if ids:
for id_value in ids:
should_conditions.append(
models.FieldCondition(
key="metadata.id",
match=models.MatchValue(value=id_value),
),
)
elif filter:
for key, value in filter.items():
must_conditions.append(
models.FieldCondition(
key=f"metadata.{key}",
match=models.MatchValue(value=value),
),
)
try:
# Try to delete directly - most of the time collection should exist
update_result = self.client.delete(
collection_name=mt_collection,
points_selector=models.FilterSelector(
filter=models.Filter(must=must_conditions, should=should_conditions)
),
)
return update_result
except (UnexpectedResponse, grpc.RpcError) as e:
if self._is_collection_not_found_error(e):
log.debug(
f"Collection {mt_collection} doesn't exist, nothing to delete"
)
return None
else:
# For other API errors, log and re-raise
_, error_msg = self._extract_error_message(e)
log.warning(f"Unexpected Qdrant error: {error_msg}")
raise
except Exception as e:
# For non-Qdrant exceptions, re-raise
raise
def search( def search(
self, collection_name: str, vectors: list[list[float | int]], limit: int self, collection_name: str, vectors: List[List[float | int]], limit: int
) -> Optional[SearchResult]: ) -> Optional[SearchResult]:
""" """
Search for the nearest neighbor items based on the vectors with tenant isolation. Search for the nearest neighbor items based on the vectors with tenant isolation.
""" """
if not self.client: if not self.client or not vectors:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
# Get the vector dimension from the query vector log.debug(f"Collection {mt_collection} doesn't exist, search returns None")
dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None
try:
# Try the search operation directly - most of the time collection should exist
# Create tenant filter
tenant_filter = models.FieldCondition(
key="tenant_id", match=models.MatchValue(value=tenant_id)
)
# Ensure vector dimensions match the collection
collection_dim = self.client.get_collection(
mt_collection
).config.params.vectors.size
if collection_dim != dimension:
if collection_dim < dimension:
vectors = [vector[:collection_dim] for vector in vectors]
else:
vectors = [
vector + [0] * (collection_dim - dimension)
for vector in vectors
]
# Search with tenant filter
prefetch_query = models.Prefetch(
filter=models.Filter(must=[tenant_filter]),
limit=NO_LIMIT,
)
query_response = self.client.query_points(
collection_name=mt_collection,
query=vectors[0],
prefetch=prefetch_query,
limit=limit,
)
get_result = self._result_to_get_result(query_response.points)
return SearchResult(
ids=get_result.ids,
documents=get_result.documents,
metadatas=get_result.metadatas,
# qdrant distance is [-1, 1], normalize to [0, 1]
distances=[
[(point.score + 1.0) / 2.0 for point in query_response.points]
],
)
except (UnexpectedResponse, grpc.RpcError) as e:
if self._is_collection_not_found_error(e):
log.debug(
f"Collection {mt_collection} doesn't exist, search returns None"
)
return None
else:
# For other API errors, log and re-raise
_, error_msg = self._extract_error_message(e)
log.warning(f"Unexpected Qdrant error during search: {error_msg}")
raise
except Exception as e:
# For non-Qdrant exceptions, log and return None
log.exception(f"Error searching collection '{collection_name}': {e}")
return None return None
def query(self, collection_name: str, filter: dict, limit: Optional[int] = None): tenant_filter = _tenant_filter(tenant_id)
query_response = self.client.query_points(
collection_name=mt_collection,
query=vectors[0],
limit=limit,
query_filter=models.Filter(must=[tenant_filter]),
)
get_result = self._result_to_get_result(query_response.points)
return SearchResult(
ids=get_result.ids,
documents=get_result.documents,
metadatas=get_result.metadatas,
distances=[[(point.score + 1.0) / 2.0 for point in query_response.points]],
)
def query(
self, collection_name: str, filter: Dict[str, Any], limit: Optional[int] = None
):
""" """
Query points with filters and tenant isolation. Query points with filters and tenant isolation.
""" """
if not self.client: if not self.client:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
# Set default limit if not provided log.debug(f"Collection {mt_collection} doesn't exist, query returns None")
return None
if limit is None: if limit is None:
limit = NO_LIMIT limit = NO_LIMIT
tenant_filter = _tenant_filter(tenant_id)
# Create tenant filter field_conditions = [_metadata_filter(k, v) for k, v in filter.items()]
tenant_filter = models.FieldCondition(
key="tenant_id", match=models.MatchValue(value=tenant_id)
)
# Create metadata filters
field_conditions = []
for key, value in filter.items():
field_conditions.append(
models.FieldCondition(
key=f"metadata.{key}", match=models.MatchValue(value=value)
)
)
# Combine tenant filter with metadata filters
combined_filter = models.Filter(must=[tenant_filter, *field_conditions]) combined_filter = models.Filter(must=[tenant_filter, *field_conditions])
points = self.client.query_points(
try: collection_name=mt_collection,
# Try the query directly - most of the time collection should exist query_filter=combined_filter,
points = self.client.query_points( limit=limit,
collection_name=mt_collection, )
query_filter=combined_filter, return self._result_to_get_result(points.points)
limit=limit,
)
return self._result_to_get_result(points.points)
except (UnexpectedResponse, grpc.RpcError) as e:
if self._is_collection_not_found_error(e):
log.debug(
f"Collection {mt_collection} doesn't exist, query returns None"
)
return None
else:
# For other API errors, log and re-raise
_, error_msg = self._extract_error_message(e)
log.warning(f"Unexpected Qdrant error during query: {error_msg}")
raise
except Exception as e:
# For non-Qdrant exceptions, log and re-raise
log.exception(f"Error querying collection '{collection_name}': {e}")
return None
def get(self, collection_name: str) -> Optional[GetResult]: def get(self, collection_name: str) -> Optional[GetResult]:
""" """
@ -527,169 +291,36 @@ class QdrantClient(VectorDBBase):
""" """
if not self.client: if not self.client:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
# Create tenant filter log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
tenant_filter = models.FieldCondition(
key="tenant_id", match=models.MatchValue(value=tenant_id)
)
try:
# Try to get points directly - most of the time collection should exist
points = self.client.query_points(
collection_name=mt_collection,
query_filter=models.Filter(must=[tenant_filter]),
limit=NO_LIMIT,
)
return self._result_to_get_result(points.points)
except (UnexpectedResponse, grpc.RpcError) as e:
if self._is_collection_not_found_error(e):
log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
return None
else:
# For other API errors, log and re-raise
_, error_msg = self._extract_error_message(e)
log.warning(f"Unexpected Qdrant error during get: {error_msg}")
raise
except Exception as e:
# For non-Qdrant exceptions, log and return None
log.exception(f"Error getting collection '{collection_name}': {e}")
return None return None
tenant_filter = _tenant_filter(tenant_id)
def _handle_operation_with_error_retry( points = self.client.query_points(
self, operation_name, mt_collection, points, dimension collection_name=mt_collection,
): query_filter=models.Filter(must=[tenant_filter]),
""" limit=NO_LIMIT,
Private helper to handle common error cases for insert and upsert operations.
Args:
operation_name: 'insert' or 'upsert'
mt_collection: The multi-tenant collection name
points: The vector points to insert/upsert
dimension: The dimension of the vectors
Returns:
The operation result (for upsert) or None (for insert)
"""
try:
if operation_name == "insert":
self.client.upload_points(mt_collection, points)
return None
else: # upsert
return self.client.upsert(mt_collection, points)
except (UnexpectedResponse, grpc.RpcError) as e:
# Handle collection not found
if self._is_collection_not_found_error(e):
log.info(
f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}."
)
# Create collection with correct dimensions from our vectors
self._create_multi_tenant_collection_if_not_exists(
mt_collection_name=mt_collection, dimension=dimension
)
# Try operation again - no need for dimension adjustment since we just created with correct dimensions
if operation_name == "insert":
self.client.upload_points(mt_collection, points)
return None
else: # upsert
return self.client.upsert(mt_collection, points)
# Handle dimension mismatch
elif self._is_dimension_mismatch_error(e):
# For dimension errors, the collection must exist, so get its configuration
mt_collection_info = self.client.get_collection(mt_collection)
existing_size = mt_collection_info.config.params.vectors.size
log.info(
f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
)
if existing_size < dimension:
# Truncate vectors to fit
log.info(
f"Truncating vectors from {dimension} to {existing_size} dimensions"
)
points = [
PointStruct(
id=point.id,
vector=point.vector[:existing_size],
payload=point.payload,
)
for point in points
]
elif existing_size > dimension:
# Pad vectors with zeros
log.info(
f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
)
points = [
PointStruct(
id=point.id,
vector=point.vector
+ [0] * (existing_size - len(point.vector)),
payload=point.payload,
)
for point in points
]
# Try operation again with adjusted dimensions
if operation_name == "insert":
self.client.upload_points(mt_collection, points)
return None
else: # upsert
return self.client.upsert(mt_collection, points)
else:
# Not a known error we can handle, log and re-raise
_, error_msg = self._extract_error_message(e)
log.warning(f"Unhandled Qdrant error: {error_msg}")
raise
except Exception as e:
# For non-Qdrant exceptions, re-raise
raise
def insert(self, collection_name: str, items: list[VectorItem]):
"""
Insert items with tenant ID.
"""
if not self.client or not items:
return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
# Get dimensions from the actual vectors
dimension = len(items[0]["vector"]) if items else None
# Create points with tenant ID
points = self._create_points(items, tenant_id)
# Handle the operation with error retry
return self._handle_operation_with_error_retry(
"insert", mt_collection, points, dimension
) )
return self._result_to_get_result(points.points)
def upsert(self, collection_name: str, items: list[VectorItem]): def upsert(self, collection_name: str, items: List[VectorItem]):
""" """
Upsert items with tenant ID. Upsert items with tenant ID.
""" """
if not self.client or not items: if not self.client or not items:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
dimension = len(items[0]["vector"])
# Get dimensions from the actual vectors self._ensure_collection(mt_collection, dimension)
dimension = len(items[0]["vector"]) if items else None
# Create points with tenant ID
points = self._create_points(items, tenant_id) points = self._create_points(items, tenant_id)
self.client.upload_points(mt_collection, points)
return None
# Handle the operation with error retry def insert(self, collection_name: str, items: List[VectorItem]):
return self._handle_operation_with_error_retry( """
"upsert", mt_collection, points, dimension Insert items with tenant ID.
) """
return self.upsert(collection_name, items)
def reset(self): def reset(self):
""" """
@ -697,11 +328,9 @@ class QdrantClient(VectorDBBase):
""" """
if not self.client: if not self.client:
return None return None
for collection in self.client.get_collections().collections:
collection_names = self.client.get_collections().collections if collection.name.startswith(self.collection_prefix):
for collection_name in collection_names: self.client.delete_collection(collection_name=collection.name)
if collection_name.name.startswith(self.collection_prefix):
self.client.delete_collection(collection_name=collection_name.name)
def delete_collection(self, collection_name: str): def delete_collection(self, collection_name: str):
""" """
@ -709,24 +338,13 @@ class QdrantClient(VectorDBBase):
""" """
if not self.client: if not self.client:
return None return None
# Map to multi-tenant collection and tenant ID
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
if not self.client.collection_exists(collection_name=mt_collection):
tenant_filter = models.FieldCondition( log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete")
key="tenant_id", match=models.MatchValue(value=tenant_id) return None
) self.client.delete(
field_conditions = [tenant_filter]
update_result = self.client.delete(
collection_name=mt_collection, collection_name=mt_collection,
points_selector=models.FilterSelector( points_selector=models.FilterSelector(
filter=models.Filter(must=field_conditions) filter=models.Filter(must=[_tenant_filter(tenant_id)])
), ),
) )
if self.client.get_collection(mt_collection).points_count == 0:
self.client.delete_collection(mt_collection)
return update_result

View file

@ -49,7 +49,7 @@ langchain-community==0.3.26
fake-useragent==2.1.0 fake-useragent==2.1.0
chromadb==0.6.3 chromadb==0.6.3
pymilvus==2.5.0 pymilvus==2.5.0
qdrant-client~=1.12.0 qdrant-client==1.14.3
opensearch-py==2.8.0 opensearch-py==2.8.0
playwright==1.49.1 # Caution: version must match docker-compose.playwright.yaml playwright==1.49.1 # Caution: version must match docker-compose.playwright.yaml
elasticsearch==9.0.1 elasticsearch==9.0.1