From 5ab585c021941e0afc824c4dda684d7c5599728f Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 18 Nov 2025 16:01:06 -0800 Subject: [PATCH] improved cancelation handling --- .../search/components/searchResultsPage.tsx | 6 +- .../app/[domain]/search/useStreamedSearch.ts | 78 ++++++++++--------- .../app/api/(server)/stream_search/route.ts | 52 +++++++++---- 3 files changed, 79 insertions(+), 57 deletions(-) diff --git a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx index 48784eaf..e1f58423 100644 --- a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx +++ b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx @@ -19,7 +19,7 @@ import { SearchQueryParams } from "@/lib/types"; import { createPathWithQueryParams } from "@/lib/utils"; import { InfoCircledIcon } from "@radix-ui/react-icons"; import { useLocalStorage } from "@uidotdev/usehooks"; -import { AlertTriangleIcon, BugIcon, FilterIcon, RefreshCcwIcon } from "lucide-react"; +import { AlertTriangleIcon, BugIcon, FilterIcon, RefreshCwIcon } from "lucide-react"; import { useRouter } from "next/navigation"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useHotkeys } from "react-hotkeys-hook"; @@ -291,7 +291,7 @@ const PanelGroup = ({
{isStreaming ? ( <> - +

Searching...

{numMatches > 0 && (

{`Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}

@@ -353,7 +353,7 @@ const PanelGroup = ({ /> ) : isStreaming ? (
- +

Searching...

) : ( diff --git a/packages/web/src/app/[domain]/search/useStreamedSearch.ts b/packages/web/src/app/[domain]/search/useStreamedSearch.ts index 861288a1..85a6fd64 100644 --- a/packages/web/src/app/[domain]/search/useStreamedSearch.ts +++ b/packages/web/src/app/[domain]/search/useStreamedSearch.ts @@ -2,6 +2,7 @@ import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile } from '@/features/search/types'; import { useState, useCallback, useRef, useEffect } from 'react'; +import * as Sentry from '@sentry/nextjs'; interface CacheEntry { files: SearchResultFile[]; @@ -155,54 +156,39 @@ export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegex // SSE messages start with "data: " const dataMatch = message.match(/^data: (.+)$/); - if (!dataMatch) continue; + if (!dataMatch) { + continue; + } const data = dataMatch[1]; // Check for completion signal if (data === '[DONE]') { - setState(prev => ({ ...prev, isStreaming: false })); - return; + break; } - try { - const chunk: SearchResponse = JSON.parse(data); - setState(prev => ({ - ...prev, - files: [ - ...prev.files, - ...chunk.files - ], - repoInfo: { - ...prev.repoInfo, - ...chunk.repositoryInfo.reduce((acc, repo) => { - acc[repo.id] = repo; - return acc; - }, {} as Record), - }, - numMatches: prev.numMatches + chunk.stats.actualMatchCount, - })); - } catch (parseError) { - console.error('Error parsing chunk:', parseError); - } + const chunk: SearchResponse = JSON.parse(data); + setState(prev => ({ + ...prev, + files: [ + ...prev.files, + ...chunk.files + ], + repoInfo: { + ...prev.repoInfo, + ...chunk.repositoryInfo.reduce((acc, repo) => { + acc[repo.id] = repo; + return acc; + }, {} as Record), + }, + numMatches: prev.numMatches + chunk.stats.actualMatchCount, + })); } } - setState(prev => ({ ...prev, isStreaming: false })); - } catch (error) { - if ((error as Error).name === 'AbortError') { - console.log('Stream aborted'); - } else { - setState(prev => ({ - ...prev, - isStreaming: false, - error: error as Error, - })); - } - } finally { - const endTime = performance.now(); - const durationMs = endTime - startTime; + const durationMs = performance.now() - startTime; setState(prev => { + // Cache the final results after the stream has completed. searchCache.set(cacheKey, { files: prev.files, repoInfo: prev.repoInfo, @@ -213,14 +199,31 @@ export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegex return { ...prev, durationMs, + isStreaming: false, } }); + + } catch (error) { + if ((error as Error).name === 'AbortError') { + return; + } + + console.error(error); + Sentry.captureException(error); + const durationMs = performance.now() - startTime; + setState(prev => ({ + ...prev, + isStreaming: false, + durationMs, + error: error as Error, + })); } } search(); return () => { + cancel(); } }, [ query, @@ -229,6 +232,7 @@ export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegex whole, isRegexEnabled, isCaseSensitivityEnabled, + cancel, ]); return { diff --git a/packages/web/src/app/api/(server)/stream_search/route.ts b/packages/web/src/app/api/(server)/stream_search/route.ts index 4b267922..11905ee7 100644 --- a/packages/web/src/app/api/(server)/stream_search/route.ts +++ b/packages/web/src/app/api/(server)/stream_search/route.ts @@ -2,6 +2,7 @@ import { searchRequestSchema } from '@/features/search/schemas'; import { SearchResponse, SourceRange } from '@/features/search/types'; +import { SINGLE_TENANT_ORG_ID } from '@/lib/constants'; import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError'; import { prisma } from '@/prisma'; import type { ProtoGrpcType } from '@/proto/webserver'; @@ -12,13 +13,13 @@ import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/St import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService'; import * as grpc from '@grpc/grpc-js'; import * as protoLoader from '@grpc/proto-loader'; +import * as Sentry from '@sentry/nextjs'; import { PrismaClient, Repo } from '@sourcebot/db'; +import { parser as _parser } from '@sourcebot/query-language'; import { createLogger, env } from '@sourcebot/shared'; import { NextRequest } from 'next/server'; import * as path from 'path'; -import { parser as _parser } from '@sourcebot/query-language'; import { transformToZoektQuery } from './transformer'; -import { SINGLE_TENANT_ORG_ID } from '@/lib/constants'; const logger = createLogger('streamSearchApi'); @@ -87,8 +88,8 @@ export const POST = async (request: NextRequest) => { input: query, isCaseSensitivityEnabled, isRegexEnabled, - onExpandSearchContext: async (contextName: string) => { - const context = await prisma.searchContext.findUnique({ + onExpandSearchContext: async (contextName: string) => { + const context = await prisma.searchContext.findUnique({ where: { name_orgId: { name: contextName, @@ -108,8 +109,6 @@ export const POST = async (request: NextRequest) => { }, }); - console.log(JSON.stringify(zoektQuery, null, 2)); - const searchRequest: SearchRequest = { query: zoektQuery, opts: { @@ -158,9 +157,19 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism const client = createGrpcClient(); let grpcStream: ReturnType | null = null; let isStreamActive = true; + let pendingChunks = 0; return new ReadableStream({ async start(controller) { + const tryCloseController = () => { + if (!isStreamActive && pendingChunks === 0) { + controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); + controller.close(); + client.close(); + logger.debug('SSE stream closed'); + } + }; + try { // @todo: we should just disable tenant enforcement for now. const metadata = new grpc.Metadata(); @@ -190,12 +199,14 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism // Handle incoming data chunks grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => { - console.log('chunk'); - if (!isStreamActive) { + logger.debug('SSE stream closed, skipping chunk'); return; } + // Track that we're processing a chunk + pendingChunks++; + // grpcStream.on doesn't actually await on our handler, so we need to // explicitly pause the stream here to prevent the stream from completing // prior to our asynchronous work being completed. @@ -352,7 +363,18 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism } catch (error) { console.error('Error encoding chunk:', error); } finally { + pendingChunks--; grpcStream?.resume(); + + // @note: we were hitting "Controller is already closed" errors when calling + // `controller.enqueue` above for the last chunk. The reasoning was the event + // handler for 'end' was being invoked prior to the completion of the last chunk, + // resulting in the controller being closed prematurely. The workaround was to + // keep track of the number of pending chunks and only close the controller + // when there are no more chunks to process. We need to explicitly call + // `tryCloseController` since there _seems_ to be no ordering guarantees between + // the 'end' event handler and this callback. + tryCloseController(); } }); @@ -362,17 +384,13 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism return; } isStreamActive = false; - - // Send completion signal - controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); - controller.close(); - console.log('SSE stream completed'); - client.close(); + tryCloseController(); }); // Handle errors grpcStream.on('error', (error: grpc.ServiceError) => { - console.error('gRPC stream error:', error); + logger.error('gRPC stream error:', error); + Sentry.captureException(error); if (!isStreamActive) { return; @@ -392,7 +410,7 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism client.close(); }); } catch (error) { - console.error('Stream initialization error:', error); + logger.error('Stream initialization error:', error); const errorMessage = error instanceof Error ? error.message : 'Unknown error'; const errorData = `data: ${JSON.stringify({ @@ -405,7 +423,7 @@ const createSSESearchStream = async (searchRequest: SearchRequest, prisma: Prism } }, cancel() { - console.log('SSE stream cancelled by client'); + logger.warn('SSE stream cancelled by client'); isStreamActive = false; // Cancel the gRPC stream to stop receiving data