diff --git a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx
index 9c33e11d..7bacb12c 100644
--- a/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx
+++ b/packages/web/src/app/[domain]/search/components/searchResultsPage.tsx
@@ -12,24 +12,22 @@ import {
import { Separator } from "@/components/ui/separator";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
import { RepositoryInfo, SearchResultFile, SearchStats } from "@/features/search/types";
-import useCaptureEvent from "@/hooks/useCaptureEvent";
import { useDomain } from "@/hooks/useDomain";
import { useNonEmptyQueryParam } from "@/hooks/useNonEmptyQueryParam";
import { useSearchHistory } from "@/hooks/useSearchHistory";
import { SearchQueryParams } from "@/lib/types";
-import { createPathWithQueryParams, measure, unwrapServiceError } from "@/lib/utils";
-import { InfoCircledIcon, SymbolIcon } from "@radix-ui/react-icons";
-import { useQuery } from "@tanstack/react-query";
+import { createPathWithQueryParams } from "@/lib/utils";
+import { InfoCircledIcon } from "@radix-ui/react-icons";
import { useLocalStorage } from "@uidotdev/usehooks";
-import { AlertTriangleIcon, BugIcon, FilterIcon } from "lucide-react";
+import { AlertTriangleIcon, BugIcon, FilterIcon, RefreshCcwIcon } from "lucide-react";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useHotkeys } from "react-hotkeys-hook";
import { ImperativePanelHandle } from "react-resizable-panels";
-import { search } from "../../../api/(client)/client";
import { CopyIconButton } from "../../components/copyIconButton";
import { SearchBar } from "../../components/searchBar";
import { TopBar } from "../../components/topBar";
+import { useStreamedSearch } from "../useStreamedSearch";
import { CodePreviewPanel } from "./codePreviewPanel";
import { FilterPanel } from "./filterPanel";
import { useFilteredMatches } from "./filterPanel/useFilterMatches";
@@ -46,7 +44,6 @@ export const SearchResultsPage = ({
}: SearchResultsPageProps) => {
const router = useRouter();
const { setSearchHistory } = useSearchHistory();
- const captureEvent = useCaptureEvent();
const domain = useDomain();
const { toast } = useToast();
@@ -55,26 +52,17 @@ export const SearchResultsPage = ({
const maxMatchCount = isNaN(_maxMatchCount) ? defaultMaxMatchCount : _maxMatchCount;
const {
- data: searchResponse,
- isPending: isSearchPending,
- isFetching: isFetching,
- error
- } = useQuery({
- queryKey: ["search", searchQuery, maxMatchCount],
- queryFn: () => measure(() => unwrapServiceError(search({
- query: searchQuery,
- matches: maxMatchCount,
- contextLines: 3,
- whole: false,
- })), "client.search"),
- select: ({ data, durationMs }) => ({
- ...data,
- totalClientSearchDurationMs: durationMs,
- }),
- enabled: searchQuery.length > 0,
- refetchOnWindowFocus: false,
- retry: false,
- staleTime: 0,
+ error,
+ files,
+ repoInfo,
+ durationMs,
+ isStreaming,
+ numMatches,
+ } = useStreamedSearch({
+ query: searchQuery,
+ matches: maxMatchCount,
+ contextLines: 3,
+ whole: false,
});
useEffect(() => {
@@ -102,38 +90,39 @@ export const SearchResultsPage = ({
])
}, [searchQuery, setSearchHistory]);
- useEffect(() => {
- if (!searchResponse) {
- return;
- }
+ // @todo: capture search stats on completion.
+ // useEffect(() => {
+ // if (!searchResponse) {
+ // return;
+ // }
- const fileLanguages = searchResponse.files?.map(file => file.language) || [];
+ // const fileLanguages = searchResponse.files?.map(file => file.language) || [];
- captureEvent("search_finished", {
- durationMs: searchResponse.totalClientSearchDurationMs,
- fileCount: searchResponse.stats.fileCount,
- matchCount: searchResponse.stats.totalMatchCount,
- actualMatchCount: searchResponse.stats.actualMatchCount,
- filesSkipped: searchResponse.stats.filesSkipped,
- contentBytesLoaded: searchResponse.stats.contentBytesLoaded,
- indexBytesLoaded: searchResponse.stats.indexBytesLoaded,
- crashes: searchResponse.stats.crashes,
- shardFilesConsidered: searchResponse.stats.shardFilesConsidered,
- filesConsidered: searchResponse.stats.filesConsidered,
- filesLoaded: searchResponse.stats.filesLoaded,
- shardsScanned: searchResponse.stats.shardsScanned,
- shardsSkipped: searchResponse.stats.shardsSkipped,
- shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter,
- ngramMatches: searchResponse.stats.ngramMatches,
- ngramLookups: searchResponse.stats.ngramLookups,
- wait: searchResponse.stats.wait,
- matchTreeConstruction: searchResponse.stats.matchTreeConstruction,
- matchTreeSearch: searchResponse.stats.matchTreeSearch,
- regexpsConsidered: searchResponse.stats.regexpsConsidered,
- flushReason: searchResponse.stats.flushReason,
- fileLanguages,
- });
- }, [captureEvent, searchQuery, searchResponse]);
+ // captureEvent("search_finished", {
+ // durationMs: searchResponse.totalClientSearchDurationMs,
+ // fileCount: searchResponse.stats.fileCount,
+ // matchCount: searchResponse.stats.totalMatchCount,
+ // actualMatchCount: searchResponse.stats.actualMatchCount,
+ // filesSkipped: searchResponse.stats.filesSkipped,
+ // contentBytesLoaded: searchResponse.stats.contentBytesLoaded,
+ // indexBytesLoaded: searchResponse.stats.indexBytesLoaded,
+ // crashes: searchResponse.stats.crashes,
+ // shardFilesConsidered: searchResponse.stats.shardFilesConsidered,
+ // filesConsidered: searchResponse.stats.filesConsidered,
+ // filesLoaded: searchResponse.stats.filesLoaded,
+ // shardsScanned: searchResponse.stats.shardsScanned,
+ // shardsSkipped: searchResponse.stats.shardsSkipped,
+ // shardsSkippedFilter: searchResponse.stats.shardsSkippedFilter,
+ // ngramMatches: searchResponse.stats.ngramMatches,
+ // ngramLookups: searchResponse.stats.ngramLookups,
+ // wait: searchResponse.stats.wait,
+ // matchTreeConstruction: searchResponse.stats.matchTreeConstruction,
+ // matchTreeSearch: searchResponse.stats.matchTreeSearch,
+ // regexpsConsidered: searchResponse.stats.regexpsConsidered,
+ // flushReason: searchResponse.stats.flushReason,
+ // fileLanguages,
+ // });
+ // }, [captureEvent, searchQuery, searchResponse]);
const onLoadMoreResults = useCallback(() => {
@@ -157,12 +146,7 @@ export const SearchResultsPage = ({
/>
- {(isSearchPending || isFetching) ? (
-
- ) : error ? (
+ {error ? (
Failed to search
@@ -170,14 +154,18 @@ export const SearchResultsPage = ({
) : (
)}
@@ -186,10 +174,11 @@ export const SearchResultsPage = ({
interface PanelGroupProps {
fileMatches: SearchResultFile[];
- isMoreResultsButtonVisible?: boolean;
onLoadMoreResults: () => void;
+ isStreaming: boolean;
+ isMoreResultsButtonVisible?: boolean;
isBranchFilteringEnabled: boolean;
- repoInfo: RepositoryInfo[];
+    repoInfo: Record<number, RepositoryInfo>;
searchDurationMs: number;
numMatches: number;
searchStats?: SearchStats;
@@ -198,9 +187,10 @@ interface PanelGroupProps {
const PanelGroup = ({
fileMatches,
isMoreResultsButtonVisible,
+ isStreaming,
onLoadMoreResults,
isBranchFilteringEnabled,
- repoInfo: _repoInfo,
+ repoInfo,
searchDurationMs: _searchDurationMs,
numMatches,
searchStats,
@@ -228,13 +218,6 @@ const PanelGroup = ({
return Math.round(_searchDurationMs);
}, [_searchDurationMs]);
- const repoInfo = useMemo(() => {
- return _repoInfo.reduce((acc, repo) => {
- acc[repo.id] = repo;
- return acc;
-        }, {} as Record<number, RepositoryInfo>);
- }, [_repoInfo]);
-
return (
-
-
-
-
-
-
-
-
Search stats for nerds
-
{
- navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2));
- return true;
- }}
- className="ml-auto"
- />
-
-
- {JSON.stringify(searchStats, null, 2)}
-
-
-
- {
- fileMatches.length > 0 ? (
-
{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}
- ) : (
-
No results
- )
- }
- {isMoreResultsButtonVisible && (
-
- (load more)
-
+ {isStreaming ? (
+ <>
+
+
Searching...
+ {numMatches > 0 && (
+
{`Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}
+ )}
+                    </>
+ ) : (
+ <>
+
+
+
+
+
+
+
+
Search stats for nerds
+
{
+ navigator.clipboard.writeText(JSON.stringify(searchStats, null, 2));
+ return true;
+ }}
+ className="ml-auto"
+ />
+
+
+ {JSON.stringify(searchStats, null, 2)}
+
+
+
+ {
+ fileMatches.length > 0 ? (
+
{`[${searchDurationMs} ms] Found ${numMatches} matches in ${fileMatches.length} ${fileMatches.length > 1 ? 'files' : 'file'}`}
+ ) : (
+
No results
+ )
+ }
+ {isMoreResultsButtonVisible && (
+
+ (load more)
+
+ )}
+                    </>
)}
{filteredFileMatches.length > 0 ? (
@@ -340,6 +335,11 @@ const PanelGroup = ({
isBranchFilteringEnabled={isBranchFilteringEnabled}
repoInfo={repoInfo}
/>
+ ) : isStreaming ? (
+
) : (
No results found
diff --git a/packages/web/src/app/[domain]/search/useStreamedSearch.ts b/packages/web/src/app/[domain]/search/useStreamedSearch.ts
new file mode 100644
index 00000000..2369346a
--- /dev/null
+++ b/packages/web/src/app/[domain]/search/useStreamedSearch.ts
@@ -0,0 +1,171 @@
+'use client';
+
+import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile } from '@/features/search/types';
+import { useState, useCallback, useRef, useEffect } from 'react';
+
+
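+/**
+ * Streams search results from the `/api/stream_search` SSE endpoint,
+ * accumulating files, repository info, and match counts as chunks arrive.
+ */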
+export const useStreamedSearch = ({ query, matches, contextLines, whole }: SearchRequest) => {
+
+ const [state, setState] = useState<{
+ isStreaming: boolean,
+ error: Error | null,
+ files: SearchResultFile[],
+        repoInfo: Record<number, RepositoryInfo>,
+ durationMs: number,
+ numMatches: number,
+ }>({
+ isStreaming: false,
+ error: null,
+ files: [],
+ repoInfo: {},
+ durationMs: 0,
+ numMatches: 0,
+ });
+
+    const abortControllerRef = useRef<AbortController | null>(null);
+
+ const cancel = useCallback(() => {
+ if (abortControllerRef.current) {
+ abortControllerRef.current.abort();
+ abortControllerRef.current = null;
+ }
+ setState(prev => ({
+ ...prev,
+ isStreaming: false,
+ }));
+ }, []);
+
+ useEffect(() => {
+ const search = async () => {
+ const startTime = performance.now();
+
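+            // Cancel any existing stream before starting a new one.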
+ if (abortControllerRef.current) {
+ abortControllerRef.current.abort();
+ }
+ abortControllerRef.current = new AbortController();
+
+ setState({
+ isStreaming: true,
+ error: null,
+ files: [],
+ repoInfo: {},
+ durationMs: 0,
+ numMatches: 0,
+ });
+
+ try {
+ const response = await fetch('/api/stream_search', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ query,
+ matches,
+ contextLines,
+ whole,
+ }),
+ signal: abortControllerRef.current.signal,
+ });
+
+ if (!response.ok) {
+ throw new Error(`HTTP error! status: ${response.status}`);
+ }
+
+ if (!response.body) {
+ throw new Error('No response body');
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = '';
+
+ while (true as boolean) {
+ const { done, value } = await reader.read();
+
+ if (done) {
+ break;
+ }
+
+ // Decode the chunk and add to buffer
+ buffer += decoder.decode(value, { stream: true });
+
+ // Process complete SSE messages (separated by \n\n)
+ const messages = buffer.split('\n\n');
+
+ // Keep the last element (potentially incomplete message) in the buffer for the next chunk.
+ // Stream chunks can split messages mid-way, so we only process complete messages.
+ buffer = messages.pop() || '';
+
+ for (const message of messages) {
+ if (!message.trim()) {
+ continue;
+ }
+
+ // SSE messages start with "data: "
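+                    // e.g., `data: {"files": [...], "repositoryInfo": [...], ...}`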
+ const dataMatch = message.match(/^data: (.+)$/);
+ if (!dataMatch) continue;
+
+ const data = dataMatch[1];
+
+ // Check for completion signal
+ if (data === '[DONE]') {
+ setState(prev => ({ ...prev, isStreaming: false }));
+ return;
+ }
+
+ try {
+ const chunk: SearchResponse = JSON.parse(data);
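+                            // Merge the chunk into the accumulated state: append its files,
+                            // union its repository info, and tally its match count.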
+ setState(prev => ({
+ ...prev,
+ files: [
+ ...prev.files,
+ ...chunk.files
+ ],
+ repoInfo: {
+ ...prev.repoInfo,
+ ...chunk.repositoryInfo.reduce((acc, repo) => {
+ acc[repo.id] = repo;
+ return acc;
+                            }, {} as Record<number, RepositoryInfo>),
+ },
+ numMatches: prev.numMatches + chunk.stats.actualMatchCount,
+ }));
+ } catch (parseError) {
+ console.error('Error parsing chunk:', parseError);
+ }
+ }
+ }
+
+ setState(prev => ({ ...prev, isStreaming: false }));
+ } catch (error) {
+ if ((error as Error).name === 'AbortError') {
+ console.log('Stream aborted');
+ } else {
+ setState(prev => ({
+ ...prev,
+ isStreaming: false,
+ error: error as Error,
+ }));
+ }
+ } finally {
+ const endTime = performance.now();
+ const durationMs = endTime - startTime;
+ setState(prev => ({
+ ...prev,
+ durationMs,
+ }));
+ }
+ }
+
+ search();
+
+        return () => {
+            // Abort any in-flight stream when the effect re-runs or the component unmounts.
+            abortControllerRef.current?.abort();
+        }
+ }, [query, matches, contextLines, whole]);
+
+ return {
+ ...state,
+ cancel,
+ };
+}
\ No newline at end of file
diff --git a/packages/web/src/app/[domain]/stream_search/page.tsx b/packages/web/src/app/[domain]/stream_search/page.tsx
deleted file mode 100644
index 86c1e968..00000000
--- a/packages/web/src/app/[domain]/stream_search/page.tsx
+++ /dev/null
@@ -1,221 +0,0 @@
-'use client';
-
-import { useState } from 'react';
-import { useStreamingSearch } from './useStreamingSearch';
-import type { FileMatch__Output } from './types';
-import { Button } from '@/components/ui/button';
-import { Input } from '@/components/ui/input';
-import { Separator } from '@/components/ui/separator';
-import { Loader2 } from 'lucide-react';
-
-// @nocheckin
-export default function StreamSearchPage() {
- const [query, setQuery] = useState('useMemo');
- const [matches, setMatches] = useState(10000);
- const [contextLines, _setContextLines] = useState(5);
-
- const {
- chunks,
- isStreaming,
- totalFiles,
- totalMatches,
- error,
- streamSearch,
- cancel,
- reset
- } = useStreamingSearch();
-
- const handleSearch = () => {
- streamSearch({
- query,
- matches,
- contextLines,
- whole: false,
- });
- };
-
- return (
-
-
-
-
Streaming Search Demo
-
- Test the SSE streaming search API with real-time results
-
-
-
-
-
- {/* Search Controls */}
-
-
-
-
-
- {isStreaming && (
-
- )}
- {chunks.length > 0 && !isStreaming && (
-
- )}
-
-
-
-
-
- {/* Results Stats */}
- {(isStreaming || chunks.length > 0) && (
-
-
-
- Status:{' '}
- {isStreaming ? (
-
- 🔄 Streaming...
-
- ) : (
-
- ✓ Complete
-
- )}
-
-
- Chunks:{' '}
- {chunks.length}
-
-
- Files:{' '}
- {totalFiles}
-
-
- Matches:{' '}
- {totalMatches}
-
-
-
- )}
-
- {/* Error Display */}
- {error && (
-
-
- Error occurred:
-
-
- {error.message}
-
-
- )}
-
- {/* Results Display */}
- {chunks.length > 0 && (
-
-
- Results ({chunks.length} chunks)
-
-
-
- {chunks.map((chunk, i) => (
-
-
-
- Chunk {i + 1}
-
-
- {chunk.response_chunk?.files?.length || 0} files, {' '}
- {chunk.response_chunk?.stats?.match_count || 0} matches
-
-
-
- {chunk.response_chunk?.files && chunk.response_chunk.files.length > 0 && (
-
- {chunk.response_chunk.files.map((file: FileMatch__Output, j: number) => {
- // Decode file_name from Buffer to string
- const fileName = file.file_name
- ? Buffer.from(file.file_name).toString('utf-8')
- : 'Unknown file';
-
- return (
-
-
- 📄 {fileName}
-
- {file.repository && (
-
- {file.repository}
-
- )}
- {file.language && (
-
- Language: {file.language}
-
- )}
-
- );
- })}
-
- )}
-
- ))}
-
-
- )}
-
- {/* Empty State */}
- {!isStreaming && chunks.length === 0 && !error && (
-
-
Enter a search query and click “Search” to start streaming results
-
- )}
-
-
- );
-}
\ No newline at end of file
diff --git a/packages/web/src/app/[domain]/stream_search/types.ts b/packages/web/src/app/[domain]/stream_search/types.ts
deleted file mode 100644
index e9e4aaf3..00000000
--- a/packages/web/src/app/[domain]/stream_search/types.ts
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Types for streaming search functionality
- */
-
-import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
-import type { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse';
-import type { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch';
-import type { Stats__Output } from '@/proto/zoekt/webserver/v1/Stats';
-import type { ChunkMatch__Output } from '@/proto/zoekt/webserver/v1/ChunkMatch';
-import type { Progress__Output } from '@/proto/zoekt/webserver/v1/Progress';
-
-/**
- * A single chunk received from the streaming search API
- */
-export interface StreamSearchChunk {
- response_chunk?: SearchResponse__Output | null;
- error?: StreamSearchError;
-}
-
-/**
- * Error response from the streaming search
- */
-export interface StreamSearchError {
- code?: number;
- message: string;
-}
-
-/**
- * Parameters for initiating a streaming search
- */
-export interface StreamSearchParams {
- query: string;
- matches: number;
- contextLines?: number;
- whole?: boolean;
-}
-
-/**
- * State of the streaming search
- */
-export interface StreamingSearchState {
- chunks: StreamSearchChunk[];
- isStreaming: boolean;
- error: Error | null;
- totalFiles: number;
- totalMatches: number;
-}
-
-/**
- * Return type of the useStreamingSearch hook
- */
-export interface UseStreamingSearchReturn extends StreamingSearchState {
-  streamSearch: (params: StreamSearchParams) => Promise<void>;
- cancel: () => void;
- reset: () => void;
-}
-
-/**
- * Re-export proto types for convenience
- */
-export type {
- StreamSearchResponse__Output,
- SearchResponse__Output,
- FileMatch__Output,
- Stats__Output,
- ChunkMatch__Output,
- Progress__Output,
-};
-
diff --git a/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts b/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts
deleted file mode 100644
index d84eeaf0..00000000
--- a/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts
+++ /dev/null
@@ -1,148 +0,0 @@
-'use client';
-
-import { useState, useCallback, useRef } from 'react';
-import type {
- StreamSearchChunk,
- StreamSearchParams,
- StreamingSearchState,
- UseStreamingSearchReturn,
-} from './types';
-
-export function useStreamingSearch(): UseStreamingSearchReturn {
-    const [state, setState] = useState<StreamingSearchState>({
- chunks: [],
- isStreaming: false,
- error: null,
- totalFiles: 0,
- totalMatches: 0,
- });
-
-    const abortControllerRef = useRef<AbortController | null>(null);
-
- const streamSearch = useCallback(async (params: StreamSearchParams) => {
- // Cancel any existing stream
- if (abortControllerRef.current) {
- abortControllerRef.current.abort();
- }
-
- abortControllerRef.current = new AbortController();
-
- setState({
- chunks: [],
- isStreaming: true,
- error: null,
- totalFiles: 0,
- totalMatches: 0,
- });
-
- try {
- const response = await fetch('/api/stream_search', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- },
- body: JSON.stringify(params),
- signal: abortControllerRef.current.signal,
- });
-
- if (!response.ok) {
- throw new Error(`HTTP error! status: ${response.status}`);
- }
-
- if (!response.body) {
- throw new Error('No response body');
- }
-
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = '';
-
- while (true as boolean) {
- const { done, value } = await reader.read();
-
- if (done) break;
-
- // Decode the chunk and add to buffer
- buffer += decoder.decode(value, { stream: true });
-
- // Process complete SSE messages (separated by \n\n)
- const messages = buffer.split('\n\n');
- buffer = messages.pop() || ''; // Keep incomplete message in buffer
-
- for (const message of messages) {
- if (!message.trim()) continue;
-
- // SSE messages start with "data: "
- const dataMatch = message.match(/^data: (.+)$/);
- if (!dataMatch) continue;
-
- const data = dataMatch[1];
-
- // Check for completion signal
- if (data === '[DONE]') {
- setState(prev => ({ ...prev, isStreaming: false }));
- return;
- }
-
- // Parse the JSON chunk
- try {
- const chunk: StreamSearchChunk = JSON.parse(data);
-
- // Check for errors
- if (chunk.error) {
- throw new Error(chunk.error.message);
- }
-
- // Update state with new chunk
- setState(prev => ({
- ...prev,
- chunks: [...prev.chunks, chunk],
- totalFiles: prev.totalFiles + (chunk.response_chunk?.files?.length || 0),
- totalMatches: prev.totalMatches + (chunk.response_chunk?.stats?.match_count || 0),
- }));
- } catch (parseError) {
- console.error('Error parsing chunk:', parseError);
- }
- }
- }
-
- setState(prev => ({ ...prev, isStreaming: false }));
- } catch (error) {
- if ((error as Error).name === 'AbortError') {
- console.log('Stream aborted');
- } else {
- setState(prev => ({
- ...prev,
- isStreaming: false,
- error: error as Error,
- }));
- }
- }
- }, []);
-
- const cancel = useCallback(() => {
- if (abortControllerRef.current) {
- abortControllerRef.current.abort();
- abortControllerRef.current = null;
- }
- setState(prev => ({ ...prev, isStreaming: false }));
- }, []);
-
- const reset = useCallback(() => {
- cancel();
- setState({
- chunks: [],
- isStreaming: false,
- error: null,
- totalFiles: 0,
- totalMatches: 0,
- });
- }, [cancel]);
-
- return {
- ...state,
- streamSearch,
- cancel,
- reset,
- };
-}
\ No newline at end of file
diff --git a/packages/web/src/app/api/(server)/stream_search/route.ts b/packages/web/src/app/api/(server)/stream_search/route.ts
index 70a44e20..ac834d35 100644
--- a/packages/web/src/app/api/(server)/stream_search/route.ts
+++ b/packages/web/src/app/api/(server)/stream_search/route.ts
@@ -1,17 +1,23 @@
'use server';
-import { NextRequest } from 'next/server';
-import * as grpc from '@grpc/grpc-js';
-import * as protoLoader from '@grpc/proto-loader';
-import * as path from 'path';
+import { searchRequestSchema } from '@/features/search/schemas';
+import { SearchResponse, SourceRange } from '@/features/search/types';
+import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
+import { prisma } from '@/prisma';
import type { ProtoGrpcType } from '@/proto/webserver';
-import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
+import { Range__Output } from '@/proto/zoekt/webserver/v1/Range';
import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest';
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
-import { env } from '@sourcebot/shared';
-import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
-import { searchRequestSchema } from '@/features/search/schemas';
+import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
+import * as grpc from '@grpc/grpc-js';
+import * as protoLoader from '@grpc/proto-loader';
+import { PrismaClient, Repo } from '@sourcebot/db';
+import { createLogger, env } from '@sourcebot/shared';
+import { NextRequest } from 'next/server';
+import * as path from 'path';
+
+const logger = createLogger('streamSearchApi');
/**
* Create a gRPC client for the Zoekt webserver
@@ -54,18 +60,22 @@ export const POST = async (request: NextRequest) => {
// Parse and validate request body
const body = await request.json();
const parsed = await searchRequestSchema.safeParseAsync(body);
-
+
if (!parsed.success) {
return serviceErrorResponse(schemaValidationError(parsed.error));
}
+ const { query, matches, contextLines, whole } = parsed.data;
+
const searchRequest: SearchRequest = {
query: {
and: {
+                // @todo: we should use repo_ids to filter out repositories that the user
+                // does not have access to (if permission syncing is enabled!).
children: [
{
regexp: {
- regexp: parsed.data.query,
+ regexp: query,
case_sensitive: true,
}
}
@@ -74,79 +84,20 @@ export const POST = async (request: NextRequest) => {
},
opts: {
chunk_matches: true,
- num_context_lines: parsed.data.contextLines ?? 5,
- total_max_match_count: parsed.data.matches,
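+            // Request one match beyond the display limit, presumably so we can
+            // detect when more results exist than were returned.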
+ max_match_display_count: matches,
+ total_max_match_count: matches + 1,
+ num_context_lines: contextLines,
+ whole: !!whole,
+ shard_max_match_count: -1,
+ max_wall_time: {
+ seconds: 0,
+ }
},
};
- // Create ReadableStream for SSE
- const stream = new ReadableStream({
- async start(controller) {
- const client = createGrpcClient();
-
- try {
- const metadata = new grpc.Metadata();
- metadata.add('x-sourcegraph-tenant-id', '1');
+ // @nocheckin: this should be using the `prisma` instance from the auth context.
+ const stream = await createSSESearchStream(searchRequest, prisma);
- const streamRequest: StreamSearchRequest = {
- request: searchRequest,
- };
-
- const grpcStream = client.StreamSearch(streamRequest, metadata);
-
- // Handle incoming data chunks
- grpcStream.on('data', (chunk: StreamSearchResponse__Output) => {
- try {
- // SSE format: "data: {json}\n\n"
- const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
- controller.enqueue(new TextEncoder().encode(sseData));
- } catch (error) {
- console.error('Error encoding chunk:', error);
- }
- });
-
- // Handle stream completion
- grpcStream.on('end', () => {
- // Send completion signal
- controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
- controller.close();
- client.close();
- });
-
- // Handle errors
- grpcStream.on('error', (error: grpc.ServiceError) => {
- console.error('gRPC stream error:', error);
-
- // Send error as SSE event
- const errorData = `data: ${JSON.stringify({
- error: {
- code: error.code,
- message: error.details || error.message,
- }
- })}\n\n`;
- controller.enqueue(new TextEncoder().encode(errorData));
-
- controller.close();
- client.close();
- });
- } catch (error) {
- console.error('Stream initialization error:', error);
-
- const errorMessage = error instanceof Error ? error.message : 'Unknown error';
- const errorData = `data: ${JSON.stringify({
- error: { message: errorMessage }
- })}\n\n`;
- controller.enqueue(new TextEncoder().encode(errorData));
-
- controller.close();
- client.close();
- }
- },
- cancel() {
- // Cleanup when client cancels the stream
- console.log('SSE stream cancelled by client');
- }
- });
// Return streaming response with SSE headers
return new Response(stream, {
@@ -171,4 +122,268 @@ export const POST = async (request: NextRequest) => {
}
);
}
-};
\ No newline at end of file
+};
+
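+/**
+ * Bridges a Zoekt gRPC StreamSearch call into a web ReadableStream of
+ * server-sent events, transforming each gRPC chunk into a `SearchResponse`.
+ */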
+const createSSESearchStream = async (searchRequest: SearchRequest, prisma: PrismaClient): Promise<ReadableStream> => {
+ const client = createGrpcClient();
+    let grpcStream: ReturnType<typeof client.StreamSearch> | null = null;
+ let isStreamActive = true;
+
+ return new ReadableStream({
+ async start(controller) {
+ try {
+ // @todo: we should just disable tenant enforcement for now.
+ const metadata = new grpc.Metadata();
+ metadata.add('x-sourcegraph-tenant-id', '1');
+
+ const streamRequest: StreamSearchRequest = {
+ request: searchRequest,
+ };
+
+ grpcStream = client.StreamSearch(streamRequest, metadata);
+
+ // @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
+ // which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
+    // metadata when transforming (potentially thousands of) file matches, we aggregate a unique
+ // set of repository ids* and map them to their corresponding Repo record.
+ //
+ // *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`?
+ // A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was
+ // always undefined. To make this a non-breaking change, we fallback to using the repository's name
+ // (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in
+ // practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo',
+ // 'gitea.com/org/repo', etc.).
+ //
+ // Note: When a repository is re-indexed (every hour) this ID will be populated.
+ // @see: https://github.com/sourcebot-dev/zoekt/pull/6
+            const repos = new Map<number | string, Repo>();
+
+ // Handle incoming data chunks
+ grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => {
+ console.log('chunk');
+
+ if (!isStreamActive) {
+ return;
+ }
+
+                // grpcStream.on doesn't await our handler, so we explicitly pause the
+                // stream here to prevent it from completing before our asynchronous
+                // work has finished.
+ grpcStream?.pause();
+
+ try {
+ if (!chunk.response_chunk) {
+ logger.warn('No response chunk received');
+ return;
+ }
+
+ const files = (await Promise.all(chunk.response_chunk.files.map(async (file) => {
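+                        // Chunk matches flagged with `file_name` are matches against the
+                        // file's path rather than its content.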
+ const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name);
+
+ const identifier = file.repository_id ?? file.repository;
+
+ // If the repository is not in the map, fetch it from the database.
+ if (!repos.has(identifier)) {
+ const repo = typeof identifier === 'number' ?
+ await prisma.repo.findUnique({
+ where: {
+ id: identifier,
+ },
+ }) :
+ await prisma.repo.findFirst({
+ where: {
+ name: identifier,
+ },
+ });
+
+ if (repo) {
+ repos.set(identifier, repo);
+ }
+ }
+
+
+ const repo = repos.get(identifier);
+
+ // This can happen if the user doesn't have access to the repository.
+ if (!repo) {
+ return undefined;
+ }
+
+ // @todo: address "file_name might not be a valid UTF-8 string" warning.
+ const fileName = file.file_name.toString('utf-8');
+
+ const convertRange = (range: Range__Output): SourceRange => ({
+ start: {
+ byteOffset: range.start?.byte_offset ?? 0,
+ column: range.start?.column ?? 0,
+ lineNumber: range.start?.line_number ?? 0,
+ },
+ end: {
+ byteOffset: range.end?.byte_offset ?? 0,
+ column: range.end?.column ?? 0,
+ lineNumber: range.end?.line_number ?? 0,
+ }
+ })
+
+ return {
+ fileName: {
+ text: fileName,
+ matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [],
+ },
+ repository: repo.name,
+ repositoryId: repo.id,
+ language: file.language,
+                        // @todo: we will need a mechanism for forming the file's web URL.
+ webUrl: '',
+ chunks: file.chunk_matches
+ .filter((chunk) => !chunk.file_name) // filter out filename chunks.
+ .map((chunk) => {
+ return {
+ content: chunk.content.toString('utf-8'),
+ matchRanges: chunk.ranges.map(convertRange),
+ contentStart: chunk.content_start ? {
+ byteOffset: chunk.content_start.byte_offset ?? 0,
+ column: chunk.content_start.column ?? 0,
+ lineNumber: chunk.content_start.line_number ?? 0,
+ // @nocheckin: Will need to figure out how to handle this case.
+ } : {
+ byteOffset: 0,
+ column: 0,
+ lineNumber: 0,
+ },
+ symbols: chunk.symbol_info.map((symbol) => {
+ return {
+ symbol: symbol.sym,
+ kind: symbol.kind,
+ parent: symbol.parent ? {
+ symbol: symbol.parent,
+ kind: symbol.parent_kind,
+ } : undefined,
+ }
+ })
+ }
+ }),
+ branches: file.branches,
+ content: file.content ? file.content.toString('utf-8') : undefined,
+ }
+ }))).filter(file => file !== undefined);
+
+ const actualMatchCount = files.reduce(
+ (acc, file) =>
+ // Match count is the sum of the number of chunk matches and file name matches.
+ acc + file.chunks.reduce(
+ (acc, chunk) => acc + chunk.matchRanges.length,
+ 0,
+ ) + file.fileName.matchRanges.length,
+ 0,
+ );
+
+ const response: SearchResponse = {
+ files,
+ repositoryInfo: Array.from(repos.values()).map((repo) => ({
+ id: repo.id,
+ codeHostType: repo.external_codeHostType,
+ name: repo.name,
+ displayName: repo.displayName ?? undefined,
+ webUrl: repo.webUrl ?? undefined,
+ })),
+ isBranchFilteringEnabled: false,
+                    // @todo: we will need to figure out how to determine whether a search is exhaustive.
+ isSearchExhaustive: false,
+ stats: {
+ actualMatchCount,
+                        // @todo: compute the total match count.
+ totalMatchCount: 0,
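+                        // @note: only the nanos component of the duration fields below is read;
+                        // the seconds component is dropped.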
+ duration: chunk.response_chunk.stats?.duration?.nanos ?? 0,
+ fileCount: chunk.response_chunk.stats?.file_count.valueOf() ?? 0,
+ filesSkipped: chunk.response_chunk.stats?.files_skipped.valueOf() ?? 0,
+ contentBytesLoaded: chunk.response_chunk.stats?.content_bytes_loaded.valueOf() ?? 0,
+ indexBytesLoaded: chunk.response_chunk.stats?.index_bytes_loaded.valueOf() ?? 0,
+ crashes: chunk.response_chunk.stats?.crashes.valueOf() ?? 0,
+ shardFilesConsidered: chunk.response_chunk.stats?.shard_files_considered.valueOf() ?? 0,
+ filesConsidered: chunk.response_chunk.stats?.files_considered.valueOf() ?? 0,
+ filesLoaded: chunk.response_chunk.stats?.files_loaded.valueOf() ?? 0,
+ shardsScanned: chunk.response_chunk.stats?.shards_scanned.valueOf() ?? 0,
+ shardsSkipped: chunk.response_chunk.stats?.shards_skipped.valueOf() ?? 0,
+ shardsSkippedFilter: chunk.response_chunk.stats?.shards_skipped_filter.valueOf() ?? 0,
+ ngramMatches: chunk.response_chunk.stats?.ngram_matches.valueOf() ?? 0,
+ ngramLookups: chunk.response_chunk.stats?.ngram_lookups.valueOf() ?? 0,
+ wait: chunk.response_chunk.stats?.wait?.nanos ?? 0,
+ matchTreeConstruction: chunk.response_chunk.stats?.match_tree_construction?.nanos ?? 0,
+ matchTreeSearch: chunk.response_chunk.stats?.match_tree_search?.nanos ?? 0,
+ regexpsConsidered: chunk.response_chunk.stats?.regexps_considered.valueOf() ?? 0,
+ // @todo: handle this.
+ flushReason: 0,
+ }
+ }
+
+ const sseData = `data: ${JSON.stringify(response)}\n\n`;
+ controller.enqueue(new TextEncoder().encode(sseData));
+ } catch (error) {
+ console.error('Error encoding chunk:', error);
+ } finally {
+ grpcStream?.resume();
+ }
+ });
+
+ // Handle stream completion
+ grpcStream.on('end', () => {
+ if (!isStreamActive) {
+ return;
+ }
+ isStreamActive = false;
+
+ // Send completion signal
+ controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
+ controller.close();
+ console.log('SSE stream completed');
+ client.close();
+ });
+
+ // Handle errors
+ grpcStream.on('error', (error: grpc.ServiceError) => {
+ console.error('gRPC stream error:', error);
+
+ if (!isStreamActive) {
+ return;
+ }
+ isStreamActive = false;
+
+ // Send error as SSE event
+ const errorData = `data: ${JSON.stringify({
+ error: {
+ code: error.code,
+ message: error.details || error.message,
+ }
+ })}\n\n`;
+ controller.enqueue(new TextEncoder().encode(errorData));
+
+ controller.close();
+ client.close();
+ });
+ } catch (error) {
+ console.error('Stream initialization error:', error);
+
+ const errorMessage = error instanceof Error ? error.message : 'Unknown error';
+ const errorData = `data: ${JSON.stringify({
+ error: { message: errorMessage }
+ })}\n\n`;
+ controller.enqueue(new TextEncoder().encode(errorData));
+
+ controller.close();
+ client.close();
+ }
+ },
+ cancel() {
+ console.log('SSE stream cancelled by client');
+ isStreamActive = false;
+
+ // Cancel the gRPC stream to stop receiving data
+ if (grpcStream) {
+ grpcStream.cancel();
+ }
+
+ client.close();
+ }
+ });
+}
\ No newline at end of file
diff --git a/packages/web/src/features/search/searchApi.ts b/packages/web/src/features/search/searchApi.ts
index 1ca57ef4..ee703759 100644
--- a/packages/web/src/features/search/searchApi.ts
+++ b/packages/web/src/features/search/searchApi.ts
@@ -216,7 +216,7 @@ export const search = async ({ query, matches, contextLines, whole }: SearchRequ
return invalidZoektResponse(searchResponse);
}
- const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse) => {
+    const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse): Promise<SearchResponse> => {
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique