From 9cd32362e88679538817a92b6df0b6b247aae349 Mon Sep 17 00:00:00 2001 From: bkellam Date: Fri, 14 Nov 2025 20:53:55 -0800 Subject: [PATCH] wip: make stream search api follow existing schema. Modify UI to support streaming --- .../search/components/searchResultsPage.tsx | 230 +++++------ .../app/[domain]/search/useStreamedSearch.ts | 171 ++++++++ .../src/app/[domain]/stream_search/page.tsx | 221 ----------- .../src/app/[domain]/stream_search/types.ts | 69 ---- .../stream_search/useStreamingSearch.ts | 148 ------- .../app/api/(server)/stream_search/route.ts | 375 ++++++++++++++---- packages/web/src/features/search/searchApi.ts | 2 +- 7 files changed, 582 insertions(+), 634 deletions(-) create mode 100644 packages/web/src/app/[domain]/search/useStreamedSearch.ts delete mode 100644 packages/web/src/app/[domain]/stream_search/page.tsx delete mode 100644 packages/web/src/app/[domain]/stream_search/types.ts delete mode 100644 packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts diff --git a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx index 9c33e11d..7bacb12c 100644 --- a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx +++ b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx @@ -12,24 +12,22 @@ import { import { Separator } from "@/components/ui/separator"; import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"; import { RepositoryInfo, SearchResultFile, SearchStats } from "@/features/search/types"; -import useCaptureEvent from "@/hooks/useCaptureEvent"; import { useDomain } from "@/hooks/useDomain"; import { useNonEmptyQueryParam } from "@/hooks/useNonEmptyQueryParam"; import { useSearchHistory } from "@/hooks/useSearchHistory"; import { SearchQueryParams } from "@/lib/types"; -import { createPathWithQueryParams, measure, unwrapServiceError } from "@/lib/utils"; -import { InfoCircledIcon, SymbolIcon } from "@radix-ui/react-icons"; -import { useQuery } from "@tanstack/react-query"; +import { createPathWithQueryParams } from "@/lib/utils"; +import { InfoCircledIcon } from "@radix-ui/react-icons"; import { useLocalStorage } from "@uidotdev/usehooks"; -import { AlertTriangleIcon, BugIcon, FilterIcon } from "lucide-react"; +import { AlertTriangleIcon, BugIcon, FilterIcon, RefreshCcwIcon } from "lucide-react"; import { useRouter } from "next/navigation"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useHotkeys } from "react-hotkeys-hook"; import { ImperativePanelHandle } from "react-resizable-panels"; -import { search } from "../../../api/(client)/client"; import { CopyIconButton } from "../../components/copyIconButton"; import { SearchBar } from "../../components/searchBar"; import { TopBar } from "../../components/topBar"; +import { useStreamedSearch } from "../useStreamedSearch"; import { CodePreviewPanel } from "./codePreviewPanel"; import { FilterPanel } from "./filterPanel"; import { useFilteredMatches } from "./filterPanel/useFilterMatches"; @@ -46,7 +44,6 @@ export const SearchResultsPage = ({ }: SearchResultsPageProps) => { const router = useRouter(); const { setSearchHistory } = useSearchHistory(); - const captureEvent = useCaptureEvent(); const domain = useDomain(); const { toast } = useToast(); @@ -55,26 +52,17 @@ export const SearchResultsPage = ({ const maxMatchCount = isNaN(_maxMatchCount) ? 
defaultMaxMatchCount : _maxMatchCount; const { - data: searchResponse, - isPending: isSearchPending, - isFetching: isFetching, - error - } = useQuery({ - queryKey: ["search", searchQuery, maxMatchCount], - queryFn: () => measure(() => unwrapServiceError(search({ - query: searchQuery, - matches: maxMatchCount, - contextLines: 3, - whole: false, - })), "client.search"), - select: ({ data, durationMs }) => ({ - ...data, - totalClientSearchDurationMs: durationMs, - }), - enabled: searchQuery.length > 0, - refetchOnWindowFocus: false, - retry: false, - staleTime: 0, + error, + files, + repoInfo, + durationMs, + isStreaming, + numMatches, + } = useStreamedSearch({ + query: searchQuery, + matches: maxMatchCount, + contextLines: 3, + whole: false, }); useEffect(() => { @@ -102,38 +90,39 @@ export const SearchResultsPage = ({ ]) }, [searchQuery, setSearchHistory]); - useEffect(() => { - if (!searchResponse) { - return; - } + // @todo: capture search stats on completion. + // useEffect(() => { + // if (!searchResponse) { + // return; + // } - const fileLanguages = searchResponse.files?.map(file => file.language) || []; + // const fileLanguages = searchResponse.files?.map(file => file.language) || []; - captureEvent("search_finished", { - durationMs: searchResponse.totalClientSearchDurationMs, - fileCount: searchResponse.stats.fileCount, - matchCount: searchResponse.stats.totalMatchCount, - actualMatchCount: searchResponse.stats.actualMatchCount, - filesSkipped: searchResponse.stats.filesSkipped, - contentBytesLoaded: searchResponse.stats.contentBytesLoaded, - indexBytesLoaded: searchResponse.stats.indexBytesLoaded, - crashes: searchResponse.stats.crashes, - shardFilesConsidered: searchResponse.stats.shardFilesConsidered, - filesConsidered: searchResponse.stats.filesConsidered, - filesLoaded: searchResponse.stats.filesLoaded, - shardsScanned: searchResponse.stats.shardsScanned, - shardsSkipped: searchResponse.stats.shardsSkipped, - shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter, - ngramMatches: searchResponse.stats.ngramMatches, - ngramLookups: searchResponse.stats.ngramLookups, - wait: searchResponse.stats.wait, - matchTreeConstruction: searchResponse.stats.matchTreeConstruction, - matchTreeSearch: searchResponse.stats.matchTreeSearch, - regexpsConsidered: searchResponse.stats.regexpsConsidered, - flushReason: searchResponse.stats.flushReason, - fileLanguages, - }); - }, [captureEvent, searchQuery, searchResponse]); + // captureEvent("search_finished", { + // durationMs: searchResponse.totalClientSearchDurationMs, + // fileCount: searchResponse.stats.fileCount, + // matchCount: searchResponse.stats.totalMatchCount, + // actualMatchCount: searchResponse.stats.actualMatchCount, + // filesSkipped: searchResponse.stats.filesSkipped, + // contentBytesLoaded: searchResponse.stats.contentBytesLoaded, + // indexBytesLoaded: searchResponse.stats.indexBytesLoaded, + // crashes: searchResponse.stats.crashes, + // shardFilesConsidered: searchResponse.stats.shardFilesConsidered, + // filesConsidered: searchResponse.stats.filesConsidered, + // filesLoaded: searchResponse.stats.filesLoaded, + // shardsScanned: searchResponse.stats.shardsScanned, + // shardsSkipped: searchResponse.stats.shardsSkipped, + // shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter, + // ngramMatches: searchResponse.stats.ngramMatches, + // ngramLookups: searchResponse.stats.ngramLookups, + // wait: searchResponse.stats.wait, + // matchTreeConstruction: searchResponse.stats.matchTreeConstruction, + // 
matchTreeSearch: searchResponse.stats.matchTreeSearch, + // regexpsConsidered: searchResponse.stats.regexpsConsidered, + // flushReason: searchResponse.stats.flushReason, + // fileLanguages, + // }); + // }, [captureEvent, searchQuery, searchResponse]); const onLoadMoreResults = useCallback(() => { @@ -157,12 +146,7 @@ export const SearchResultsPage = ({ /> - {(isSearchPending || isFetching) ? ( -
- -

Searching...

-
- ) : error ? ( + {error ? (

Failed to search

@@ -170,14 +154,18 @@ export const SearchResultsPage = ({
) : ( )} @@ -186,10 +174,11 @@ export const SearchResultsPage = ({ interface PanelGroupProps { fileMatches: SearchResultFile[]; - isMoreResultsButtonVisible?: boolean; onLoadMoreResults: () => void; + isStreaming: boolean; + isMoreResultsButtonVisible?: boolean; isBranchFilteringEnabled: boolean; - repoInfo: RepositoryInfo[]; + repoInfo: Record; searchDurationMs: number; numMatches: number; searchStats?: SearchStats; @@ -198,9 +187,10 @@ interface PanelGroupProps { const PanelGroup = ({ fileMatches, isMoreResultsButtonVisible, + isStreaming, onLoadMoreResults, isBranchFilteringEnabled, - repoInfo: _repoInfo, + repoInfo, searchDurationMs: _searchDurationMs, numMatches, searchStats, @@ -228,13 +218,6 @@ const PanelGroup = ({ return Math.round(_searchDurationMs); }, [_searchDurationMs]); - const repoInfo = useMemo(() => { - return _repoInfo.reduce((acc, repo) => { - acc[repo.id] = repo; - return acc; - }, {} as Record); - }, [_repoInfo]); - return (
- - - - - -
- -

Search stats for nerds

- { - navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2)); - return true; - }} - className="ml-auto" - /> -
- - {JSON.stringify(searchStats, null, 2)} - -
-
- { - fileMatches.length > 0 ? ( -

{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}

- ) : ( -

No results

- ) - } - {isMoreResultsButtonVisible && ( -
- (load more) -
+ {isStreaming ? ( + <> + +

Searching...

+ {numMatches > 0 && ( +

{`Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length === 1 ? 'file' : 'files'}`}

+ )} + + ) : ( + <> + + + + + +
+ +

Search stats for nerds

+ { + navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2)); + return true; + }} + className="ml-auto" + /> +
+ + {JSON.stringify(searchStats, null, 2)} + +
+
+ { + fileMatches.length > 0 ? ( +

{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length === 1 ? 'file' : 'files'}`}

+ ) : ( +

No results

+ ) + } + {isMoreResultsButtonVisible && ( +
+ (load more) +
+ )} + )}
{filteredFileMatches.length > 0 ? ( @@ -340,6 +335,11 @@ const PanelGroup = ({ isBranchFilteringEnabled={isBranchFilteringEnabled} repoInfo={repoInfo} /> + ) : isStreaming ? ( +
+ +

Searching...

+
) : (

No results found

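Note on the new useStreamedSearch hook below: the search effect currently returns an empty cleanup, so an in-flight stream is aborted when the query parameters change (the next run aborts the previous controller) but not when the component unmounts. A minimal sketch of the cleanup the effect likely wants, reusing the hook's own abortControllerRef — a suggested addition, not something this patch implements:

    return () => {
        // Assumed cleanup: abort any in-flight stream when the effect
        // re-runs or the component unmounts (mirrors what cancel() does).
        abortControllerRef.current?.abort();
        abortControllerRef.current = null;
    };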
diff --git a/packages/web/src/app/[domain]/search/useStreamedSearch.ts b/packages/web/src/app/[domain]/search/useStreamedSearch.ts new file mode 100644 index 00000000..2369346a --- /dev/null +++ b/packages/web/src/app/[domain]/search/useStreamedSearch.ts @@ -0,0 +1,171 @@ +'use client'; + +import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile } from '@/features/search/types'; +import { useState, useCallback, useRef, useEffect } from 'react'; + + +export const useStreamedSearch = ({ query, matches, contextLines, whole }: SearchRequest) => { + + const [state, setState] = useState<{ + isStreaming: boolean, + error: Error | null, + files: SearchResultFile[], + repoInfo: Record, + durationMs: number, + numMatches: number, + }>({ + isStreaming: false, + error: null, + files: [], + repoInfo: {}, + durationMs: 0, + numMatches: 0, + }); + + const abortControllerRef = useRef(null); + + const cancel = useCallback(() => { + if (abortControllerRef.current) { + abortControllerRef.current.abort(); + abortControllerRef.current = null; + } + setState(prev => ({ + ...prev, + isStreaming: false, + })); + }, []); + + useEffect(() => { + const search = async () => { + const startTime = performance.now(); + + if (abortControllerRef.current) { + abortControllerRef.current.abort(); + } + abortControllerRef.current = new AbortController(); + + setState({ + isStreaming: true, + error: null, + files: [], + repoInfo: {}, + durationMs: 0, + numMatches: 0, + }); + + try { + const response = await fetch('/api/stream_search', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + query, + matches, + contextLines, + whole, + }), + signal: abortControllerRef.current.signal, + }); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + if (!response.body) { + throw new Error('No response body'); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true as boolean) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + // Decode the chunk and add to buffer + buffer += decoder.decode(value, { stream: true }); + + // Process complete SSE messages (separated by \n\n) + const messages = buffer.split('\n\n'); + + // Keep the last element (potentially incomplete message) in the buffer for the next chunk. + // Stream chunks can split messages mid-way, so we only process complete messages. 
+ buffer = messages.pop() || ''; + + for (const message of messages) { + if (!message.trim()) { + continue; + } + + // SSE messages start with "data: " + const dataMatch = message.match(/^data: (.+)$/); + if (!dataMatch) continue; + + const data = dataMatch[1]; + + // Check for completion signal + if (data === '[DONE]') { + setState(prev => ({ ...prev, isStreaming: false })); + return; + } + + try { + const chunk: SearchResponse = JSON.parse(data); + setState(prev => ({ + ...prev, + files: [ + ...prev.files, + ...chunk.files + ], + repoInfo: { + ...prev.repoInfo, + ...chunk.repositoryInfo.reduce((acc, repo) => { + acc[repo.id] = repo; + return acc; + }, {} as Record), + }, + numMatches: prev.numMatches + chunk.stats.actualMatchCount, + })); + } catch (parseError) { + console.error('Error parsing chunk:', parseError); + } + } + } + + setState(prev => ({ ...prev, isStreaming: false })); + } catch (error) { + if ((error as Error).name === 'AbortError') { + console.log('Stream aborted'); + } else { + setState(prev => ({ + ...prev, + isStreaming: false, + error: error as Error, + })); + } + } finally { + const endTime = performance.now(); + const durationMs = endTime - startTime; + setState(prev => ({ + ...prev, + durationMs, + })); + } + } + + search(); + + return () => { + } + }, [query, matches, contextLines, whole]); + + return { + ...state, + cancel, + }; +} \ No newline at end of file diff --git a/packages/web/src/app/[domain]/stream_search/page.tsx b/packages/web/src/app/[domain]/stream_search/page.tsx deleted file mode 100644 index 86c1e968..00000000 --- a/packages/web/src/app/[domain]/stream_search/page.tsx +++ /dev/null @@ -1,221 +0,0 @@ -'use client'; - -import { useState } from 'react'; -import { useStreamingSearch } from './useStreamingSearch'; -import type { FileMatch__Output } from './types'; -import { Button } from '@/components/ui/button'; -import { Input } from '@/components/ui/input'; -import { Separator } from '@/components/ui/separator'; -import { Loader2 } from 'lucide-react'; - -// @nocheckin -export default function StreamSearchPage() { - const [query, setQuery] = useState('useMemo'); - const [matches, setMatches] = useState(10000); - const [contextLines, _setContextLines] = useState(5); - - const { - chunks, - isStreaming, - totalFiles, - totalMatches, - error, - streamSearch, - cancel, - reset - } = useStreamingSearch(); - - const handleSearch = () => { - streamSearch({ - query, - matches, - contextLines, - whole: false, - }); - }; - - return ( -
-
-
-

Streaming Search Demo

-

- Test the SSE streaming search API with real-time results -

-
- - - - {/* Search Controls */} -
-
-
- - setQuery(e.target.value)} - placeholder="Enter search query (e.g., useMemo, file:.tsx)" - onKeyDown={(e) => e.key === 'Enter' && handleSearch()} - disabled={isStreaming} - /> -
-
- - setMatches(Number(e.target.value))} - placeholder="Max matches" - disabled={isStreaming} - /> -
-
- -
- - {isStreaming && ( - - )} - {chunks.length > 0 && !isStreaming && ( - - )} -
-
- - - - {/* Results Stats */} - {(isStreaming || chunks.length > 0) && ( -
-
-
- Status:{' '} - {isStreaming ? ( - - 🔄 Streaming... - - ) : ( - - ✓ Complete - - )} -
-
- Chunks:{' '} - {chunks.length} -
-
- Files:{' '} - {totalFiles} -
-
- Matches:{' '} - {totalMatches} -
-
-
- )} - - {/* Error Display */} - {error && ( -
-
- Error occurred: -
-
- {error.message} -
-
- )} - - {/* Results Display */} - {chunks.length > 0 && ( -
-

- Results ({chunks.length} chunks) -

- -
- {chunks.map((chunk, i) => ( -
-
-
- Chunk {i + 1} -
-
- {chunk.response_chunk?.files?.length || 0} files, {' '} - {chunk.response_chunk?.stats?.match_count || 0} matches -
-
- - {chunk.response_chunk?.files && chunk.response_chunk.files.length > 0 && ( -
- {chunk.response_chunk.files.map((file: FileMatch__Output, j: number) => { - // Decode file_name from Buffer to string - const fileName = file.file_name - ? Buffer.from(file.file_name).toString('utf-8') - : 'Unknown file'; - - return ( -
-
- 📄 {fileName} -
- {file.repository && ( -
- {file.repository} -
- )} - {file.language && ( -
- Language: {file.language} -
- )} -
- ); - })} -
- )} -
- ))} -
-
- )} - - {/* Empty State */} - {!isStreaming && chunks.length === 0 && !error && ( -
-

Enter a search query and click “Search” to start streaming results

-
- )} -
-
- ); -} \ No newline at end of file diff --git a/packages/web/src/app/[domain]/stream_search/types.ts b/packages/web/src/app/[domain]/stream_search/types.ts deleted file mode 100644 index e9e4aaf3..00000000 --- a/packages/web/src/app/[domain]/stream_search/types.ts +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Types for streaming search functionality - */ - -import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse'; -import type { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse'; -import type { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch'; -import type { Stats__Output } from '@/proto/zoekt/webserver/v1/Stats'; -import type { ChunkMatch__Output } from '@/proto/zoekt/webserver/v1/ChunkMatch'; -import type { Progress__Output } from '@/proto/zoekt/webserver/v1/Progress'; - -/** - * A single chunk received from the streaming search API - */ -export interface StreamSearchChunk { - response_chunk?: SearchResponse__Output | null; - error?: StreamSearchError; -} - -/** - * Error response from the streaming search - */ -export interface StreamSearchError { - code?: number; - message: string; -} - -/** - * Parameters for initiating a streaming search - */ -export interface StreamSearchParams { - query: string; - matches: number; - contextLines?: number; - whole?: boolean; -} - -/** - * State of the streaming search - */ -export interface StreamingSearchState { - chunks: StreamSearchChunk[]; - isStreaming: boolean; - error: Error | null; - totalFiles: number; - totalMatches: number; -} - -/** - * Return type of the useStreamingSearch hook - */ -export interface UseStreamingSearchReturn extends StreamingSearchState { - streamSearch: (params: StreamSearchParams) => Promise; - cancel: () => void; - reset: () => void; -} - -/** - * Re-export proto types for convenience - */ -export type { - StreamSearchResponse__Output, - SearchResponse__Output, - FileMatch__Output, - Stats__Output, - ChunkMatch__Output, - Progress__Output, -}; - diff --git a/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts b/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts deleted file mode 100644 index d84eeaf0..00000000 --- a/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts +++ /dev/null @@ -1,148 +0,0 @@ -'use client'; - -import { useState, useCallback, useRef } from 'react'; -import type { - StreamSearchChunk, - StreamSearchParams, - StreamingSearchState, - UseStreamingSearchReturn, -} from './types'; - -export function useStreamingSearch(): UseStreamingSearchReturn { - const [state, setState] = useState({ - chunks: [], - isStreaming: false, - error: null, - totalFiles: 0, - totalMatches: 0, - }); - - const abortControllerRef = useRef(null); - - const streamSearch = useCallback(async (params: StreamSearchParams) => { - // Cancel any existing stream - if (abortControllerRef.current) { - abortControllerRef.current.abort(); - } - - abortControllerRef.current = new AbortController(); - - setState({ - chunks: [], - isStreaming: true, - error: null, - totalFiles: 0, - totalMatches: 0, - }); - - try { - const response = await fetch('/api/stream_search', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(params), - signal: abortControllerRef.current.signal, - }); - - if (!response.ok) { - throw new Error(`HTTP error! 
status: ${response.status}`); - } - - if (!response.body) { - throw new Error('No response body'); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (true as boolean) { - const { done, value } = await reader.read(); - - if (done) break; - - // Decode the chunk and add to buffer - buffer += decoder.decode(value, { stream: true }); - - // Process complete SSE messages (separated by \n\n) - const messages = buffer.split('\n\n'); - buffer = messages.pop() || ''; // Keep incomplete message in buffer - - for (const message of messages) { - if (!message.trim()) continue; - - // SSE messages start with "data: " - const dataMatch = message.match(/^data: (.+)$/); - if (!dataMatch) continue; - - const data = dataMatch[1]; - - // Check for completion signal - if (data === '[DONE]') { - setState(prev => ({ ...prev, isStreaming: false })); - return; - } - - // Parse the JSON chunk - try { - const chunk: StreamSearchChunk = JSON.parse(data); - - // Check for errors - if (chunk.error) { - throw new Error(chunk.error.message); - } - - // Update state with new chunk - setState(prev => ({ - ...prev, - chunks: [...prev.chunks, chunk], - totalFiles: prev.totalFiles + (chunk.response_chunk?.files?.length || 0), - totalMatches: prev.totalMatches + (chunk.response_chunk?.stats?.match_count || 0), - })); - } catch (parseError) { - console.error('Error parsing chunk:', parseError); - } - } - } - - setState(prev => ({ ...prev, isStreaming: false })); - } catch (error) { - if ((error as Error).name === 'AbortError') { - console.log('Stream aborted'); - } else { - setState(prev => ({ - ...prev, - isStreaming: false, - error: error as Error, - })); - } - } - }, []); - - const cancel = useCallback(() => { - if (abortControllerRef.current) { - abortControllerRef.current.abort(); - abortControllerRef.current = null; - } - setState(prev => ({ ...prev, isStreaming: false })); - }, []); - - const reset = useCallback(() => { - cancel(); - setState({ - chunks: [], - isStreaming: false, - error: null, - totalFiles: 0, - totalMatches: 0, - }); - }, [cancel]); - - return { - ...state, - streamSearch, - cancel, - reset, - }; -} \ No newline at end of file diff --git a/packages/web/src/app/api/(server)/stream_search/route.ts b/packages/web/src/app/api/(server)/stream_search/route.ts index 70a44e20..ac834d35 100644 --- a/packages/web/src/app/api/(server)/stream_search/route.ts +++ b/packages/web/src/app/api/(server)/stream_search/route.ts @@ -1,17 +1,23 @@ 'use server'; -import { NextRequest } from 'next/server'; -import * as grpc from '@grpc/grpc-js'; -import * as protoLoader from '@grpc/proto-loader'; -import * as path from 'path'; +import { searchRequestSchema } from '@/features/search/schemas'; +import { SearchResponse, SourceRange } from '@/features/search/types'; +import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError'; +import { prisma } from '@/prisma'; import type { ProtoGrpcType } from '@/proto/webserver'; -import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService'; +import { Range__Output } from '@/proto/zoekt/webserver/v1/Range'; import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest'; import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest'; import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse'; -import { env } from '@sourcebot/shared'; -import { schemaValidationError, serviceErrorResponse } from 
'@/lib/serviceError'; -import { searchRequestSchema } from '@/features/search/schemas'; +import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService'; +import * as grpc from '@grpc/grpc-js'; +import * as protoLoader from '@grpc/proto-loader'; +import { PrismaClient, Repo } from '@sourcebot/db'; +import { createLogger, env } from '@sourcebot/shared'; +import { NextRequest } from 'next/server'; +import * as path from 'path'; + +const logger = createLogger('streamSearchApi'); /** * Create a gRPC client for the Zoekt webserver @@ -54,18 +60,22 @@ export const POST = async (request: NextRequest) => { // Parse and validate request body const body = await request.json(); const parsed = await searchRequestSchema.safeParseAsync(body); - + if (!parsed.success) { return serviceErrorResponse(schemaValidationError(parsed.error)); } + const { query, matches, contextLines, whole } = parsed.data; + const searchRequest: SearchRequest = { query: { and: { + // @todo: we should use repo_ids to filter out repositories that the user + // has access to (if permission syncing is enabled!). children: [ { regexp: { - regexp: parsed.data.query, + regexp: query, case_sensitive: true, } } @@ -74,79 +84,20 @@ export const POST = async (request: NextRequest) => { }, opts: { chunk_matches: true, - num_context_lines: parsed.data.contextLines ?? 5, - total_max_match_count: parsed.data.matches, + max_match_display_count: matches, + total_max_match_count: matches + 1, + num_context_lines: contextLines, + whole: !!whole, + shard_max_match_count: -1, + max_wall_time: { + seconds: 0, + } }, }; - // Create ReadableStream for SSE - const stream = new ReadableStream({ - async start(controller) { - const client = createGrpcClient(); - - try { - const metadata = new grpc.Metadata(); - metadata.add('x-sourcegraph-tenant-id', '1'); + // @nocheckin: this should be using the `prisma` instance from the auth context. + const stream = await createSSESearchStream(searchRequest, prisma); - const streamRequest: StreamSearchRequest = { - request: searchRequest, - }; - - const grpcStream = client.StreamSearch(streamRequest, metadata); - - // Handle incoming data chunks - grpcStream.on('data', (chunk: StreamSearchResponse__Output) => { - try { - // SSE format: "data: {json}\n\n" - const sseData = `data: ${JSON.stringify(chunk)}\n\n`; - controller.enqueue(new TextEncoder().encode(sseData)); - } catch (error) { - console.error('Error encoding chunk:', error); - } - }); - - // Handle stream completion - grpcStream.on('end', () => { - // Send completion signal - controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); - controller.close(); - client.close(); - }); - - // Handle errors - grpcStream.on('error', (error: grpc.ServiceError) => { - console.error('gRPC stream error:', error); - - // Send error as SSE event - const errorData = `data: ${JSON.stringify({ - error: { - code: error.code, - message: error.details || error.message, - } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); - - controller.close(); - client.close(); - }); - } catch (error) { - console.error('Stream initialization error:', error); - - const errorMessage = error instanceof Error ? 
error.message : 'Unknown error'; - const errorData = `data: ${JSON.stringify({ - error: { message: errorMessage } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); - - controller.close(); - client.close(); - } - }, - cancel() { - // Cleanup when client cancels the stream - console.log('SSE stream cancelled by client'); - } - }); // Return streaming response with SSE headers return new Response(stream, { @@ -171,4 +122,268 @@ export const POST = async (request: NextRequest) => { } ); } -}; \ No newline at end of file +}; + +const createSSESearchStream = async (searchRequest: SearchRequest, prisma: PrismaClient): Promise => { + const client = createGrpcClient(); + let grpcStream: ReturnType | null = null; + let isStreamActive = true; + + return new ReadableStream({ + async start(controller) { + try { + // @todo: we should just disable tenant enforcement for now. + const metadata = new grpc.Metadata(); + metadata.add('x-sourcegraph-tenant-id', '1'); + + const streamRequest: StreamSearchRequest = { + request: searchRequest, + }; + + grpcStream = client.StreamSearch(streamRequest, metadata); + + // @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field + // which corresponds to the `id` in the Repo table. In order to efficiently fetch repository + // metadata when transforming (potentially thousands) of file matches, we aggregate a unique + // set of repository ids* and map them to their corresponding Repo record. + // + // *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`? + // A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was + // always undefined. To make this a non-breaking change, we fallback to using the repository's name + // (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in + // practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo', + // 'gitea.com/org/repo', etc.). + // + // Note: When a repository is re-indexed (every hour) this ID will be populated. + // @see: https://github.com/sourcebot-dev/zoekt/pull/6 + const repos = new Map(); + + // Handle incoming data chunks + grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => { + console.log('chunk'); + + if (!isStreamActive) { + return; + } + + // grpcStream.on doesn't actually await on our handler, so we need to + // explicitly pause the stream here to prevent the stream from completing + // prior to our asynchronous work being completed. + grpcStream?.pause(); + + try { + if (!chunk.response_chunk) { + logger.warn('No response chunk received'); + return; + } + + const files = (await Promise.all(chunk.response_chunk.files.map(async (file) => { + const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name); + + const identifier = file.repository_id ?? file.repository; + + // If the repository is not in the map, fetch it from the database. + if (!repos.has(identifier)) { + const repo = typeof identifier === 'number' ? + await prisma.repo.findUnique({ + where: { + id: identifier, + }, + }) : + await prisma.repo.findFirst({ + where: { + name: identifier, + }, + }); + + if (repo) { + repos.set(identifier, repo); + } + } + + + const repo = repos.get(identifier); + + // This can happen if the user doesn't have access to the repository. + if (!repo) { + return undefined; + } + + // @todo: address "file_name might not be a valid UTF-8 string" warning. 
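+                            // Note (editorial, re: the @todo above): Node's Buffer#toString('utf-8')
+                            // replaces invalid byte sequences with U+FFFD rather than throwing, so
+                            // the decode below is lossy but will not crash on non-UTF-8 file names.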
+ const fileName = file.file_name.toString('utf-8'); + + const convertRange = (range: Range__Output): SourceRange => ({ + start: { + byteOffset: range.start?.byte_offset ?? 0, + column: range.start?.column ?? 0, + lineNumber: range.start?.line_number ?? 0, + }, + end: { + byteOffset: range.end?.byte_offset ?? 0, + column: range.end?.column ?? 0, + lineNumber: range.end?.line_number ?? 0, + } + }) + + return { + fileName: { + text: fileName, + matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [], + }, + repository: repo.name, + repositoryId: repo.id, + language: file.language, + // @todo: we will need to have a mechanism of forming the file's web url. + webUrl: '', + chunks: file.chunk_matches + .filter((chunk) => !chunk.file_name) // filter out filename chunks. + .map((chunk) => { + return { + content: chunk.content.toString('utf-8'), + matchRanges: chunk.ranges.map(convertRange), + contentStart: chunk.content_start ? { + byteOffset: chunk.content_start.byte_offset ?? 0, + column: chunk.content_start.column ?? 0, + lineNumber: chunk.content_start.line_number ?? 0, + // @nocheckin: Will need to figure out how to handle this case. + } : { + byteOffset: 0, + column: 0, + lineNumber: 0, + }, + symbols: chunk.symbol_info.map((symbol) => { + return { + symbol: symbol.sym, + kind: symbol.kind, + parent: symbol.parent ? { + symbol: symbol.parent, + kind: symbol.parent_kind, + } : undefined, + } + }) + } + }), + branches: file.branches, + content: file.content ? file.content.toString('utf-8') : undefined, + } + }))).filter(file => file !== undefined); + + const actualMatchCount = files.reduce( + (acc, file) => + // Match count is the sum of the number of chunk matches and file name matches. + acc + file.chunks.reduce( + (acc, chunk) => acc + chunk.matchRanges.length, + 0, + ) + file.fileName.matchRanges.length, + 0, + ); + + const response: SearchResponse = { + files, + repositoryInfo: Array.from(repos.values()).map((repo) => ({ + id: repo.id, + codeHostType: repo.external_codeHostType, + name: repo.name, + displayName: repo.displayName ?? undefined, + webUrl: repo.webUrl ?? undefined, + })), + isBranchFilteringEnabled: false, + // @todo: we will need to figure out how to handle if a search is exhaustive or not + isSearchExhaustive: false, + stats: { + actualMatchCount, + // @todo: todo - + totalMatchCount: 0, + duration: chunk.response_chunk.stats?.duration?.nanos ?? 0, + fileCount: chunk.response_chunk.stats?.file_count.valueOf() ?? 0, + filesSkipped: chunk.response_chunk.stats?.files_skipped.valueOf() ?? 0, + contentBytesLoaded: chunk.response_chunk.stats?.content_bytes_loaded.valueOf() ?? 0, + indexBytesLoaded: chunk.response_chunk.stats?.index_bytes_loaded.valueOf() ?? 0, + crashes: chunk.response_chunk.stats?.crashes.valueOf() ?? 0, + shardFilesConsidered: chunk.response_chunk.stats?.shard_files_considered.valueOf() ?? 0, + filesConsidered: chunk.response_chunk.stats?.files_considered.valueOf() ?? 0, + filesLoaded: chunk.response_chunk.stats?.files_loaded.valueOf() ?? 0, + shardsScanned: chunk.response_chunk.stats?.shards_scanned.valueOf() ?? 0, + shardsSkipped: chunk.response_chunk.stats?.shards_skipped.valueOf() ?? 0, + shardsSkippedFilter: chunk.response_chunk.stats?.shards_skipped_filter.valueOf() ?? 0, + ngramMatches: chunk.response_chunk.stats?.ngram_matches.valueOf() ?? 0, + ngramLookups: chunk.response_chunk.stats?.ngram_lookups.valueOf() ?? 0, + wait: chunk.response_chunk.stats?.wait?.nanos ?? 
0, + matchTreeConstruction: chunk.response_chunk.stats?.match_tree_construction?.nanos ?? 0, + matchTreeSearch: chunk.response_chunk.stats?.match_tree_search?.nanos ?? 0, + regexpsConsidered: chunk.response_chunk.stats?.regexps_considered.valueOf() ?? 0, + // @todo: handle this. + flushReason: 0, + } + } + + const sseData = `data: ${JSON.stringify(response)}\n\n`; + controller.enqueue(new TextEncoder().encode(sseData)); + } catch (error) { + console.error('Error encoding chunk:', error); + } finally { + grpcStream?.resume(); + } + }); + + // Handle stream completion + grpcStream.on('end', () => { + if (!isStreamActive) { + return; + } + isStreamActive = false; + + // Send completion signal + controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); + controller.close(); + console.log('SSE stream completed'); + client.close(); + }); + + // Handle errors + grpcStream.on('error', (error: grpc.ServiceError) => { + console.error('gRPC stream error:', error); + + if (!isStreamActive) { + return; + } + isStreamActive = false; + + // Send error as SSE event + const errorData = `data: ${JSON.stringify({ + error: { + code: error.code, + message: error.details || error.message, + } + })}\n\n`; + controller.enqueue(new TextEncoder().encode(errorData)); + + controller.close(); + client.close(); + }); + } catch (error) { + console.error('Stream initialization error:', error); + + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + const errorData = `data: ${JSON.stringify({ + error: { message: errorMessage } + })}\n\n`; + controller.enqueue(new TextEncoder().encode(errorData)); + + controller.close(); + client.close(); + } + }, + cancel() { + console.log('SSE stream cancelled by client'); + isStreamActive = false; + + // Cancel the gRPC stream to stop receiving data + if (grpcStream) { + grpcStream.cancel(); + } + + client.close(); + } + }); +} \ No newline at end of file diff --git a/packages/web/src/features/search/searchApi.ts b/packages/web/src/features/search/searchApi.ts index 1ca57ef4..ee703759 100644 --- a/packages/web/src/features/search/searchApi.ts +++ b/packages/web/src/features/search/searchApi.ts @@ -216,7 +216,7 @@ export const search = async ({ query, matches, contextLines, whole }: SearchRequ return invalidZoektResponse(searchResponse); } - const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse) => { + const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse): Promise => { // @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field // which corresponds to the `id` in the Repo table. In order to efficiently fetch repository // metadata when transforming (potentially thousands) of file matches, we aggregate a unique
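Note on the pause()/resume() calls in the route's 'data' handler above: Node does not await async event listeners, so without explicit backpressure the gRPC stream can emit 'end' (closing the SSE response) while a chunk's async transform — including the Prisma repo lookups — is still in flight. A self-contained sketch of the pattern under illustrative names (bridge/transform/emit are placeholders, not APIs from this patch):

    import { Readable } from 'stream';

    const bridge = (
        source: Readable,
        transform: (chunk: Buffer) => Promise<string>,
        emit: (data: string) => void,
    ) => {
        source.on('data', async (chunk: Buffer) => {
            // Suspend further 'data' events (and defer 'end') until the async
            // work for this chunk has finished.
            source.pause();
            try {
                emit(await transform(chunk));
            } catch (error) {
                // Mirrors the route's per-chunk try/catch: a failed chunk
                // should not tear down the whole stream.
                console.error('Error transforming chunk:', error);
            } finally {
                source.resume();
            }
        });
    };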