refac: file handling

This commit is contained in:
Timothy Jaeryang Baek 2025-07-11 12:29:17 +04:00
parent 0db8bedf45
commit 033d07ee23

View file

@@ -468,8 +468,10 @@ def get_sources_from_items(
for item in items:
query_result = None
collection_names = []
if item.get("type") == "text":
# Text File
# Raw Text
# Used during temporary chat file uploads
query_result = {
"documents": [[item.get("content")]],
@@ -487,24 +489,57 @@ def get_sources_from_items(
"metadatas": [[{"file_id": note.id, "name": note.title}]],
}
elif item.get("docs"):
# BYPASS_WEB_SEARCH_EMBEDDING_AND_RETRIEVAL
query_result = {
"documents": [[doc.get("content") for doc in item.get("docs")]],
"metadatas": [[doc.get("metadata") for doc in item.get("docs")]],
}
elif item.get("type") == "file":
if (
item.get("context") == "full"
or request.app.state.config.BYPASS_EMBEDDING_AND_RETRIEVAL
):
if item.get("file").get("data", {}):
# Manual Full Mode Toggle
# Used from chat file modal, we can assume that the file content will be available from item.get("file").get("data", {}).get("content")
query_result = {
"documents": [
[item.get("file").get("data", {}).get("content", "")]
],
"metadatas": [
[
{
"file_id": item.get("id"),
"name": item.get("name"),
**item.get("file")
.get("data", {})
.get("metadata", {}),
}
]
],
}
elif item.get("id"):
file_object = Files.get_file_by_id(item.get("id"))
if file_object:
query_result = {
"documents": [[file_object.data.get("content", "")]],
"metadatas": [
[
{
"file_id": item.get("id"),
"name": file_object.filename,
"source": file_object.filename,
}
]
],
}
else:
# Fallback to collection names
if item.get("legacy"):
collection_names.append(f"{item['id']}")
else:
collection_names.append(f"file-{item['id']}")
elif item.get("context") == "full":
if item.get("type") == "file":
# Manual Full Mode Toggle
# Used from chat file modal, we can assume that the file content will be available from item.get("file").get("data", {}).get("content")
query_result = {
"documents": [[item.get("file").get("data", {}).get("content")]],
"metadatas": [
[{"file_id": item.get("id"), "name": item.get("name")}]
],
}
elif item.get("type") == "collection":
elif item.get("type") == "collection":
if (
item.get("context") == "full"
or request.app.state.config.BYPASS_EMBEDDING_AND_RETRIEVAL
):
# Manual Full Mode Toggle for Collection
knowledge_base = Knowledges.get_knowledge_by_id(item.get("id"))
@@ -534,71 +569,26 @@ def get_sources_from_items(
"documents": [documents],
"metadatas": [metadatas],
}
elif (
item.get("type") != "web_search"
and request.app.state.config.BYPASS_EMBEDDING_AND_RETRIEVAL
):
# BYPASS_EMBEDDING_AND_RETRIEVAL
if item.get("type") == "collection":
file_ids = item.get("data", {}).get("file_ids", [])
documents = []
metadatas = []
for file_id in file_ids:
file_object = Files.get_file_by_id(file_id)
if file_object:
documents.append(file_object.data.get("content", ""))
metadatas.append(
{
"file_id": file_id,
"name": file_object.filename,
"source": file_object.filename,
}
)
query_result = {
"documents": [documents],
"metadatas": [metadatas],
}
elif item.get("id"):
file_object = Files.get_file_by_id(item.get("id"))
if file_object:
query_result = {
"documents": [[file_object.data.get("content", "")]],
"metadatas": [
[
{
"file_id": item.get("id"),
"name": file_object.filename,
"source": file_object.filename,
}
]
],
}
elif item.get("file").get("data"):
query_result = {
"documents": [[item.get("file").get("data", {}).get("content")]],
"metadatas": [
[item.get("file").get("data", {}).get("metadata", {})]
],
}
else:
collection_names = []
if item.get("type") == "collection":
else:
# Fallback to collection names
if item.get("legacy"):
collection_names = item.get("collection_names", [])
else:
collection_names.append(item["id"])
elif item.get("collection_name"):
collection_names.append(item["collection_name"])
elif item.get("id"):
if item.get("legacy"):
collection_names.append(f"{item['id']}")
else:
collection_names.append(f"file-{item['id']}")
elif item.get("docs"):
# BYPASS_WEB_SEARCH_EMBEDDING_AND_RETRIEVAL
query_result = {
"documents": [[doc.get("content") for doc in item.get("docs")]],
"metadatas": [[doc.get("metadata") for doc in item.get("docs")]],
}
elif item.get("collection_name"):
# Direct Collection Name
collection_names.append(item["collection_name"])
# If query_result is None
# Fallback to collection names and vector search the collections
if query_result is None and collection_names:
collection_names = set(collection_names).difference(extracted_collections)
if not collection_names:
log.debug(f"skipping {item} as it has already been extracted")
@@ -609,12 +599,12 @@ def get_sources_from_items(
query_result = get_all_items_from_collections(collection_names)
except Exception as e:
log.exception(e)
else:
try:
query_result = None
if item.get("type") == "text":
# Not sure when this is used, but it seems to be a fallback
# TODO: remove?
query_result = {
"documents": [
[item.get("file").get("data", {}).get("content")]