mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-12 04:15:25 +00:00
refactor: Updated Qdrant multi-tenancy implementation
Signed-off-by: Anush008 <anushshetty90@gmail.com>
This commit is contained in:
parent
b5f4c85bb1
commit
5dba298c1e
2 changed files with 88 additions and 387 deletions
|
|
@ -1794,10 +1794,10 @@ MILVUS_IVF_FLAT_NLIST = int(os.environ.get("MILVUS_IVF_FLAT_NLIST", "128"))
|
||||||
QDRANT_URI = os.environ.get("QDRANT_URI", None)
|
QDRANT_URI = os.environ.get("QDRANT_URI", None)
|
||||||
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None)
|
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None)
|
||||||
QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true"
|
QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true"
|
||||||
QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "False").lower() == "true"
|
QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "false").lower() == "true"
|
||||||
QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334"))
|
QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334"))
|
||||||
ENABLE_QDRANT_MULTITENANCY_MODE = (
|
ENABLE_QDRANT_MULTITENANCY_MODE = (
|
||||||
os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "false").lower() == "true"
|
os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "true").lower() == "true"
|
||||||
)
|
)
|
||||||
|
|
||||||
# OpenSearch
|
# OpenSearch
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ from qdrant_client.http.models import PointStruct
|
||||||
from qdrant_client.models import models
|
from qdrant_client.models import models
|
||||||
|
|
||||||
NO_LIMIT = 999999999
|
NO_LIMIT = 999999999
|
||||||
|
TENANT_ID_FIELD = "tenant_id"
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
log.setLevel(SRC_LOG_LEVELS["RAG"])
|
log.setLevel(SRC_LOG_LEVELS["RAG"])
|
||||||
|
|
@ -113,96 +114,14 @@ class QdrantClient(VectorDBBase):
|
||||||
else:
|
else:
|
||||||
return self.KNOWLEDGE_COLLECTION, tenant_id
|
return self.KNOWLEDGE_COLLECTION, tenant_id
|
||||||
|
|
||||||
def _extract_error_message(self, exception):
|
def _create_multi_tenant_collection(
|
||||||
"""
|
self,
|
||||||
Extract error message from either HTTP or gRPC exceptions
|
mt_collection_name: str,
|
||||||
|
dimension: int = 384,
|
||||||
Returns:
|
|
||||||
tuple: (status_code, error_message)
|
|
||||||
"""
|
|
||||||
# Check if it's an HTTP exception
|
|
||||||
if isinstance(exception, UnexpectedResponse):
|
|
||||||
try:
|
|
||||||
error_data = exception.structured()
|
|
||||||
error_msg = error_data.get("status", {}).get("error", "")
|
|
||||||
return exception.status_code, error_msg
|
|
||||||
except Exception as inner_e:
|
|
||||||
log.error(f"Failed to parse HTTP error: {inner_e}")
|
|
||||||
return exception.status_code, str(exception)
|
|
||||||
|
|
||||||
# Check if it's a gRPC exception
|
|
||||||
elif isinstance(exception, grpc.RpcError):
|
|
||||||
# Extract status code from gRPC error
|
|
||||||
status_code = None
|
|
||||||
if hasattr(exception, "code") and callable(exception.code):
|
|
||||||
status_code = exception.code().value[0]
|
|
||||||
|
|
||||||
# Extract error message
|
|
||||||
error_msg = str(exception)
|
|
||||||
if "details =" in error_msg:
|
|
||||||
# Parse the details line which contains the actual error message
|
|
||||||
try:
|
|
||||||
details_line = [
|
|
||||||
line.strip()
|
|
||||||
for line in error_msg.split("\n")
|
|
||||||
if "details =" in line
|
|
||||||
][0]
|
|
||||||
error_msg = details_line.split("details =")[1].strip(' "')
|
|
||||||
except (IndexError, AttributeError):
|
|
||||||
# Fall back to full message if parsing fails
|
|
||||||
pass
|
|
||||||
|
|
||||||
return status_code, error_msg
|
|
||||||
|
|
||||||
# For any other type of exception
|
|
||||||
return None, str(exception)
|
|
||||||
|
|
||||||
def _is_collection_not_found_error(self, exception):
|
|
||||||
"""
|
|
||||||
Check if the exception is due to collection not found, supporting both HTTP and gRPC
|
|
||||||
"""
|
|
||||||
status_code, error_msg = self._extract_error_message(exception)
|
|
||||||
|
|
||||||
# HTTP error (404)
|
|
||||||
if (
|
|
||||||
status_code == 404
|
|
||||||
and "Collection" in error_msg
|
|
||||||
and "doesn't exist" in error_msg
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
|
|
||||||
# gRPC error (NOT_FOUND status)
|
|
||||||
if (
|
|
||||||
isinstance(exception, grpc.RpcError)
|
|
||||||
and exception.code() == grpc.StatusCode.NOT_FOUND
|
|
||||||
):
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _is_dimension_mismatch_error(self, exception):
|
|
||||||
"""
|
|
||||||
Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC
|
|
||||||
"""
|
|
||||||
status_code, error_msg = self._extract_error_message(exception)
|
|
||||||
|
|
||||||
# Common patterns in both HTTP and gRPC
|
|
||||||
return (
|
|
||||||
"Vector dimension error" in error_msg
|
|
||||||
or "dimensions mismatch" in error_msg
|
|
||||||
or "invalid vector size" in error_msg
|
|
||||||
)
|
|
||||||
|
|
||||||
def _create_multi_tenant_collection_if_not_exists(
|
|
||||||
self, mt_collection_name: str, dimension: int = 384
|
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Creates a collection with multi-tenancy configuration if it doesn't exist.
|
Creates a collection with multi-tenancy configuration and payload indexes for tenant_id and metadata fields.
|
||||||
Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'.
|
|
||||||
When creating collections dynamically (insert/upsert), the actual vector dimensions will be used.
|
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
# Try to create the collection directly - will fail if it already exists
|
|
||||||
self.client.create_collection(
|
self.client.create_collection(
|
||||||
collection_name=mt_collection_name,
|
collection_name=mt_collection_name,
|
||||||
vectors_config=models.VectorParams(
|
vectors_config=models.VectorParams(
|
||||||
|
|
@ -210,45 +129,21 @@ class QdrantClient(VectorDBBase):
|
||||||
distance=models.Distance.COSINE,
|
distance=models.Distance.COSINE,
|
||||||
on_disk=self.QDRANT_ON_DISK,
|
on_disk=self.QDRANT_ON_DISK,
|
||||||
),
|
),
|
||||||
hnsw_config=models.HnswConfigDiff(
|
)
|
||||||
payload_m=16, # Enable per-tenant indexing
|
log.info(
|
||||||
m=0,
|
f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
|
||||||
on_disk=self.QDRANT_ON_DISK,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create tenant ID payload index
|
|
||||||
self.client.create_payload_index(
|
self.client.create_payload_index(
|
||||||
collection_name=mt_collection_name,
|
collection_name=mt_collection_name,
|
||||||
field_name="tenant_id",
|
field_name=TENANT_ID_FIELD,
|
||||||
field_schema=models.KeywordIndexParams(
|
field_schema=models.KeywordIndexParams(
|
||||||
type=models.KeywordIndexType.KEYWORD,
|
type=models.KeywordIndexType.KEYWORD,
|
||||||
is_tenant=True,
|
is_tenant=True,
|
||||||
on_disk=self.QDRANT_ON_DISK,
|
on_disk=self.QDRANT_ON_DISK,
|
||||||
),
|
),
|
||||||
wait=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
|
||||||
f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
|
|
||||||
)
|
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
# Check for the specific error indicating collection already exists
|
|
||||||
status_code, error_msg = self._extract_error_message(e)
|
|
||||||
|
|
||||||
# HTTP status code 409 or gRPC ALREADY_EXISTS
|
|
||||||
if (isinstance(e, UnexpectedResponse) and status_code == 409) or (
|
|
||||||
isinstance(e, grpc.RpcError)
|
|
||||||
and e.code() == grpc.StatusCode.ALREADY_EXISTS
|
|
||||||
):
|
|
||||||
if "already exists" in error_msg:
|
|
||||||
log.debug(f"Collection {mt_collection_name} already exists")
|
|
||||||
return
|
|
||||||
# If it's not an already exists error, re-raise
|
|
||||||
raise e
|
|
||||||
except Exception as e:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def _create_points(self, items: list[VectorItem], tenant_id: str):
|
def _create_points(self, items: list[VectorItem], tenant_id: str):
|
||||||
"""
|
"""
|
||||||
Create point structs from vector items with tenant ID.
|
Create point structs from vector items with tenant ID.
|
||||||
|
|
@ -260,50 +155,41 @@ class QdrantClient(VectorDBBase):
|
||||||
payload={
|
payload={
|
||||||
"text": item["text"],
|
"text": item["text"],
|
||||||
"metadata": item["metadata"],
|
"metadata": item["metadata"],
|
||||||
"tenant_id": tenant_id,
|
TENANT_ID_FIELD: tenant_id,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
for item in items
|
for item in items
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def _ensure_collection(
|
||||||
|
self,
|
||||||
|
mt_collection_name: str,
|
||||||
|
dimension: int = 384,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Ensure the collection exists and payload indexes are created for tenant_id and metadata fields.
|
||||||
|
"""
|
||||||
|
if self.client.collection_exists(collection_name=mt_collection_name):
|
||||||
|
return
|
||||||
|
self._create_multi_tenant_collection(mt_collection_name, dimension)
|
||||||
|
|
||||||
def has_collection(self, collection_name: str) -> bool:
|
def has_collection(self, collection_name: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if a logical collection exists by checking for any points with the tenant ID.
|
Check if a logical collection exists by checking for any points with the tenant ID.
|
||||||
"""
|
"""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
# Create tenant filter
|
return False
|
||||||
tenant_filter = models.FieldCondition(
|
tenant_filter = models.FieldCondition(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
|
||||||
)
|
)
|
||||||
|
count_result = self.client.count(
|
||||||
try:
|
|
||||||
# Try directly querying - most of the time collection should exist
|
|
||||||
response = self.client.query_points(
|
|
||||||
collection_name=mt_collection,
|
collection_name=mt_collection,
|
||||||
query_filter=models.Filter(must=[tenant_filter]),
|
count_filter=models.Filter(must=[tenant_filter]),
|
||||||
limit=1,
|
|
||||||
)
|
)
|
||||||
|
return count_result.count > 0
|
||||||
# Collection exists with this tenant ID if there are points
|
|
||||||
return len(response.points) > 0
|
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.debug(f"Collection {mt_collection} doesn't exist")
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
# For other API errors, log and return False
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unexpected Qdrant error: {error_msg}")
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
# For any other errors, log and return False
|
|
||||||
log.debug(f"Error checking collection {mt_collection}: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def delete(
|
def delete(
|
||||||
self,
|
self,
|
||||||
|
|
@ -317,17 +203,16 @@ class QdrantClient(VectorDBBase):
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
|
log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete")
|
||||||
|
return None
|
||||||
|
|
||||||
# Create tenant filter
|
|
||||||
tenant_filter = models.FieldCondition(
|
tenant_filter = models.FieldCondition(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
must_conditions = [tenant_filter]
|
must_conditions = [tenant_filter]
|
||||||
should_conditions = []
|
should_conditions = []
|
||||||
|
|
||||||
if ids:
|
if ids:
|
||||||
for id_value in ids:
|
for id_value in ids:
|
||||||
should_conditions.append(
|
should_conditions.append(
|
||||||
|
|
@ -346,7 +231,6 @@ class QdrantClient(VectorDBBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try to delete directly - most of the time collection should exist
|
|
||||||
update_result = self.client.delete(
|
update_result = self.client.delete(
|
||||||
collection_name=mt_collection,
|
collection_name=mt_collection,
|
||||||
points_selector=models.FilterSelector(
|
points_selector=models.FilterSelector(
|
||||||
|
|
@ -355,20 +239,9 @@ class QdrantClient(VectorDBBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
return update_result
|
return update_result
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.debug(
|
|
||||||
f"Collection {mt_collection} doesn't exist, nothing to delete"
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# For other API errors, log and re-raise
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unexpected Qdrant error: {error_msg}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# For non-Qdrant exceptions, re-raise
|
log.warning(f"Error deleting from collection {mt_collection}: {e}")
|
||||||
raise
|
return None
|
||||||
|
|
||||||
def search(
|
def search(
|
||||||
self, collection_name: str, vectors: list[list[float | int]], limit: int
|
self, collection_name: str, vectors: list[list[float | int]], limit: int
|
||||||
|
|
@ -378,26 +251,19 @@ class QdrantClient(VectorDBBase):
|
||||||
"""
|
"""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
|
log.debug(f"Collection {mt_collection} doesn't exist, search returns None")
|
||||||
|
return None
|
||||||
|
|
||||||
# Get the vector dimension from the query vector
|
|
||||||
dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None
|
dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try the search operation directly - most of the time collection should exist
|
|
||||||
|
|
||||||
# Create tenant filter
|
|
||||||
tenant_filter = models.FieldCondition(
|
tenant_filter = models.FieldCondition(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ensure vector dimensions match the collection
|
|
||||||
collection_dim = self.client.get_collection(
|
collection_dim = self.client.get_collection(
|
||||||
mt_collection
|
mt_collection
|
||||||
).config.params.vectors.size
|
).config.params.vectors.size
|
||||||
|
|
||||||
if collection_dim != dimension:
|
if collection_dim != dimension:
|
||||||
if collection_dim < dimension:
|
if collection_dim < dimension:
|
||||||
vectors = [vector[:collection_dim] for vector in vectors]
|
vectors = [vector[:collection_dim] for vector in vectors]
|
||||||
|
|
@ -406,8 +272,6 @@ class QdrantClient(VectorDBBase):
|
||||||
vector + [0] * (collection_dim - dimension)
|
vector + [0] * (collection_dim - dimension)
|
||||||
for vector in vectors
|
for vector in vectors
|
||||||
]
|
]
|
||||||
|
|
||||||
# Search with tenant filter
|
|
||||||
prefetch_query = models.Prefetch(
|
prefetch_query = models.Prefetch(
|
||||||
filter=models.Filter(must=[tenant_filter]),
|
filter=models.Filter(must=[tenant_filter]),
|
||||||
limit=NO_LIMIT,
|
limit=NO_LIMIT,
|
||||||
|
|
@ -418,30 +282,16 @@ class QdrantClient(VectorDBBase):
|
||||||
prefetch=prefetch_query,
|
prefetch=prefetch_query,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
get_result = self._result_to_get_result(query_response.points)
|
get_result = self._result_to_get_result(query_response.points)
|
||||||
return SearchResult(
|
return SearchResult(
|
||||||
ids=get_result.ids,
|
ids=get_result.ids,
|
||||||
documents=get_result.documents,
|
documents=get_result.documents,
|
||||||
metadatas=get_result.metadatas,
|
metadatas=get_result.metadatas,
|
||||||
# qdrant distance is [-1, 1], normalize to [0, 1]
|
|
||||||
distances=[
|
distances=[
|
||||||
[(point.score + 1.0) / 2.0 for point in query_response.points]
|
[(point.score + 1.0) / 2.0 for point in query_response.points]
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.debug(
|
|
||||||
f"Collection {mt_collection} doesn't exist, search returns None"
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# For other API errors, log and re-raise
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unexpected Qdrant error during search: {error_msg}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# For non-Qdrant exceptions, log and return None
|
|
||||||
log.exception(f"Error searching collection '{collection_name}': {e}")
|
log.exception(f"Error searching collection '{collection_name}': {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -451,20 +301,16 @@ class QdrantClient(VectorDBBase):
|
||||||
"""
|
"""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
|
log.debug(f"Collection {mt_collection} doesn't exist, query returns None")
|
||||||
|
return None
|
||||||
|
|
||||||
# Set default limit if not provided
|
|
||||||
if limit is None:
|
if limit is None:
|
||||||
limit = NO_LIMIT
|
limit = NO_LIMIT
|
||||||
|
|
||||||
# Create tenant filter
|
|
||||||
tenant_filter = models.FieldCondition(
|
tenant_filter = models.FieldCondition(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create metadata filters
|
|
||||||
field_conditions = []
|
field_conditions = []
|
||||||
for key, value in filter.items():
|
for key, value in filter.items():
|
||||||
field_conditions.append(
|
field_conditions.append(
|
||||||
|
|
@ -472,32 +318,15 @@ class QdrantClient(VectorDBBase):
|
||||||
key=f"metadata.{key}", match=models.MatchValue(value=value)
|
key=f"metadata.{key}", match=models.MatchValue(value=value)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Combine tenant filter with metadata filters
|
|
||||||
combined_filter = models.Filter(must=[tenant_filter, *field_conditions])
|
combined_filter = models.Filter(must=[tenant_filter, *field_conditions])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try the query directly - most of the time collection should exist
|
|
||||||
points = self.client.query_points(
|
points = self.client.query_points(
|
||||||
collection_name=mt_collection,
|
collection_name=mt_collection,
|
||||||
query_filter=combined_filter,
|
query_filter=combined_filter,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._result_to_get_result(points.points)
|
return self._result_to_get_result(points.points)
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.debug(
|
|
||||||
f"Collection {mt_collection} doesn't exist, query returns None"
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# For other API errors, log and re-raise
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unexpected Qdrant error during query: {error_msg}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# For non-Qdrant exceptions, log and re-raise
|
|
||||||
log.exception(f"Error querying collection '{collection_name}': {e}")
|
log.exception(f"Error querying collection '{collection_name}': {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -507,17 +336,15 @@ class QdrantClient(VectorDBBase):
|
||||||
"""
|
"""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
|
log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
|
||||||
|
return None
|
||||||
|
|
||||||
# Create tenant filter
|
|
||||||
tenant_filter = models.FieldCondition(
|
tenant_filter = models.FieldCondition(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try to get points directly - most of the time collection should exist
|
|
||||||
points = self.client.query_points(
|
points = self.client.query_points(
|
||||||
collection_name=mt_collection,
|
collection_name=mt_collection,
|
||||||
query_filter=models.Filter(must=[tenant_filter]),
|
query_filter=models.Filter(must=[tenant_filter]),
|
||||||
|
|
@ -525,151 +352,28 @@ class QdrantClient(VectorDBBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._result_to_get_result(points.points)
|
return self._result_to_get_result(points.points)
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# For other API errors, log and re-raise
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unexpected Qdrant error during get: {error_msg}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# For non-Qdrant exceptions, log and return None
|
|
||||||
log.exception(f"Error getting collection '{collection_name}': {e}")
|
log.exception(f"Error getting collection '{collection_name}': {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _handle_operation_with_error_retry(
|
|
||||||
self, operation_name, mt_collection, points, dimension
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Private helper to handle common error cases for insert and upsert operations.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
operation_name: 'insert' or 'upsert'
|
|
||||||
mt_collection: The multi-tenant collection name
|
|
||||||
points: The vector points to insert/upsert
|
|
||||||
dimension: The dimension of the vectors
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The operation result (for upsert) or None (for insert)
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
if operation_name == "insert":
|
|
||||||
self.client.upload_points(mt_collection, points)
|
|
||||||
return None
|
|
||||||
else: # upsert
|
|
||||||
return self.client.upsert(mt_collection, points)
|
|
||||||
except (UnexpectedResponse, grpc.RpcError) as e:
|
|
||||||
# Handle collection not found
|
|
||||||
if self._is_collection_not_found_error(e):
|
|
||||||
log.info(
|
|
||||||
f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}."
|
|
||||||
)
|
|
||||||
# Create collection with correct dimensions from our vectors
|
|
||||||
self._create_multi_tenant_collection_if_not_exists(
|
|
||||||
mt_collection_name=mt_collection, dimension=dimension
|
|
||||||
)
|
|
||||||
# Try operation again - no need for dimension adjustment since we just created with correct dimensions
|
|
||||||
if operation_name == "insert":
|
|
||||||
self.client.upload_points(mt_collection, points)
|
|
||||||
return None
|
|
||||||
else: # upsert
|
|
||||||
return self.client.upsert(mt_collection, points)
|
|
||||||
|
|
||||||
# Handle dimension mismatch
|
|
||||||
elif self._is_dimension_mismatch_error(e):
|
|
||||||
# For dimension errors, the collection must exist, so get its configuration
|
|
||||||
mt_collection_info = self.client.get_collection(mt_collection)
|
|
||||||
existing_size = mt_collection_info.config.params.vectors.size
|
|
||||||
|
|
||||||
log.info(
|
|
||||||
f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
|
|
||||||
)
|
|
||||||
|
|
||||||
if existing_size < dimension:
|
|
||||||
# Truncate vectors to fit
|
|
||||||
log.info(
|
|
||||||
f"Truncating vectors from {dimension} to {existing_size} dimensions"
|
|
||||||
)
|
|
||||||
points = [
|
|
||||||
PointStruct(
|
|
||||||
id=point.id,
|
|
||||||
vector=point.vector[:existing_size],
|
|
||||||
payload=point.payload,
|
|
||||||
)
|
|
||||||
for point in points
|
|
||||||
]
|
|
||||||
elif existing_size > dimension:
|
|
||||||
# Pad vectors with zeros
|
|
||||||
log.info(
|
|
||||||
f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
|
|
||||||
)
|
|
||||||
points = [
|
|
||||||
PointStruct(
|
|
||||||
id=point.id,
|
|
||||||
vector=point.vector
|
|
||||||
+ [0] * (existing_size - len(point.vector)),
|
|
||||||
payload=point.payload,
|
|
||||||
)
|
|
||||||
for point in points
|
|
||||||
]
|
|
||||||
# Try operation again with adjusted dimensions
|
|
||||||
if operation_name == "insert":
|
|
||||||
self.client.upload_points(mt_collection, points)
|
|
||||||
return None
|
|
||||||
else: # upsert
|
|
||||||
return self.client.upsert(mt_collection, points)
|
|
||||||
else:
|
|
||||||
# Not a known error we can handle, log and re-raise
|
|
||||||
_, error_msg = self._extract_error_message(e)
|
|
||||||
log.warning(f"Unhandled Qdrant error: {error_msg}")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
# For non-Qdrant exceptions, re-raise
|
|
||||||
raise
|
|
||||||
|
|
||||||
def insert(self, collection_name: str, items: list[VectorItem]):
|
|
||||||
"""
|
|
||||||
Insert items with tenant ID.
|
|
||||||
"""
|
|
||||||
if not self.client or not items:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
|
||||||
|
|
||||||
# Get dimensions from the actual vectors
|
|
||||||
dimension = len(items[0]["vector"]) if items else None
|
|
||||||
|
|
||||||
# Create points with tenant ID
|
|
||||||
points = self._create_points(items, tenant_id)
|
|
||||||
|
|
||||||
# Handle the operation with error retry
|
|
||||||
return self._handle_operation_with_error_retry(
|
|
||||||
"insert", mt_collection, points, dimension
|
|
||||||
)
|
|
||||||
|
|
||||||
def upsert(self, collection_name: str, items: list[VectorItem]):
|
def upsert(self, collection_name: str, items: list[VectorItem]):
|
||||||
"""
|
"""
|
||||||
Upsert items with tenant ID.
|
Upsert items with tenant ID.
|
||||||
"""
|
"""
|
||||||
if not self.client or not items:
|
if not self.client or not items:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
|
||||||
# Get dimensions from the actual vectors
|
|
||||||
dimension = len(items[0]["vector"]) if items else None
|
dimension = len(items[0]["vector"]) if items else None
|
||||||
|
self._ensure_collection(mt_collection, dimension)
|
||||||
# Create points with tenant ID
|
|
||||||
points = self._create_points(items, tenant_id)
|
points = self._create_points(items, tenant_id)
|
||||||
|
self.client.upload_points(mt_collection, points)
|
||||||
|
return None
|
||||||
|
|
||||||
# Handle the operation with error retry
|
def insert(self, collection_name: str, items: list[VectorItem]):
|
||||||
return self._handle_operation_with_error_retry(
|
"""
|
||||||
"upsert", mt_collection, points, dimension
|
Insert items with tenant ID.
|
||||||
)
|
"""
|
||||||
|
return self.upsert(collection_name, items)
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""
|
"""
|
||||||
|
|
@ -689,24 +393,21 @@ class QdrantClient(VectorDBBase):
|
||||||
"""
|
"""
|
||||||
if not self.client:
|
if not self.client:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Map to multi-tenant collection and tenant ID
|
|
||||||
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
|
||||||
|
if not self.client.collection_exists(collection_name=mt_collection):
|
||||||
|
log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete")
|
||||||
|
return None
|
||||||
|
|
||||||
tenant_filter = models.FieldCondition(
|
self.client.delete(
|
||||||
key="tenant_id", match=models.MatchValue(value=tenant_id)
|
|
||||||
)
|
|
||||||
|
|
||||||
field_conditions = [tenant_filter]
|
|
||||||
|
|
||||||
update_result = self.client.delete(
|
|
||||||
collection_name=mt_collection,
|
collection_name=mt_collection,
|
||||||
points_selector=models.FilterSelector(
|
points_selector=models.FilterSelector(
|
||||||
filter=models.Filter(must=field_conditions)
|
filter=models.Filter(
|
||||||
|
must=[
|
||||||
|
models.FieldCondition(
|
||||||
|
key=TENANT_ID_FIELD,
|
||||||
|
match=models.MatchValue(value=tenant_id),
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.client.get_collection(mt_collection).points_count == 0:
|
|
||||||
self.client.delete_collection(mt_collection)
|
|
||||||
|
|
||||||
return update_result
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue