diff --git a/packages/web/src/app/[domain]/search/useStreamedSearch.ts b/packages/web/src/app/[domain]/search/useStreamedSearch.ts index e1d87a0e..b4f079fd 100644 --- a/packages/web/src/app/[domain]/search/useStreamedSearch.ts +++ b/packages/web/src/app/[domain]/search/useStreamedSearch.ts @@ -222,6 +222,8 @@ export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegex } : {}), })); break; + case 'error': + throw new ServiceErrorException(response.error); } numMessagesProcessed++; diff --git a/packages/web/src/features/search/types.ts b/packages/web/src/features/search/types.ts index 77d91faa..90f50182 100644 --- a/packages/web/src/features/search/types.ts +++ b/packages/web/src/features/search/types.ts @@ -1,5 +1,6 @@ import { CodeHostType } from "@sourcebot/db"; import { z } from "zod"; +import { serviceErrorSchema } from "@/lib/serviceError"; export const locationSchema = z.object({ byteOffset: z.number(), // 0-based byte offset from the beginning of the file @@ -126,10 +127,19 @@ export const streamedSearchFinalResponseSchema = z.object({ }); export type StreamedSearchFinalResponse = z.infer; +/** + * Sent when an error occurs during streaming. + */ +export const streamedSearchErrorResponseSchema = z.object({ + type: z.literal('error'), + error: serviceErrorSchema, +}); +export type StreamedSearchErrorResponse = z.infer; export const streamedSearchResponseSchema = z.discriminatedUnion('type', [ streamedSearchChunkResponseSchema, streamedSearchFinalResponseSchema, + streamedSearchErrorResponseSchema, ]); export type StreamedSearchResponse = z.infer; diff --git a/packages/web/src/features/search/zoektSearcher.ts b/packages/web/src/features/search/zoektSearcher.ts index ec588a73..b3f19679 100644 --- a/packages/web/src/features/search/zoektSearcher.ts +++ b/packages/web/src/features/search/zoektSearcher.ts @@ -1,4 +1,5 @@ import { getCodeHostBrowseFileAtBranchUrl } from "@/lib/utils"; +import { unexpectedError } from "@/lib/serviceError"; import type { ProtoGrpcType } from '@/proto/webserver'; import { FileMatch__Output as ZoektGrpcFileMatch } from "@/proto/zoekt/webserver/v1/FileMatch"; import { FlushReason as ZoektGrpcFlushReason } from "@/proto/zoekt/webserver/v1/FlushReason"; @@ -15,7 +16,7 @@ import { PrismaClient, Repo } from "@sourcebot/db"; import { createLogger, env } from "@sourcebot/shared"; import path from 'path'; import { QueryIR, someInQueryIR } from './ir'; -import { RepositoryInfo, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchResponse } from "./types"; +import { RepositoryInfo, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchErrorResponse, StreamedSearchResponse } from "./types"; const logger = createLogger("zoekt-searcher"); @@ -177,8 +178,8 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount, } - controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`)); - controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); + controller.enqueue(encodeSSEREsponseChunk(finalResponse)); + controller.enqueue(encodeSSEREsponseChunk('[DONE]')); controller.close(); client.close(); logger.debug('SSE stream closed'); @@ -231,10 +232,18 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p stats } - const sseData = `data: ${JSON.stringify(response)}\n\n`; - controller.enqueue(new TextEncoder().encode(sseData)); + controller.enqueue(encodeSSEREsponseChunk(response)); } catch (error) { - console.error('Error encoding chunk:', error); + logger.error('Error processing chunk:', error); + Sentry.captureException(error); + isStreamActive = false; + + const errorMessage = error instanceof Error ? error.message : 'Unknown error processing chunk'; + const errorResponse: StreamedSearchErrorResponse = { + type: 'error', + error: unexpectedError(errorMessage), + }; + controller.enqueue(encodeSSEREsponseChunk(errorResponse)); } finally { pendingChunks--; grpcStream?.resume(); @@ -270,26 +279,26 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p } isStreamActive = false; - // Send error as SSE event - const errorData = `data: ${JSON.stringify({ - error: { - code: error.code, - message: error.details || error.message, - } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); + // Send properly typed error response + const errorResponse: StreamedSearchErrorResponse = { + type: 'error', + error: unexpectedError(error.details || error.message), + }; + controller.enqueue(encodeSSEREsponseChunk(errorResponse)); controller.close(); client.close(); }); } catch (error) { logger.error('Stream initialization error:', error); + Sentry.captureException(error); const errorMessage = error instanceof Error ? error.message : 'Unknown error'; - const errorData = `data: ${JSON.stringify({ - error: { message: errorMessage } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); + const errorResponse: StreamedSearchErrorResponse = { + type: 'error', + error: unexpectedError(errorMessage), + }; + controller.enqueue(encodeSSEREsponseChunk(errorResponse)); controller.close(); client.close(); @@ -309,6 +318,12 @@ export const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, p }); } +// Encodes a response chunk into a SSE-compatible format. +const encodeSSEREsponseChunk = (response: object | string) => { + const data = typeof response === 'string' ? response : JSON.stringify(response); + return new TextEncoder().encode(`data: ${data}\n\n`); +} + // Creates a mapping between all repository ids in a given response // chunk. The mapping allows us to efficiently lookup repository metadata. const createReposMapForChunk = async (chunk: ZoektGrpcSearchResponse, reposMapCache: Map, prisma: PrismaClient): Promise> => {