error handling for stream path

This commit is contained in:
bkellam 2025-11-20 17:10:06 -08:00
parent 5c624cda4f
commit 55a3e9ef6a
3 changed files with 45 additions and 18 deletions

View file

@ -222,6 +222,8 @@ export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegex
} : {}), } : {}),
})); }));
break; break;
case 'error':
throw new ServiceErrorException(response.error);
} }
numMessagesProcessed++; numMessagesProcessed++;

View file

@ -1,5 +1,6 @@
import { CodeHostType } from "@sourcebot/db"; import { CodeHostType } from "@sourcebot/db";
import { z } from "zod"; import { z } from "zod";
import { serviceErrorSchema } from "@/lib/serviceError";
export const locationSchema = z.object({ export const locationSchema = z.object({
byteOffset: z.number(), // 0-based byte offset from the beginning of the file byteOffset: z.number(), // 0-based byte offset from the beginning of the file
@ -126,10 +127,19 @@ export const streamedSearchFinalResponseSchema = z.object({
}); });
export type StreamedSearchFinalResponse = z.infer<typeof streamedSearchFinalResponseSchema>; export type StreamedSearchFinalResponse = z.infer<typeof streamedSearchFinalResponseSchema>;
/**
* Sent when an error occurs during streaming.
*/
export const streamedSearchErrorResponseSchema = z.object({
type: z.literal('error'),
error: serviceErrorSchema,
});
export type StreamedSearchErrorResponse = z.infer<typeof streamedSearchErrorResponseSchema>;
export const streamedSearchResponseSchema = z.discriminatedUnion('type', [ export const streamedSearchResponseSchema = z.discriminatedUnion('type', [
streamedSearchChunkResponseSchema, streamedSearchChunkResponseSchema,
streamedSearchFinalResponseSchema, streamedSearchFinalResponseSchema,
streamedSearchErrorResponseSchema,
]); ]);
export type StreamedSearchResponse = z.infer<typeof streamedSearchResponseSchema>; export type StreamedSearchResponse = z.infer<typeof streamedSearchResponseSchema>;

View file

@ -1,4 +1,5 @@
import { getCodeHostBrowseFileAtBranchUrl } from "@/lib/utils"; import { getCodeHostBrowseFileAtBranchUrl } from "@/lib/utils";
import { unexpectedError } from "@/lib/serviceError";
import type { ProtoGrpcType } from '@/proto/webserver'; import type { ProtoGrpcType } from '@/proto/webserver';
import { FileMatch__Output as ZoektGrpcFileMatch } from "@/proto/zoekt/webserver/v1/FileMatch"; import { FileMatch__Output as ZoektGrpcFileMatch } from "@/proto/zoekt/webserver/v1/FileMatch";
import { FlushReason as ZoektGrpcFlushReason } from "@/proto/zoekt/webserver/v1/FlushReason"; import { FlushReason as ZoektGrpcFlushReason } from "@/proto/zoekt/webserver/v1/FlushReason";
@ -15,7 +16,7 @@ import { PrismaClient, Repo } from "@sourcebot/db";
import { createLogger, env } from "@sourcebot/shared"; import { createLogger, env } from "@sourcebot/shared";
import path from 'path'; import path from 'path';
import { QueryIR, someInQueryIR } from './ir'; import { QueryIR, someInQueryIR } from './ir';
import { RepositoryInfo, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchResponse } from "./types"; import { RepositoryInfo, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchErrorResponse, StreamedSearchResponse } from "./types";
const logger = createLogger("zoekt-searcher"); const logger = createLogger("zoekt-searcher");
@ -177,8 +178,8 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p
isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount, isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount,
} }
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`)); controller.enqueue(encodeSSEREsponseChunk(finalResponse));
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); controller.enqueue(encodeSSEREsponseChunk('[DONE]'));
controller.close(); controller.close();
client.close(); client.close();
logger.debug('SSE stream closed'); logger.debug('SSE stream closed');
@ -231,10 +232,18 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p
stats stats
} }
const sseData = `data: ${JSON.stringify(response)}\n\n`; controller.enqueue(encodeSSEREsponseChunk(response));
controller.enqueue(new TextEncoder().encode(sseData));
} catch (error) { } catch (error) {
console.error('Error encoding chunk:', error); logger.error('Error processing chunk:', error);
Sentry.captureException(error);
isStreamActive = false;
const errorMessage = error instanceof Error ? error.message : 'Unknown error processing chunk';
const errorResponse: StreamedSearchErrorResponse = {
type: 'error',
error: unexpectedError(errorMessage),
};
controller.enqueue(encodeSSEREsponseChunk(errorResponse));
} finally { } finally {
pendingChunks--; pendingChunks--;
grpcStream?.resume(); grpcStream?.resume();
@ -270,26 +279,26 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p
} }
isStreamActive = false; isStreamActive = false;
// Send error as SSE event // Send properly typed error response
const errorData = `data: ${JSON.stringify({ const errorResponse: StreamedSearchErrorResponse = {
error: { type: 'error',
code: error.code, error: unexpectedError(error.details || error.message),
message: error.details || error.message, };
} controller.enqueue(encodeSSEREsponseChunk(errorResponse));
})}\n\n`;
controller.enqueue(new TextEncoder().encode(errorData));
controller.close(); controller.close();
client.close(); client.close();
}); });
} catch (error) { } catch (error) {
logger.error('Stream initialization error:', error); logger.error('Stream initialization error:', error);
Sentry.captureException(error);
const errorMessage = error instanceof Error ? error.message : 'Unknown error'; const errorMessage = error instanceof Error ? error.message : 'Unknown error';
const errorData = `data: ${JSON.stringify({ const errorResponse: StreamedSearchErrorResponse = {
error: { message: errorMessage } type: 'error',
})}\n\n`; error: unexpectedError(errorMessage),
controller.enqueue(new TextEncoder().encode(errorData)); };
controller.enqueue(encodeSSEREsponseChunk(errorResponse));
controller.close(); controller.close();
client.close(); client.close();
@ -309,6 +318,12 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p
}); });
} }
// Encodes a response chunk into a SSE-compatible format.
const encodeSSEREsponseChunk = (response: object | string) => {
const data = typeof response === 'string' ? response : JSON.stringify(response);
return new TextEncoder().encode(`data: ${data}\n\n`);
}
// Creates a mapping between all repository ids in a given response // Creates a mapping between all repository ids in a given response
// chunk. The mapping allows us to efficiently lookup repository metadata. // chunk. The mapping allows us to efficiently lookup repository metadata.
const createReposMapForChunk = async (chunk: ZoektGrpcSearchResponse, reposMapCache: Map<string | number, Repo>, prisma: PrismaClient): Promise<Map<string | number, Repo>> => { const createReposMapForChunk = async (chunk: ZoektGrpcSearchResponse, reposMapCache: Map<string | number, Repo>, prisma: PrismaClient): Promise<Map<string | number, Repo>> => {