wip: make stream search API follow the existing schema; modify the UI to support streaming

This commit is contained in:
bkellam 2025-11-14 20:53:55 -08:00
parent cca3d30b4a
commit 9cd32362e8
7 changed files with 582 additions and 634 deletions

View file

@ -12,24 +12,22 @@ import {
import { Separator } from "@/components/ui/separator";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { RepositoryInfo, SearchResultFile, SearchStats } from "@/features/search/types";
import useCaptureEvent from "@/hooks/useCaptureEvent";
import { useDomain } from "@/hooks/useDomain";
import { useNonEmptyQueryParam } from "@/hooks/useNonEmptyQueryParam";
import { useSearchHistory } from "@/hooks/useSearchHistory";
import { SearchQueryParams } from "@/lib/types";
import { createPathWithQueryParams, measure, unwrapServiceError } from "@/lib/utils";
import { InfoCircledIcon, SymbolIcon } from "@radix-ui/react-icons";
import { useQuery } from "@tanstack/react-query";
import { createPathWithQueryParams } from "@/lib/utils";
import { InfoCircledIcon } from "@radix-ui/react-icons";
import { useLocalStorage } from "@uidotdev/usehooks";
import { AlertTriangleIcon, BugIcon, FilterIcon } from "lucide-react";
import { AlertTriangleIcon, BugIcon, FilterIcon, RefreshCcwIcon } from "lucide-react";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useHotkeys } from "react-hotkeys-hook";
import { ImperativePanelHandle } from "react-resizable-panels";
import { search } from "../../../api/(client)/client";
import { CopyIconButton } from "../../components/copyIconButton";
import { SearchBar } from "../../components/searchBar";
import { TopBar } from "../../components/topBar";
import { useStreamedSearch } from "../useStreamedSearch";
import { CodePreviewPanel } from "./codePreviewPanel";
import { FilterPanel } from "./filterPanel";
import { useFilteredMatches } from "./filterPanel/useFilterMatches";
@ -46,7 +44,6 @@ export const SearchResultsPage = ({
}: SearchResultsPageProps) => {
const router = useRouter();
const { setSearchHistory } = useSearchHistory();
const captureEvent = useCaptureEvent();
const domain = useDomain();
const { toast } = useToast();
@ -55,26 +52,17 @@ export const SearchResultsPage = ({
const maxMatchCount = isNaN(_maxMatchCount) ? defaultMaxMatchCount : _maxMatchCount;
const {
data: searchResponse,
isPending: isSearchPending,
isFetching: isFetching,
error
} = useQuery({
queryKey: ["search", searchQuery, maxMatchCount],
queryFn: () => measure(() => unwrapServiceError(search({
query: searchQuery,
matches: maxMatchCount,
contextLines: 3,
whole: false,
})), "client.search"),
select: ({ data, durationMs }) => ({
...data,
totalClientSearchDurationMs: durationMs,
}),
enabled: searchQuery.length > 0,
refetchOnWindowFocus: false,
retry: false,
staleTime: 0,
error,
files,
repoInfo,
durationMs,
isStreaming,
numMatches,
} = useStreamedSearch({
query: searchQuery,
matches: maxMatchCount,
contextLines: 3,
whole: false,
});
useEffect(() => {
@ -102,38 +90,39 @@ export const SearchResultsPage = ({
])
}, [searchQuery, setSearchHistory]);
useEffect(() => {
if (!searchResponse) {
return;
}
// @todo: capture search stats on completion.
// useEffect(() => {
// if (!searchResponse) {
// return;
// }
const fileLanguages = searchResponse.files?.map(file => file.language) || [];
// const fileLanguages = searchResponse.files?.map(file => file.language) || [];
captureEvent("search_finished", {
durationMs: searchResponse.totalClientSearchDurationMs,
fileCount: searchResponse.stats.fileCount,
matchCount: searchResponse.stats.totalMatchCount,
actualMatchCount: searchResponse.stats.actualMatchCount,
filesSkipped: searchResponse.stats.filesSkipped,
contentBytesLoaded: searchResponse.stats.contentBytesLoaded,
indexBytesLoaded: searchResponse.stats.indexBytesLoaded,
crashes: searchResponse.stats.crashes,
shardFilesConsidered: searchResponse.stats.shardFilesConsidered,
filesConsidered: searchResponse.stats.filesConsidered,
filesLoaded: searchResponse.stats.filesLoaded,
shardsScanned: searchResponse.stats.shardsScanned,
shardsSkipped: searchResponse.stats.shardsSkipped,
shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter,
ngramMatches: searchResponse.stats.ngramMatches,
ngramLookups: searchResponse.stats.ngramLookups,
wait: searchResponse.stats.wait,
matchTreeConstruction: searchResponse.stats.matchTreeConstruction,
matchTreeSearch: searchResponse.stats.matchTreeSearch,
regexpsConsidered: searchResponse.stats.regexpsConsidered,
flushReason: searchResponse.stats.flushReason,
fileLanguages,
});
}, [captureEvent, searchQuery, searchResponse]);
// captureEvent("search_finished", {
// durationMs: searchResponse.totalClientSearchDurationMs,
// fileCount: searchResponse.stats.fileCount,
// matchCount: searchResponse.stats.totalMatchCount,
// actualMatchCount: searchResponse.stats.actualMatchCount,
// filesSkipped: searchResponse.stats.filesSkipped,
// contentBytesLoaded: searchResponse.stats.contentBytesLoaded,
// indexBytesLoaded: searchResponse.stats.indexBytesLoaded,
// crashes: searchResponse.stats.crashes,
// shardFilesConsidered: searchResponse.stats.shardFilesConsidered,
// filesConsidered: searchResponse.stats.filesConsidered,
// filesLoaded: searchResponse.stats.filesLoaded,
// shardsScanned: searchResponse.stats.shardsScanned,
// shardsSkipped: searchResponse.stats.shardsSkipped,
// shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter,
// ngramMatches: searchResponse.stats.ngramMatches,
// ngramLookups: searchResponse.stats.ngramLookups,
// wait: searchResponse.stats.wait,
// matchTreeConstruction: searchResponse.stats.matchTreeConstruction,
// matchTreeSearch: searchResponse.stats.matchTreeSearch,
// regexpsConsidered: searchResponse.stats.regexpsConsidered,
// flushReason: searchResponse.stats.flushReason,
// fileLanguages,
// });
// }, [captureEvent, searchQuery, searchResponse]);
const onLoadMoreResults = useCallback(() => {
@ -157,12 +146,7 @@ export const SearchResultsPage = ({
/>
</TopBar>
{(isSearchPending || isFetching) ? (
<div className="flex flex-col items-center justify-center h-full gap-2">
<SymbolIcon className="h-6 w-6 animate-spin" />
<p className="font-semibold text-center">Searching...</p>
</div>
) : error ? (
{error ? (
<div className="flex flex-col items-center justify-center h-full gap-2">
<AlertTriangleIcon className="h-6 w-6" />
<p className="font-semibold text-center">Failed to search</p>
@ -170,14 +154,18 @@ export const SearchResultsPage = ({
</div>
) : (
<PanelGroup
fileMatches={searchResponse.files}
isMoreResultsButtonVisible={searchResponse.isSearchExhaustive === false}
fileMatches={files}
onLoadMoreResults={onLoadMoreResults}
isBranchFilteringEnabled={searchResponse.isBranchFilteringEnabled}
repoInfo={searchResponse.repositoryInfo}
searchDurationMs={searchResponse.totalClientSearchDurationMs}
numMatches={searchResponse.stats.actualMatchCount}
searchStats={searchResponse.stats}
numMatches={numMatches}
repoInfo={repoInfo}
searchDurationMs={durationMs}
isStreaming={isStreaming}
// @todo: handle search stats
searchStats={undefined}
// @todo: detect when more results are available
isMoreResultsButtonVisible={false}
// @todo: handle branch filtering
isBranchFilteringEnabled={false}
/>
)}
</div>
@ -186,10 +174,11 @@ export const SearchResultsPage = ({
interface PanelGroupProps {
fileMatches: SearchResultFile[];
isMoreResultsButtonVisible?: boolean;
onLoadMoreResults: () => void;
isStreaming: boolean;
isMoreResultsButtonVisible?: boolean;
isBranchFilteringEnabled: boolean;
repoInfo: RepositoryInfo[];
repoInfo: Record<number, RepositoryInfo>;
searchDurationMs: number;
numMatches: number;
searchStats?: SearchStats;
@ -198,9 +187,10 @@ interface PanelGroupProps {
const PanelGroup = ({
fileMatches,
isMoreResultsButtonVisible,
isStreaming,
onLoadMoreResults,
isBranchFilteringEnabled,
repoInfo: _repoInfo,
repoInfo,
searchDurationMs: _searchDurationMs,
numMatches,
searchStats,
@ -228,13 +218,6 @@ const PanelGroup = ({
return Math.round(_searchDurationMs);
}, [_searchDurationMs]);
const repoInfo = useMemo(() => {
return _repoInfo.reduce((acc, repo) => {
acc[repo.id] = repo;
return acc;
}, {} as Record<number, RepositoryInfo>);
}, [_repoInfo]);
return (
<ResizablePanelGroup
direction="horizontal"
@ -291,41 +274,53 @@ const PanelGroup = ({
order={2}
>
<div className="py-1 px-2 flex flex-row items-center">
<Tooltip>
<TooltipTrigger asChild>
<InfoCircledIcon className="w-4 h-4 mr-2" />
</TooltipTrigger>
<TooltipContent side="right" className="flex flex-col items-start gap-2 p-4">
<div className="flex flex-row items-center w-full">
<BugIcon className="w-4 h-4 mr-1.5" />
<p className="text-md font-medium">Search stats for nerds</p>
<CopyIconButton
onCopy={() => {
navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2));
return true;
}}
className="ml-auto"
/>
</div>
<CodeSnippet renderNewlines>
{JSON.stringify(searchStats, null, 2)}
</CodeSnippet>
</TooltipContent>
</Tooltip>
{
fileMatches.length > 0 ? (
<p className="text-sm font-medium">{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}</p>
) : (
<p className="text-sm font-medium">No results</p>
)
}
{isMoreResultsButtonVisible && (
<div
className="cursor-pointer text-blue-500 text-sm hover:underline ml-4"
onClick={onLoadMoreResults}
>
(load more)
</div>
{isStreaming ? (
<>
<RefreshCcwIcon className="h-4 w-4 animate-spin mr-2" />
<p className="text-sm font-medium mr-1">Searching...</p>
{numMatches > 0 && (
<p className="text-sm font-medium">{`Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}</p>
)}
</>
) : (
<>
<Tooltip>
<TooltipTrigger asChild>
<InfoCircledIcon className="w-4 h-4 mr-2" />
</TooltipTrigger>
<TooltipContent side="right" className="flex flex-col items-start gap-2 p-4">
<div className="flex flex-row items-center w-full">
<BugIcon className="w-4 h-4 mr-1.5" />
<p className="text-md font-medium">Search stats for nerds</p>
<CopyIconButton
onCopy={() => {
navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2));
return true;
}}
className="ml-auto"
/>
</div>
<CodeSnippet renderNewlines>
{JSON.stringify(searchStats, null, 2)}
</CodeSnippet>
</TooltipContent>
</Tooltip>
{
fileMatches.length > 0 ? (
<p className="text-sm font-medium">{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}</p>
) : (
<p className="text-sm font-medium">No results</p>
)
}
{isMoreResultsButtonVisible && (
<div
className="cursor-pointer text-blue-500 text-sm hover:underline ml-4"
onClick={onLoadMoreResults}
>
(load more)
</div>
)}
</>
)}
</div>
{filteredFileMatches.length > 0 ? (
@ -340,6 +335,11 @@ const PanelGroup = ({
isBranchFilteringEnabled={isBranchFilteringEnabled}
repoInfo={repoInfo}
/>
) : isStreaming ? (
<div className="flex flex-col items-center justify-center h-full gap-2">
<RefreshCcwIcon className="h-6 w-6 animate-spin" />
<p className="font-semibold text-center">Searching...</p>
</div>
) : (
<div className="flex flex-col items-center justify-center h-full">
<p className="text-sm text-muted-foreground">No results found</p>

View file

@ -0,0 +1,171 @@
'use client';
import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile } from '@/features/search/types';
import { useState, useCallback, useRef, useEffect } from 'react';
/**
 * Streams search results from the `/api/stream_search` SSE endpoint.
 *
 * Results arrive incrementally: `files`, `repoInfo`, and `numMatches` grow as
 * chunks are received, `isStreaming` flips to false on completion (or error),
 * and `durationMs` records the total wall-clock time of the request.
 *
 * A new search is started whenever any request parameter changes; the
 * previous in-flight request is aborted first. `cancel` aborts the current
 * stream manually, keeping whatever results have accumulated so far.
 */
export const useStreamedSearch = ({ query, matches, contextLines, whole }: SearchRequest) => {
    const [state, setState] = useState<{
        isStreaming: boolean,
        error: Error | null,
        files: SearchResultFile[],
        repoInfo: Record<number, RepositoryInfo>,
        durationMs: number,
        numMatches: number,
    }>({
        isStreaming: false,
        error: null,
        files: [],
        repoInfo: {},
        durationMs: 0,
        numMatches: 0,
    });

    const abortControllerRef = useRef<AbortController | null>(null);

    // Manually abort the in-flight stream (accumulated results are kept).
    const cancel = useCallback(() => {
        if (abortControllerRef.current) {
            abortControllerRef.current.abort();
            abortControllerRef.current = null;
        }
        setState(prev => ({
            ...prev,
            isStreaming: false,
        }));
    }, []);

    useEffect(() => {
        // Abort any previous request before starting a new one.
        abortControllerRef.current?.abort();
        const abortController = new AbortController();
        abortControllerRef.current = abortController;

        const search = async () => {
            const startTime = performance.now();

            setState({
                isStreaming: true,
                error: null,
                files: [],
                repoInfo: {},
                durationMs: 0,
                numMatches: 0,
            });

            try {
                const response = await fetch('/api/stream_search', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                    },
                    body: JSON.stringify({
                        query,
                        matches,
                        contextLines,
                        whole,
                    }),
                    signal: abortController.signal,
                });

                if (!response.ok) {
                    throw new Error(`HTTP error! status: ${response.status}`);
                }

                if (!response.body) {
                    throw new Error('No response body');
                }

                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                let buffer = '';
                let done = false;

                while (!done) {
                    const result = await reader.read();
                    done = result.done;
                    if (done) {
                        break;
                    }

                    // Decode the chunk and add to the buffer.
                    buffer += decoder.decode(result.value, { stream: true });

                    // Process complete SSE messages (separated by \n\n).
                    const messages = buffer.split('\n\n');

                    // Keep the last element (potentially incomplete message) in the buffer for the next chunk.
                    // Stream chunks can split messages mid-way, so we only process complete messages.
                    buffer = messages.pop() || '';

                    for (const message of messages) {
                        if (!message.trim()) {
                            continue;
                        }

                        // SSE messages start with "data: ".
                        const dataMatch = message.match(/^data: (.+)$/);
                        if (!dataMatch) continue;

                        const data = dataMatch[1];

                        // Check for the completion signal.
                        if (data === '[DONE]') {
                            setState(prev => ({ ...prev, isStreaming: false }));
                            return;
                        }

                        // Malformed payloads are logged and skipped; they do not abort the stream.
                        try {
                            const chunk: SearchResponse = JSON.parse(data);
                            setState(prev => ({
                                ...prev,
                                files: [
                                    ...prev.files,
                                    ...chunk.files,
                                ],
                                repoInfo: {
                                    ...prev.repoInfo,
                                    ...chunk.repositoryInfo.reduce((acc, repo) => {
                                        acc[repo.id] = repo;
                                        return acc;
                                    }, {} as Record<number, RepositoryInfo>),
                                },
                                numMatches: prev.numMatches + chunk.stats.actualMatchCount,
                            }));
                        } catch (parseError) {
                            console.error('Error parsing chunk:', parseError);
                        }
                    }
                }

                setState(prev => ({ ...prev, isStreaming: false }));
            } catch (error) {
                if ((error as Error).name === 'AbortError') {
                    console.log('Stream aborted');
                } else {
                    setState(prev => ({
                        ...prev,
                        isStreaming: false,
                        error: error as Error,
                    }));
                }
            } finally {
                // Only record the duration if this request was not superseded or
                // unmounted — avoids a state update after unmount.
                if (!abortController.signal.aborted) {
                    const durationMs = performance.now() - startTime;
                    setState(prev => ({
                        ...prev,
                        durationMs,
                    }));
                }
            }
        }

        search();

        // BUG FIX: the cleanup was previously a no-op, leaking the in-flight
        // request and allowing setState on an unmounted component. Abort the
        // stream when the params change or the component unmounts.
        return () => {
            abortController.abort();
        }
    }, [query, matches, contextLines, whole]);

    return {
        ...state,
        cancel,
    };
}

View file

@ -1,221 +0,0 @@
'use client';
import { useState } from 'react';
import { useStreamingSearch } from './useStreamingSearch';
import type { FileMatch__Output } from './types';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Separator } from '@/components/ui/separator';
import { Loader2 } from 'lucide-react';
// @nocheckin
/**
 * Demo page for exercising the SSE streaming search API.
 *
 * Lets the user enter a search query and a max-match budget, then streams
 * results via `useStreamingSearch`, rendering per-chunk stats and file
 * matches as they arrive.
 */
export default function StreamSearchPage() {
    // Search inputs. Context lines is currently fixed at 5 (setter unused).
    const [query, setQuery] = useState('useMemo');
    const [matches, setMatches] = useState(10000);
    const [contextLines, _setContextLines] = useState(5);

    const {
        chunks,
        isStreaming,
        totalFiles,
        totalMatches,
        error,
        streamSearch,
        cancel,
        reset
    } = useStreamingSearch();

    // Kick off a new stream with the current form values.
    const handleSearch = () => {
        streamSearch({
            query,
            matches,
            contextLines,
            whole: false,
        });
    };

    return (
        <div className="container mx-auto p-6 max-w-6xl">
            <div className="space-y-6">
                <div>
                    <h1 className="text-3xl font-bold mb-2">Streaming Search Demo</h1>
                    <p className="text-muted-foreground">
                        Test the SSE streaming search API with real-time results
                    </p>
                </div>

                <Separator />

                {/* Search Controls */}
                <div className="space-y-4">
                    <div className="grid grid-cols-1 md:grid-cols-3 gap-4">
                        <div className="md:col-span-2">
                            <label className="text-sm font-medium mb-1.5 block">
                                Search Query
                            </label>
                            <Input
                                value={query}
                                onChange={(e) => setQuery(e.target.value)}
                                placeholder="Enter search query (e.g., useMemo, file:.tsx)"
                                onKeyDown={(e) => e.key === 'Enter' && handleSearch()}
                                disabled={isStreaming}
                            />
                        </div>
                        <div>
                            <label className="text-sm font-medium mb-1.5 block">
                                Max Matches
                            </label>
                            <Input
                                type="number"
                                value={matches}
                                onChange={(e) => setMatches(Number(e.target.value))}
                                placeholder="Max matches"
                                disabled={isStreaming}
                            />
                        </div>
                    </div>

                    {/* Action buttons: search always; cancel only while streaming;
                        clear only once a completed stream has produced chunks. */}
                    <div className="flex gap-2">
                        <Button
                            onClick={handleSearch}
                            disabled={isStreaming || !query}
                            className="w-32"
                        >
                            {isStreaming ? (
                                <>
                                    <Loader2 className="w-4 h-4 mr-2 animate-spin" />
                                    Searching
                                </>
                            ) : (
                                'Search'
                            )}
                        </Button>
                        {isStreaming && (
                            <Button onClick={cancel} variant="destructive">
                                Cancel
                            </Button>
                        )}
                        {chunks.length > 0 && !isStreaming && (
                            <Button onClick={reset} variant="outline">
                                Clear Results
                            </Button>
                        )}
                    </div>
                </div>

                <Separator />

                {/* Results Stats */}
                {(isStreaming || chunks.length > 0) && (
                    <div className="bg-muted/50 rounded-lg p-4">
                        <div className="flex items-center gap-6 text-sm">
                            <div>
                                <span className="font-semibold">Status:</span>{' '}
                                {isStreaming ? (
                                    <span className="text-blue-600 dark:text-blue-400">
                                        🔄 Streaming...
                                    </span>
                                ) : (
                                    <span className="text-green-600 dark:text-green-400">
                                        Complete
                                    </span>
                                )}
                            </div>
                            <div>
                                <span className="font-semibold">Chunks:</span>{' '}
                                {chunks.length}
                            </div>
                            <div>
                                <span className="font-semibold">Files:</span>{' '}
                                {totalFiles}
                            </div>
                            <div>
                                <span className="font-semibold">Matches:</span>{' '}
                                {totalMatches}
                            </div>
                        </div>
                    </div>
                )}

                {/* Error Display */}
                {error && (
                    <div className="bg-destructive/10 border border-destructive/30 rounded-lg p-4">
                        <div className="font-semibold text-destructive mb-1">
                            Error occurred:
                        </div>
                        <div className="text-sm text-destructive/80">
                            {error.message}
                        </div>
                    </div>
                )}

                {/* Results Display: one card per received chunk, with its file matches. */}
                {chunks.length > 0 && (
                    <div className="space-y-4">
                        <h2 className="text-xl font-semibold">
                            Results ({chunks.length} chunks)
                        </h2>
                        <div className="space-y-3">
                            {chunks.map((chunk, i) => (
                                <div
                                    key={i}
                                    className="border rounded-lg p-4 bg-card"
                                >
                                    <div className="flex items-center justify-between mb-3">
                                        <div className="text-sm font-medium text-muted-foreground">
                                            Chunk {i + 1}
                                        </div>
                                        <div className="text-sm text-muted-foreground">
                                            {chunk.response_chunk?.files?.length || 0} files, {' '}
                                            {chunk.response_chunk?.stats?.match_count || 0} matches
                                        </div>
                                    </div>
                                    {chunk.response_chunk?.files && chunk.response_chunk.files.length > 0 && (
                                        <div className="space-y-2">
                                            {chunk.response_chunk.files.map((file: FileMatch__Output, j: number) => {
                                                // Decode file_name from Buffer to string
                                                const fileName = file.file_name
                                                    ? Buffer.from(file.file_name).toString('utf-8')
                                                    : 'Unknown file';
                                                return (
                                                    <div
                                                        key={j}
                                                        className="text-sm pl-4 border-l-2 border-muted-foreground/20 py-1"
                                                    >
                                                        <div className="font-mono">
                                                            📄 {fileName}
                                                        </div>
                                                        {file.repository && (
                                                            <div className="text-xs text-muted-foreground mt-0.5">
                                                                {file.repository}
                                                            </div>
                                                        )}
                                                        {file.language && (
                                                            <div className="text-xs text-muted-foreground">
                                                                Language: {file.language}
                                                            </div>
                                                        )}
                                                    </div>
                                                );
                                            })}
                                        </div>
                                    )}
                                </div>
                            ))}
                        </div>
                    </div>
                )}

                {/* Empty State */}
                {!isStreaming && chunks.length === 0 && !error && (
                    <div className="text-center py-12 text-muted-foreground">
                        <p>Enter a search query and click &ldquo;Search&rdquo; to start streaming results</p>
                    </div>
                )}
            </div>
        </div>
    );
}

View file

@ -1,69 +0,0 @@
/**
* Types for streaming search functionality
*/
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
import type { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse';
import type { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch';
import type { Stats__Output } from '@/proto/zoekt/webserver/v1/Stats';
import type { ChunkMatch__Output } from '@/proto/zoekt/webserver/v1/ChunkMatch';
import type { Progress__Output } from '@/proto/zoekt/webserver/v1/Progress';
/**
 * A single chunk received from the streaming search API.
 *
 * Mirrors one SSE `data:` payload: either a partial search response
 * (`response_chunk`) or a stream-level error (`error`).
 */
export interface StreamSearchChunk {
    response_chunk?: SearchResponse__Output | null;
    error?: StreamSearchError;
}

/**
 * Error response from the streaming search
 */
export interface StreamSearchError {
    // Numeric status code, when the server provides one.
    code?: number;
    message: string;
}

/**
 * Parameters for initiating a streaming search
 */
export interface StreamSearchParams {
    // The search query string.
    query: string;
    // Maximum number of matches to request.
    matches: number;
    // Number of surrounding context lines to include per match.
    contextLines?: number;
    // Whether to include whole-file content in results.
    whole?: boolean;
}

/**
 * Accumulated state of the streaming search
 */
export interface StreamingSearchState {
    // All chunks received so far, in arrival order.
    chunks: StreamSearchChunk[];
    isStreaming: boolean;
    error: Error | null;
    // Running totals aggregated across all received chunks.
    totalFiles: number;
    totalMatches: number;
}

/**
 * Return type of the useStreamingSearch hook
 */
export interface UseStreamingSearchReturn extends StreamingSearchState {
    // Starts a new stream, cancelling any in-flight one.
    streamSearch: (params: StreamSearchParams) => Promise<void>;
    // Aborts the in-flight stream; accumulated results are kept.
    cancel: () => void;
    // Cancels and clears all accumulated state.
    reset: () => void;
}

/**
 * Re-export proto types for convenience
 */
export type {
    StreamSearchResponse__Output,
    SearchResponse__Output,
    FileMatch__Output,
    Stats__Output,
    ChunkMatch__Output,
    Progress__Output,
};

View file

@ -1,148 +0,0 @@
'use client';
import { useState, useCallback, useRef } from 'react';
import type {
StreamSearchChunk,
StreamSearchParams,
StreamingSearchState,
UseStreamingSearchReturn,
} from './types';
/**
 * Consumes the `/api/stream_search` SSE endpoint and accumulates the raw
 * response chunks together with running file/match totals.
 *
 * @returns the streaming state plus:
 *  - `streamSearch`: start a new stream (cancelling any in-flight one),
 *  - `cancel`: abort the in-flight stream, keeping accumulated results,
 *  - `reset`: cancel and clear all accumulated state.
 */
export function useStreamingSearch(): UseStreamingSearchReturn {
    const [state, setState] = useState<StreamingSearchState>({
        chunks: [],
        isStreaming: false,
        error: null,
        totalFiles: 0,
        totalMatches: 0,
    });

    const abortControllerRef = useRef<AbortController | null>(null);

    const streamSearch = useCallback(async (params: StreamSearchParams) => {
        // Cancel any existing stream before starting a new one.
        if (abortControllerRef.current) {
            abortControllerRef.current.abort();
        }
        abortControllerRef.current = new AbortController();

        setState({
            chunks: [],
            isStreaming: true,
            error: null,
            totalFiles: 0,
            totalMatches: 0,
        });

        try {
            const response = await fetch('/api/stream_search', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify(params),
                signal: abortControllerRef.current.signal,
            });

            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }

            if (!response.body) {
                throw new Error('No response body');
            }

            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';
            let done = false;

            while (!done) {
                const result = await reader.read();
                done = result.done;
                if (done) break;

                // Decode the chunk and add to buffer
                buffer += decoder.decode(result.value, { stream: true });

                // Process complete SSE messages (separated by \n\n)
                const messages = buffer.split('\n\n');
                buffer = messages.pop() || ''; // Keep incomplete message in buffer

                for (const message of messages) {
                    if (!message.trim()) continue;

                    // SSE messages start with "data: "
                    const dataMatch = message.match(/^data: (.+)$/);
                    if (!dataMatch) continue;

                    const data = dataMatch[1];

                    // Check for completion signal
                    if (data === '[DONE]') {
                        setState(prev => ({ ...prev, isStreaming: false }));
                        return;
                    }

                    // Parse the JSON chunk. Malformed payloads are logged and
                    // skipped; they do not abort the stream.
                    let chunk: StreamSearchChunk;
                    try {
                        chunk = JSON.parse(data);
                    } catch (parseError) {
                        console.error('Error parsing chunk:', parseError);
                        continue;
                    }

                    // BUG FIX: server-reported errors were previously thrown
                    // *inside* the parse try/catch, so they were logged as parse
                    // errors and never surfaced via `state.error`. Throwing here
                    // lets the outer catch record the error.
                    if (chunk.error) {
                        throw new Error(chunk.error.message);
                    }

                    // Update state with new chunk
                    setState(prev => ({
                        ...prev,
                        chunks: [...prev.chunks, chunk],
                        totalFiles: prev.totalFiles + (chunk.response_chunk?.files?.length || 0),
                        totalMatches: prev.totalMatches + (chunk.response_chunk?.stats?.match_count || 0),
                    }));
                }
            }

            setState(prev => ({ ...prev, isStreaming: false }));
        } catch (error) {
            if ((error as Error).name === 'AbortError') {
                console.log('Stream aborted');
            } else {
                setState(prev => ({
                    ...prev,
                    isStreaming: false,
                    error: error as Error,
                }));
            }
        }
    }, []);

    // Abort the in-flight stream; accumulated results are kept.
    const cancel = useCallback(() => {
        if (abortControllerRef.current) {
            abortControllerRef.current.abort();
            abortControllerRef.current = null;
        }
        setState(prev => ({ ...prev, isStreaming: false }));
    }, []);

    // Cancel any in-flight stream and clear all accumulated state.
    const reset = useCallback(() => {
        cancel();
        setState({
            chunks: [],
            isStreaming: false,
            error: null,
            totalFiles: 0,
            totalMatches: 0,
        });
    }, [cancel]);

    return {
        ...state,
        streamSearch,
        cancel,
        reset,
    };
}

View file

@ -1,17 +1,23 @@
'use server';
import { NextRequest } from 'next/server';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import * as path from 'path';
import { searchRequestSchema } from '@/features/search/schemas';
import { SearchResponse, SourceRange } from '@/features/search/types';
import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
import { prisma } from '@/prisma';
import type { ProtoGrpcType } from '@/proto/webserver';
import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
import { Range__Output } from '@/proto/zoekt/webserver/v1/Range';
import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest';
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
import { env } from '@sourcebot/shared';
import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
import { searchRequestSchema } from '@/features/search/schemas';
import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { PrismaClient, Repo } from '@sourcebot/db';
import { createLogger, env } from '@sourcebot/shared';
import { NextRequest } from 'next/server';
import * as path from 'path';
const logger = createLogger('streamSearchApi');
/**
* Create a gRPC client for the Zoekt webserver
@ -54,18 +60,22 @@ export const POST = async (request: NextRequest) => {
// Parse and validate request body
const body = await request.json();
const parsed = await searchRequestSchema.safeParseAsync(body);
if (!parsed.success) {
return serviceErrorResponse(schemaValidationError(parsed.error));
}
const { query, matches, contextLines, whole } = parsed.data;
const searchRequest: SearchRequest = {
query: {
and: {
// @todo: we should use repo_ids to filter out repositories that the user
// has access to (if permission syncing is enabled!).
children: [
{
regexp: {
regexp: parsed.data.query,
regexp: query,
case_sensitive: true,
}
}
@ -74,79 +84,20 @@ export const POST = async (request: NextRequest) => {
},
opts: {
chunk_matches: true,
num_context_lines: parsed.data.contextLines ?? 5,
total_max_match_count: parsed.data.matches,
max_match_display_count: matches,
total_max_match_count: matches + 1,
num_context_lines: contextLines,
whole: !!whole,
shard_max_match_count: -1,
max_wall_time: {
seconds: 0,
}
},
};
// Create ReadableStream for SSE
const stream = new ReadableStream({
async start(controller) {
const client = createGrpcClient();
try {
const metadata = new grpc.Metadata();
metadata.add('x-sourcegraph-tenant-id', '1');
// @nocheckin: this should be using the `prisma` instance from the auth context.
const stream = await createSSESearchStream(searchRequest, prisma);
const streamRequest: StreamSearchRequest = {
request: searchRequest,
};
const grpcStream = client.StreamSearch(streamRequest, metadata);
// Handle incoming data chunks
grpcStream.on('data', (chunk: StreamSearchResponse__Output) => {
try {
// SSE format: "data: {json}\n\n"
const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
controller.enqueue(new TextEncoder().encode(sseData));
} catch (error) {
console.error('Error encoding chunk:', error);
}
});
// Handle stream completion
grpcStream.on('end', () => {
// Send completion signal
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
controller.close();
client.close();
});
// Handle errors
grpcStream.on('error', (error: grpc.ServiceError) => {
console.error('gRPC stream error:', error);
// Send error as SSE event
const errorData = `data: ${JSON.stringify({
error: {
code: error.code,
message: error.details || error.message,
}
})}\n\n`;
controller.enqueue(new TextEncoder().encode(errorData));
controller.close();
client.close();
});
} catch (error) {
console.error('Stream initialization error:', error);
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
const errorData = `data: ${JSON.stringify({
error: { message: errorMessage }
})}\n\n`;
controller.enqueue(new TextEncoder().encode(errorData));
controller.close();
client.close();
}
},
cancel() {
// Cleanup when client cancels the stream
console.log('SSE stream cancelled by client');
}
});
// Return streaming response with SSE headers
return new Response(stream, {
@ -171,4 +122,268 @@ export const POST = async (request: NextRequest) => {
}
);
}
};
};
const createSSESearchStream = async (searchRequest: SearchRequest, prisma: PrismaClient): Promise<ReadableStream> => {
const client = createGrpcClient();
let grpcStream: ReturnType<WebserverServiceClient['StreamSearch']> | null = null;
let isStreamActive = true;
return new ReadableStream({
async start(controller) {
try {
// @todo: we should just disable tenant enforcement for now.
const metadata = new grpc.Metadata();
metadata.add('x-sourcegraph-tenant-id', '1');
const streamRequest: StreamSearchRequest = {
request: searchRequest,
};
grpcStream = client.StreamSearch(streamRequest, metadata);
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique
// set of repository ids* and map them to their corresponding Repo record.
//
// *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`?
// A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was
// always undefined. To make this a non-breaking change, we fallback to using the repository's name
// (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in
// practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo',
// 'gitea.com/org/repo', etc.).
//
// Note: When a repository is re-indexed (every hour) this ID will be populated.
// @see: https://github.com/sourcebot-dev/zoekt/pull/6
// Cache of repositories seen during this search, keyed by either the numeric
// repository id or the repository name, so each repo is looked up in the
// database at most once per stream.
const repos = new Map<string | number, Repo>();

// Handle incoming data chunks. Each gRPC response chunk is transformed into
// the existing `SearchResponse` schema and forwarded to the client as a
// single SSE `data:` event.
grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => {
    if (!isStreamActive) {
        return;
    }

    // grpcStream.on doesn't actually await on our handler, so we need to
    // explicitly pause the stream here to prevent the stream from completing
    // prior to our asynchronous work being completed.
    grpcStream?.pause();

    try {
        if (!chunk.response_chunk) {
            logger.warn('No response chunk received');
            return;
        }

        const files = (await Promise.all(chunk.response_chunk.files.map(async (file) => {
            // Chunk matches flagged with `file_name` are matches against the
            // file's path rather than its content; they are split out so the
            // path match ranges can be attached to `fileName` below.
            const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name);

            const identifier = file.repository_id ?? file.repository;

            // If the repository is not in the map, fetch it from the database.
            if (!repos.has(identifier)) {
                const repo = typeof identifier === 'number' ?
                    await prisma.repo.findUnique({
                        where: {
                            id: identifier,
                        },
                    }) :
                    await prisma.repo.findFirst({
                        where: {
                            name: identifier,
                        },
                    });
                if (repo) {
                    repos.set(identifier, repo);
                }
            }

            const repo = repos.get(identifier);

            // This can happen if the user doesn't have access to the repository.
            if (!repo) {
                return undefined;
            }

            // @todo: address "file_name might not be a valid UTF-8 string" warning.
            const fileName = file.file_name.toString('utf-8');

            // Maps a zoekt wire-format range into the API's SourceRange shape,
            // defaulting any absent endpoint fields to 0.
            const convertRange = (range: Range__Output): SourceRange => ({
                start: {
                    byteOffset: range.start?.byte_offset ?? 0,
                    column: range.start?.column ?? 0,
                    lineNumber: range.start?.line_number ?? 0,
                },
                end: {
                    byteOffset: range.end?.byte_offset ?? 0,
                    column: range.end?.column ?? 0,
                    lineNumber: range.end?.line_number ?? 0,
                }
            })

            return {
                fileName: {
                    text: fileName,
                    matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [],
                },
                repository: repo.name,
                repositoryId: repo.id,
                language: file.language,
                // @todo: we will need to have a mechanism of forming the file's web url.
                webUrl: '',
                chunks: file.chunk_matches
                    .filter((chunk) => !chunk.file_name) // filter out filename chunks.
                    .map((chunk) => {
                        return {
                            content: chunk.content.toString('utf-8'),
                            matchRanges: chunk.ranges.map(convertRange),
                            contentStart: chunk.content_start ? {
                                byteOffset: chunk.content_start.byte_offset ?? 0,
                                column: chunk.content_start.column ?? 0,
                                lineNumber: chunk.content_start.line_number ?? 0,
                                // @nocheckin: Will need to figure out how to handle this case.
                            } : {
                                byteOffset: 0,
                                column: 0,
                                lineNumber: 0,
                            },
                            symbols: chunk.symbol_info.map((symbol) => {
                                return {
                                    symbol: symbol.sym,
                                    kind: symbol.kind,
                                    parent: symbol.parent ? {
                                        symbol: symbol.parent,
                                        kind: symbol.parent_kind,
                                    } : undefined,
                                }
                            })
                        }
                    }),
                branches: file.branches,
                content: file.content ? file.content.toString('utf-8') : undefined,
            }
        }))).filter(file => file !== undefined);

        const actualMatchCount = files.reduce(
            (acc, file) =>
                // Match count is the sum of the number of chunk matches and file name matches.
                acc + file.chunks.reduce(
                    (acc, chunk) => acc + chunk.matchRanges.length,
                    0,
                ) + file.fileName.matchRanges.length,
            0,
        );

        const response: SearchResponse = {
            files,
            // NOTE(review): this includes every repo accumulated so far in the
            // stream, not just repos present in this chunk — confirm the client
            // treats repositoryInfo as cumulative.
            repositoryInfo: Array.from(repos.values()).map((repo) => ({
                id: repo.id,
                codeHostType: repo.external_codeHostType,
                name: repo.name,
                displayName: repo.displayName ?? undefined,
                webUrl: repo.webUrl ?? undefined,
            })),
            isBranchFilteringEnabled: false,
            // @todo: we will need to figure out how to handle if a search is exhaustive or not
            isSearchExhaustive: false,
            stats: {
                actualMatchCount,
                // @todo: todo -
                totalMatchCount: 0,
                // NOTE(review): only the `nanos` component of the proto
                // Duration is read here (and for wait / matchTree* below);
                // durations >= 1s will be truncated — confirm whether
                // `seconds` should be folded in.
                duration: chunk.response_chunk.stats?.duration?.nanos ?? 0,
                fileCount: chunk.response_chunk.stats?.file_count.valueOf() ?? 0,
                filesSkipped: chunk.response_chunk.stats?.files_skipped.valueOf() ?? 0,
                contentBytesLoaded: chunk.response_chunk.stats?.content_bytes_loaded.valueOf() ?? 0,
                indexBytesLoaded: chunk.response_chunk.stats?.index_bytes_loaded.valueOf() ?? 0,
                crashes: chunk.response_chunk.stats?.crashes.valueOf() ?? 0,
                shardFilesConsidered: chunk.response_chunk.stats?.shard_files_considered.valueOf() ?? 0,
                filesConsidered: chunk.response_chunk.stats?.files_considered.valueOf() ?? 0,
                filesLoaded: chunk.response_chunk.stats?.files_loaded.valueOf() ?? 0,
                shardsScanned: chunk.response_chunk.stats?.shards_scanned.valueOf() ?? 0,
                shardsSkipped: chunk.response_chunk.stats?.shards_skipped.valueOf() ?? 0,
                shardsSkippedFilter: chunk.response_chunk.stats?.shards_skipped_filter.valueOf() ?? 0,
                ngramMatches: chunk.response_chunk.stats?.ngram_matches.valueOf() ?? 0,
                ngramLookups: chunk.response_chunk.stats?.ngram_lookups.valueOf() ?? 0,
                wait: chunk.response_chunk.stats?.wait?.nanos ?? 0,
                matchTreeConstruction: chunk.response_chunk.stats?.match_tree_construction?.nanos ?? 0,
                matchTreeSearch: chunk.response_chunk.stats?.match_tree_search?.nanos ?? 0,
                regexpsConsidered: chunk.response_chunk.stats?.regexps_considered.valueOf() ?? 0,
                // @todo: handle this.
                flushReason: 0,
            }
        }

        const sseData = `data: ${JSON.stringify(response)}\n\n`;
        controller.enqueue(new TextEncoder().encode(sseData));
    } catch (error) {
        // Use the structured logger for consistency with the warn above;
        // the chunk is dropped but the stream keeps going.
        logger.error(`Error encoding chunk: ${error}`);
    } finally {
        grpcStream?.resume();
    }
});
// Handle stream completion: emit the SSE completion sentinel, then tear
// down both the SSE response and the gRPC client.
grpcStream.on('end', () => {
    if (!isStreamActive) {
        return;
    }
    isStreamActive = false;

    // `[DONE]` signals end-of-stream to the consuming client.
    const doneEvent = new TextEncoder().encode('data: [DONE]\n\n');
    controller.enqueue(doneEvent);
    controller.close();
    console.log('SSE stream completed');
    client.close();
});
// Handle gRPC errors: surface the failure to the client as a final SSE
// data event (so the client can parse it like any other message), then
// close both the SSE stream and the gRPC client.
grpcStream.on('error', (error: grpc.ServiceError) => {
    console.error('gRPC stream error:', error);
    if (!isStreamActive) {
        return;
    }
    isStreamActive = false;

    const payload = {
        error: {
            code: error.code,
            // Prefer the detailed gRPC status text when available.
            message: error.details || error.message,
        }
    };
    const errorData = `data: ${JSON.stringify(payload)}\n\n`;
    controller.enqueue(new TextEncoder().encode(errorData));
    controller.close();
    client.close();
});
} catch (error) {
    // Initialization failed before the gRPC handlers could do their work
    // (e.g. constructing the client or starting the stream threw). Report
    // the failure to the client as a single SSE event, then shut down.
    console.error('Stream initialization error:', error);
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    const errorData = `data: ${JSON.stringify({
        error: { message: errorMessage }
    })}\n\n`;
    controller.enqueue(new TextEncoder().encode(errorData));
    controller.close();
    client.close();
}
},
// Invoked when the client disconnects or aborts the SSE response; stop
// producing data and release both the gRPC stream and the client.
cancel() {
    console.log('SSE stream cancelled by client');
    isStreamActive = false;
    // Cancel the gRPC stream to stop receiving data.
    grpcStream?.cancel();
    client.close();
}
});
}

View file

@ -216,7 +216,7 @@ export const search = async ({ query, matches, contextLines, whole }: SearchRequ
return invalidZoektResponse(searchResponse);
}
const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse) => {
const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse): Promise<SearchResponse> => {
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique