diff --git a/packages/web/src/app/[domain]/stream_search/page.tsx b/packages/web/src/app/[domain]/stream_search/page.tsx
new file mode 100644
index 00000000..86c1e968
--- /dev/null
+++ b/packages/web/src/app/[domain]/stream_search/page.tsx
@@ -0,0 +1,221 @@
+'use client';
+
+import { useState } from 'react';
+import { useStreamingSearch } from './useStreamingSearch';
+import type { FileMatch__Output } from './types';
+import { Button } from '@/components/ui/button';
+import { Input } from '@/components/ui/input';
+import { Separator } from '@/components/ui/separator';
+import { Loader2 } from 'lucide-react';
+
+// @nocheckin
+export default function StreamSearchPage() {
+    const [query, setQuery] = useState('useMemo');
+    const [matches, setMatches] = useState(10000);
+    const [contextLines, _setContextLines] = useState(5);
+
+    const {
+        chunks,
+        isStreaming,
+        totalFiles,
+        totalMatches,
+        error,
+        streamSearch,
+        cancel,
+        reset
+    } = useStreamingSearch();
+
+    const handleSearch = () => {
+        streamSearch({
+            query,
+            matches,
+            contextLines,
+            whole: false,
+        });
+    };
+
+    return (
+        <div>
+            <div>
+                <h1>Streaming Search Demo</h1>
+                <p>Test the SSE streaming search API with real-time results</p>
+            </div>
+
+            <Separator />
+
+            {/* Search Controls */}
+            <div>
+                <div>
+                    <Input
+                        value={query}
+                        onChange={(e) => setQuery(e.target.value)}
+                        placeholder="Enter search query (e.g., useMemo, file:.tsx)"
+                        onKeyDown={(e) => e.key === 'Enter' && handleSearch()}
+                        disabled={isStreaming}
+                    />
+                </div>
+                <div>
+                    <Input
+                        type="number"
+                        value={matches}
+                        onChange={(e) => setMatches(Number(e.target.value))}
+                        placeholder="Max matches"
+                        disabled={isStreaming}
+                    />
+                </div>
+                <div>
+                    <Button onClick={handleSearch} disabled={isStreaming}>
+                        {isStreaming ? <Loader2 /> : 'Search'}
+                    </Button>
+                    {isStreaming && (
+                        <Button onClick={cancel}>Cancel</Button>
+                    )}
+                    {chunks.length > 0 && !isStreaming && (
+                        <Button onClick={reset}>Reset</Button>
+                    )}
+                </div>
+            </div>
+
+            <Separator />
+
+            {/* Results Stats */}
+            {(isStreaming || chunks.length > 0) && (
+                <div>
+                    <div>
+                        Status:{' '}
+                        {isStreaming ? (
+                            <span>🔄 Streaming...</span>
+                        ) : (
+                            <span>✓ Complete</span>
+                        )}
+                    </div>
+                    <div>
+                        Chunks:{' '}
+                        <span>{chunks.length}</span>
+                    </div>
+                    <div>
+                        Files:{' '}
+                        <span>{totalFiles}</span>
+                    </div>
+                    <div>
+                        Matches:{' '}
+                        <span>{totalMatches}</span>
+                    </div>
+                </div>
+            )}
+
+            {/* Error Display */}
+            {error && (
+                <div>
+                    <div>Error occurred:</div>
+                    <div>{error.message}</div>
+                </div>
+            )}
+
+            {/* Results Display */}
+            {chunks.length > 0 && (
+                <div>
+                    <div>Results ({chunks.length} chunks)</div>
+
+                    <div>
+                        {chunks.map((chunk, i) => (
+                            <div key={i}>
+                                <div>
+                                    <div>Chunk {i + 1}</div>
+                                    <div>
+                                        {chunk.response_chunk?.files?.length || 0} files, {' '}
+                                        {chunk.response_chunk?.stats?.match_count || 0} matches
+                                    </div>
+                                </div>
+
+                                {chunk.response_chunk?.files && chunk.response_chunk.files.length > 0 && (
+                                    <div>
+                                        {chunk.response_chunk.files.map((file: FileMatch__Output, j: number) => {
+                                            // Decode file_name from Buffer to string
+                                            const fileName = file.file_name
+                                                ? Buffer.from(file.file_name).toString('utf-8')
+                                                : 'Unknown file';
+
+                                            return (
+                                                <div key={j}>
+                                                    <div>📄 {fileName}</div>
+                                                    {file.repository && (
+                                                        <div>{file.repository}</div>
+                                                    )}
+                                                    {file.language && (
+                                                        <div>Language: {file.language}</div>
+                                                    )}
+                                                </div>
+                                            );
+                                        })}
+                                    </div>
+                                )}
+                            </div>
+                        ))}
+                    </div>
+                </div>
+            )}
+
+            {/* Empty State */}
+            {!isStreaming && chunks.length === 0 && !error && (
+                <div>
+                    Enter a search query and click “Search” to start streaming results
+                </div>
+            )}
+        </div>
+    );
+}
\ No newline at end of file
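The page above decodes file_name with Buffer.from in the browser, which depends on a Buffer implementation being available in the client bundle (Next.js may polyfill it). Because the route handler serializes each gRPC chunk with JSON.stringify, a Node Buffer-backed bytes field would typically arrive on the client as an object of the shape { type: 'Buffer', data: number[] }. The helper below is a minimal, polyfill-free sketch under that assumption; decodeBytesField and JsonBuffer are hypothetical names that are not part of this diff.

// Hypothetical helper; assumes bytes fields arrive as { type: 'Buffer', data: number[] }
// after the server-side JSON.stringify, or occasionally as a plain string.
type JsonBuffer = { type: 'Buffer'; data: number[] };

function decodeBytesField(value: JsonBuffer | string | null | undefined): string {
    if (!value) {
        return '';
    }
    if (typeof value === 'string') {
        return value;
    }
    // TextDecoder is available in modern browsers and in Node.
    return new TextDecoder().decode(Uint8Array.from(value.data));
}

// Usage (sketch): const fileName = decodeBytesField(file.file_name) || 'Unknown file';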
diff --git a/packages/web/src/app/[domain]/stream_search/types.ts b/packages/web/src/app/[domain]/stream_search/types.ts
new file mode 100644
index 00000000..e9e4aaf3
--- /dev/null
+++ b/packages/web/src/app/[domain]/stream_search/types.ts
@@ -0,0 +1,69 @@
+/**
+ * Types for streaming search functionality
+ */
+
+import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
+import type { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse';
+import type { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch';
+import type { Stats__Output } from '@/proto/zoekt/webserver/v1/Stats';
+import type { ChunkMatch__Output } from '@/proto/zoekt/webserver/v1/ChunkMatch';
+import type { Progress__Output } from '@/proto/zoekt/webserver/v1/Progress';
+
+/**
+ * A single chunk received from the streaming search API
+ */
+export interface StreamSearchChunk {
+    response_chunk?: SearchResponse__Output | null;
+    error?: StreamSearchError;
+}
+
+/**
+ * Error response from the streaming search
+ */
+export interface StreamSearchError {
+    code?: number;
+    message: string;
+}
+
+/**
+ * Parameters for initiating a streaming search
+ */
+export interface StreamSearchParams {
+    query: string;
+    matches: number;
+    contextLines?: number;
+    whole?: boolean;
+}
+
+/**
+ * State of the streaming search
+ */
+export interface StreamingSearchState {
+    chunks: StreamSearchChunk[];
+    isStreaming: boolean;
+    error: Error | null;
+    totalFiles: number;
+    totalMatches: number;
+}
+
+/**
+ * Return type of the useStreamingSearch hook
+ */
+export interface UseStreamingSearchReturn extends StreamingSearchState {
+    streamSearch: (params: StreamSearchParams) => Promise<void>;
+    cancel: () => void;
+    reset: () => void;
+}
+
+/**
+ * Re-export proto types for convenience
+ */
+export type {
+    StreamSearchResponse__Output,
+    SearchResponse__Output,
+    FileMatch__Output,
+    Stats__Output,
+    ChunkMatch__Output,
+    Progress__Output,
+};
+
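Since StreamSearchChunk carries either a response_chunk or an error, consumers end up narrowing it repeatedly. The snippet below is a minimal sketch of a type guard built only from the interfaces defined above; isErrorChunk is a hypothetical helper name and is not part of this diff.

import type { StreamSearchChunk, StreamSearchError } from './types';

// Narrow a chunk to its error variant so callers can handle it before touching response_chunk.
function isErrorChunk(chunk: StreamSearchChunk): chunk is StreamSearchChunk & { error: StreamSearchError } {
    return chunk.error !== undefined;
}

// Usage (sketch):
// if (isErrorChunk(chunk)) throw new Error(chunk.error.message);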
diff --git a/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts b/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts
new file mode 100644
index 00000000..d84eeaf0
--- /dev/null
+++ b/packages/web/src/app/[domain]/stream_search/useStreamingSearch.ts
@@ -0,0 +1,148 @@
+'use client';
+
+import { useState, useCallback, useRef } from 'react';
+import type {
+    StreamSearchChunk,
+    StreamSearchParams,
+    StreamingSearchState,
+    UseStreamingSearchReturn,
+} from './types';
+
+export function useStreamingSearch(): UseStreamingSearchReturn {
+    const [state, setState] = useState<StreamingSearchState>({
+        chunks: [],
+        isStreaming: false,
+        error: null,
+        totalFiles: 0,
+        totalMatches: 0,
+    });
+
+    const abortControllerRef = useRef<AbortController | null>(null);
+
+    const streamSearch = useCallback(async (params: StreamSearchParams) => {
+        // Cancel any existing stream
+        if (abortControllerRef.current) {
+            abortControllerRef.current.abort();
+        }
+
+        abortControllerRef.current = new AbortController();
+
+        setState({
+            chunks: [],
+            isStreaming: true,
+            error: null,
+            totalFiles: 0,
+            totalMatches: 0,
+        });
+
+        try {
+            const response = await fetch('/api/stream_search', {
+                method: 'POST',
+                headers: {
+                    'Content-Type': 'application/json',
+                },
+                body: JSON.stringify(params),
+                signal: abortControllerRef.current.signal,
+            });
+
+            if (!response.ok) {
+                throw new Error(`HTTP error! status: ${response.status}`);
+            }
+
+            if (!response.body) {
+                throw new Error('No response body');
+            }
+
+            const reader = response.body.getReader();
+            const decoder = new TextDecoder();
+            let buffer = '';
+
+            while (true as boolean) {
+                const { done, value } = await reader.read();
+
+                if (done) break;
+
+                // Decode the chunk and add to buffer
+                buffer += decoder.decode(value, { stream: true });
+
+                // Process complete SSE messages (separated by \n\n)
+                const messages = buffer.split('\n\n');
+                buffer = messages.pop() || ''; // Keep incomplete message in buffer
+
+                for (const message of messages) {
+                    if (!message.trim()) continue;
+
+                    // SSE messages start with "data: "
+                    const dataMatch = message.match(/^data: (.+)$/);
+                    if (!dataMatch) continue;
+
+                    const data = dataMatch[1];
+
+                    // Check for completion signal
+                    if (data === '[DONE]') {
+                        setState(prev => ({ ...prev, isStreaming: false }));
+                        return;
+                    }
+
+                    // Parse the JSON chunk
+                    try {
+                        const chunk: StreamSearchChunk = JSON.parse(data);
+
+                        // Check for errors
+                        if (chunk.error) {
+                            throw new Error(chunk.error.message);
+                        }
+
+                        // Update state with new chunk
+                        setState(prev => ({
+                            ...prev,
+                            chunks: [...prev.chunks, chunk],
+                            totalFiles: prev.totalFiles + (chunk.response_chunk?.files?.length || 0),
+                            totalMatches: prev.totalMatches + (chunk.response_chunk?.stats?.match_count || 0),
+                        }));
+                    } catch (parseError) {
+                        console.error('Error parsing chunk:', parseError);
+                    }
+                }
+            }
+
+            setState(prev => ({ ...prev, isStreaming: false }));
+        } catch (error) {
+            if ((error as Error).name === 'AbortError') {
+                console.log('Stream aborted');
+            } else {
+                setState(prev => ({
+                    ...prev,
+                    isStreaming: false,
+                    error: error as Error,
+                }));
+            }
+        }
+    }, []);
+
+    const cancel = useCallback(() => {
+        if (abortControllerRef.current) {
+            abortControllerRef.current.abort();
+            abortControllerRef.current = null;
+        }
+        setState(prev => ({ ...prev, isStreaming: false }));
+    }, []);
+
+    const reset = useCallback(() => {
+        cancel();
+        setState({
+            chunks: [],
+            isStreaming: false,
+            error: null,
+            totalFiles: 0,
+            totalMatches: 0,
+        });
+    }, [cancel]);
+
+    return {
+        ...state,
+        streamSearch,
+        cancel,
+        reset,
+    };
+}
\ No newline at end of file
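The hook's read loop assumes the server emits one "data: <json>" line per event, with events separated by a blank line and "[DONE]" as the final payload. The snippet below factors that framing out into a pure function so the parsing behavior is easier to see in isolation; parseSseEvents is a hypothetical name and is not part of this diff.

// Minimal sketch of the SSE framing used above: complete events are separated by "\n\n",
// and the trailing partial event is returned so the caller can keep it in its buffer.
function parseSseEvents(buffer: string): { payloads: string[]; rest: string } {
    const frames = buffer.split('\n\n');
    const rest = frames.pop() ?? '';
    const payloads: string[] = [];
    for (const frame of frames) {
        const match = frame.match(/^data: (.+)$/);
        if (match) {
            payloads.push(match[1]);
        }
    }
    return { payloads, rest };
}

// Usage (sketch): const { payloads, rest } = parseSseEvents(buffer); buffer = rest;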
diff --git a/packages/web/src/app/api/(server)/stream_search/route.ts b/packages/web/src/app/api/(server)/stream_search/route.ts
new file mode 100644
index 00000000..70a44e20
--- /dev/null
+++ b/packages/web/src/app/api/(server)/stream_search/route.ts
@@ -0,0 +1,174 @@
+'use server';
+
+import { NextRequest } from 'next/server';
+import * as grpc from '@grpc/grpc-js';
+import * as protoLoader from '@grpc/proto-loader';
+import * as path from 'path';
+import type { ProtoGrpcType } from '@/proto/webserver';
+import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
+import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
+import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest';
+import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
+import { env } from '@sourcebot/shared';
+import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
+import { searchRequestSchema } from '@/features/search/schemas';
+
+/**
+ * Create a gRPC client for the Zoekt webserver
+ */
+function createGrpcClient(): WebserverServiceClient {
+    // Path to proto files - these should match your monorepo structure
+    const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos');
+    const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto');
+
+    const packageDefinition = protoLoader.loadSync(protoPath, {
+        keepCase: true,
+        longs: Number,
+        enums: String,
+        defaults: true,
+        oneofs: true,
+        includeDirs: [protoBasePath],
+    });
+
+    const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
+
+    // Extract host and port from ZOEKT_WEBSERVER_URL
+    const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL);
+    const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`;
+
+    return new proto.zoekt.webserver.v1.WebserverService(
+        grpcAddress,
+        grpc.credentials.createInsecure(),
+        {
+            'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB
+            'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB
+        }
+    );
+}
+
+/**
+ * POST handler for streaming search via SSE
+ */
+export const POST = async (request: NextRequest) => {
+    try {
+        // Parse and validate request body
+        const body = await request.json();
+        const parsed = await searchRequestSchema.safeParseAsync(body);
+
+        if (!parsed.success) {
+            return serviceErrorResponse(schemaValidationError(parsed.error));
+        }
+
+        const searchRequest: SearchRequest = {
+            query: {
+                and: {
+                    children: [
+                        {
+                            regexp: {
+                                regexp: parsed.data.query,
+                                case_sensitive: true,
+                            }
+                        }
+                    ]
+                }
+            },
+            opts: {
+                chunk_matches: true,
+                num_context_lines: parsed.data.contextLines ?? 5,
+                total_max_match_count: parsed.data.matches,
+            },
+        };
+
+        // Create ReadableStream for SSE
+        const stream = new ReadableStream({
+            async start(controller) {
+                const client = createGrpcClient();
+
+                try {
+                    const metadata = new grpc.Metadata();
+                    metadata.add('x-sourcegraph-tenant-id', '1');
+
+                    const streamRequest: StreamSearchRequest = {
+                        request: searchRequest,
+                    };
+
+                    const grpcStream = client.StreamSearch(streamRequest, metadata);
+
+                    // Handle incoming data chunks
+                    grpcStream.on('data', (chunk: StreamSearchResponse__Output) => {
+                        try {
+                            // SSE format: "data: {json}\n\n"
+                            const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
+                            controller.enqueue(new TextEncoder().encode(sseData));
+                        } catch (error) {
+                            console.error('Error encoding chunk:', error);
+                        }
+                    });
+
+                    // Handle stream completion
+                    grpcStream.on('end', () => {
+                        // Send completion signal
+                        controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
+                        controller.close();
+                        client.close();
+                    });
+
+                    // Handle errors
+                    grpcStream.on('error', (error: grpc.ServiceError) => {
+                        console.error('gRPC stream error:', error);
+
+                        // Send error as SSE event
+                        const errorData = `data: ${JSON.stringify({
+                            error: {
+                                code: error.code,
+                                message: error.details || error.message,
+                            }
+                        })}\n\n`;
+                        controller.enqueue(new TextEncoder().encode(errorData));
+
+                        controller.close();
+                        client.close();
+                    });
+                } catch (error) {
+                    console.error('Stream initialization error:', error);
+
+                    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
+                    const errorData = `data: ${JSON.stringify({
+                        error: { message: errorMessage }
+                    })}\n\n`;
+                    controller.enqueue(new TextEncoder().encode(errorData));
+
+                    controller.close();
+                    client.close();
+                }
+            },
+            cancel() {
+                // Cleanup when client cancels the stream
+                console.log('SSE stream cancelled by client');
+            }
+        });
+
+        // Return streaming response with SSE headers
+        return new Response(stream, {
+            headers: {
+                'Content-Type': 'text/event-stream',
+                'Cache-Control': 'no-cache, no-transform',
+                'Connection': 'keep-alive',
+                'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable
+            },
+        });
+    } catch (error) {
+        console.error('Request handling error:', error);
+        return new Response(
+            JSON.stringify({
+                error: {
+                    message: error instanceof Error ? error.message : 'Internal server error'
+                }
+            }),
+            {
+                status: 500,
+                headers: { 'Content-Type': 'application/json' },
+            }
+        );
+    }
+};
\ No newline at end of file
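The ReadableStream's cancel() above only logs, so the upstream gRPC call keeps running if the browser aborts the fetch. A possible refinement, assuming the call object returned by StreamSearch exposes cancel() as the @grpc/grpc-js client streams do, is to keep references outside start() and tear them down on disconnect. This is a sketch of that change, not part of the diff; createGrpcClient, searchRequest, and metadata refer to the values defined in route.ts above.

// Sketch: hold the gRPC call and client where cancel() can reach them.
let client: WebserverServiceClient | undefined;
let grpcStream: ReturnType<WebserverServiceClient['StreamSearch']> | undefined;

const stream = new ReadableStream({
    start(controller) {
        client = createGrpcClient();
        grpcStream = client.StreamSearch({ request: searchRequest }, metadata);
        // ... attach the same 'data' / 'end' / 'error' handlers as above ...
    },
    cancel() {
        // Propagate the client disconnect upstream instead of only logging it.
        grpcStream?.cancel();
        client?.close();
    },
});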