stream poc over SSE

This commit is contained in:
bkellam 2025-11-13 21:18:08 -08:00
parent a040ee1e07
commit cca3d30b4a
4 changed files with 612 additions and 0 deletions

View file

@ -0,0 +1,221 @@
'use client';
import { useState } from 'react';
import { useStreamingSearch } from './useStreamingSearch';
import type { FileMatch__Output } from './types';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Separator } from '@/components/ui/separator';
import { Loader2 } from 'lucide-react';
// @nocheckin
export default function StreamSearchPage() {
const [query, setQuery] = useState('useMemo');
const [matches, setMatches] = useState(10000);
const [contextLines, _setContextLines] = useState(5);
const {
chunks,
isStreaming,
totalFiles,
totalMatches,
error,
streamSearch,
cancel,
reset
} = useStreamingSearch();
const handleSearch = () => {
streamSearch({
query,
matches,
contextLines,
whole: false,
});
};
return (
<div className="container mx-auto p-6 max-w-6xl">
<div className="space-y-6">
<div>
<h1 className="text-3xl font-bold mb-2">Streaming Search Demo</h1>
<p className="text-muted-foreground">
Test the SSE streaming search API with real-time results
</p>
</div>
<Separator />
{/* Search Controls */}
<div className="space-y-4">
<div className="grid grid-cols-1 md:grid-cols-3 gap-4">
<div className="md:col-span-2">
<label className="text-sm font-medium mb-1.5 block">
Search Query
</label>
<Input
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder="Enter search query (e.g., useMemo, file:.tsx)"
onKeyDown={(e) => e.key === 'Enter' && handleSearch()}
disabled={isStreaming}
/>
</div>
<div>
<label className="text-sm font-medium mb-1.5 block">
Max Matches
</label>
<Input
type="number"
value={matches}
onChange={(e) => setMatches(Number(e.target.value))}
placeholder="Max matches"
disabled={isStreaming}
/>
</div>
</div>
<div className="flex gap-2">
<Button
onClick={handleSearch}
disabled={isStreaming || !query}
className="w-32"
>
{isStreaming ? (
<>
<Loader2 className="w-4 h-4 mr-2 animate-spin" />
Searching
</>
) : (
'Search'
)}
</Button>
{isStreaming && (
<Button onClick={cancel} variant="destructive">
Cancel
</Button>
)}
{chunks.length > 0 && !isStreaming && (
<Button onClick={reset} variant="outline">
Clear Results
</Button>
)}
</div>
</div>
<Separator />
{/* Results Stats */}
{(isStreaming || chunks.length > 0) && (
<div className="bg-muted/50 rounded-lg p-4">
<div className="flex items-center gap-6 text-sm">
<div>
<span className="font-semibold">Status:</span>{' '}
{isStreaming ? (
<span className="text-blue-600 dark:text-blue-400">
🔄 Streaming...
</span>
) : (
<span className="text-green-600 dark:text-green-400">
Complete
</span>
)}
</div>
<div>
<span className="font-semibold">Chunks:</span>{' '}
{chunks.length}
</div>
<div>
<span className="font-semibold">Files:</span>{' '}
{totalFiles}
</div>
<div>
<span className="font-semibold">Matches:</span>{' '}
{totalMatches}
</div>
</div>
</div>
)}
{/* Error Display */}
{error && (
<div className="bg-destructive/10 border border-destructive/30 rounded-lg p-4">
<div className="font-semibold text-destructive mb-1">
Error occurred:
</div>
<div className="text-sm text-destructive/80">
{error.message}
</div>
</div>
)}
{/* Results Display */}
{chunks.length > 0 && (
<div className="space-y-4">
<h2 className="text-xl font-semibold">
Results ({chunks.length} chunks)
</h2>
<div className="space-y-3">
{chunks.map((chunk, i) => (
<div
key={i}
className="border rounded-lg p-4 bg-card"
>
<div className="flex items-center justify-between mb-3">
<div className="text-sm font-medium text-muted-foreground">
Chunk {i + 1}
</div>
<div className="text-sm text-muted-foreground">
{chunk.response_chunk?.files?.length || 0} files, {' '}
{chunk.response_chunk?.stats?.match_count || 0} matches
</div>
</div>
{chunk.response_chunk?.files && chunk.response_chunk.files.length > 0 && (
<div className="space-y-2">
{chunk.response_chunk.files.map((file: FileMatch__Output, j: number) => {
// Decode file_name from Buffer to string
const fileName = file.file_name
? Buffer.from(file.file_name).toString('utf-8')
: 'Unknown file';
return (
<div
key={j}
className="text-sm pl-4 border-l-2 border-muted-foreground/20 py-1"
>
<div className="font-mono">
📄 {fileName}
</div>
{file.repository && (
<div className="text-xs text-muted-foreground mt-0.5">
{file.repository}
</div>
)}
{file.language && (
<div className="text-xs text-muted-foreground">
Language: {file.language}
</div>
)}
</div>
);
})}
</div>
)}
</div>
))}
</div>
</div>
)}
{/* Empty State */}
{!isStreaming && chunks.length === 0 && !error && (
<div className="text-center py-12 text-muted-foreground">
<p>Enter a search query and click &ldquo;Search&rdquo; to start streaming results</p>
</div>
)}
</div>
</div>
);
}

View file

@ -0,0 +1,69 @@
/**
* Types for streaming search functionality
*/
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
import type { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse';
import type { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch';
import type { Stats__Output } from '@/proto/zoekt/webserver/v1/Stats';
import type { ChunkMatch__Output } from '@/proto/zoekt/webserver/v1/ChunkMatch';
import type { Progress__Output } from '@/proto/zoekt/webserver/v1/Progress';
/**
* A single chunk received from the streaming search API
*/
export interface StreamSearchChunk {
response_chunk?: SearchResponse__Output | null;
error?: StreamSearchError;
}
/**
* Error response from the streaming search
*/
export interface StreamSearchError {
code?: number;
message: string;
}
/**
* Parameters for initiating a streaming search
*/
export interface StreamSearchParams {
query: string;
matches: number;
contextLines?: number;
whole?: boolean;
}
/**
* State of the streaming search
*/
export interface StreamingSearchState {
chunks: StreamSearchChunk[];
isStreaming: boolean;
error: Error | null;
totalFiles: number;
totalMatches: number;
}
/**
* Return type of the useStreamingSearch hook
*/
export interface UseStreamingSearchReturn extends StreamingSearchState {
streamSearch: (params: StreamSearchParams) => Promise<void>;
cancel: () => void;
reset: () => void;
}
/**
* Re-export proto types for convenience
*/
export type {
StreamSearchResponse__Output,
SearchResponse__Output,
FileMatch__Output,
Stats__Output,
ChunkMatch__Output,
Progress__Output,
};

View file

@ -0,0 +1,148 @@
'use client';
import { useState, useCallback, useRef } from 'react';
import type {
StreamSearchChunk,
StreamSearchParams,
StreamingSearchState,
UseStreamingSearchReturn,
} from './types';
export function useStreamingSearch(): UseStreamingSearchReturn {
const [state, setState] = useState<StreamingSearchState>({
chunks: [],
isStreaming: false,
error: null,
totalFiles: 0,
totalMatches: 0,
});
const abortControllerRef = useRef<AbortController | null>(null);
const streamSearch = useCallback(async (params: StreamSearchParams) => {
// Cancel any existing stream
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
abortControllerRef.current = new AbortController();
setState({
chunks: [],
isStreaming: true,
error: null,
totalFiles: 0,
totalMatches: 0,
});
try {
const response = await fetch('/api/stream_search', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(params),
signal: abortControllerRef.current.signal,
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
if (!response.body) {
throw new Error('No response body');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true as boolean) {
const { done, value } = await reader.read();
if (done) break;
// Decode the chunk and add to buffer
buffer += decoder.decode(value, { stream: true });
// Process complete SSE messages (separated by \n\n)
const messages = buffer.split('\n\n');
buffer = messages.pop() || ''; // Keep incomplete message in buffer
for (const message of messages) {
if (!message.trim()) continue;
// SSE messages start with "data: "
const dataMatch = message.match(/^data: (.+)$/);
if (!dataMatch) continue;
const data = dataMatch[1];
// Check for completion signal
if (data === '[DONE]') {
setState(prev => ({ ...prev, isStreaming: false }));
return;
}
// Parse the JSON chunk
try {
const chunk: StreamSearchChunk = JSON.parse(data);
// Check for errors
if (chunk.error) {
throw new Error(chunk.error.message);
}
// Update state with new chunk
setState(prev => ({
...prev,
chunks: [...prev.chunks, chunk],
totalFiles: prev.totalFiles + (chunk.response_chunk?.files?.length || 0),
totalMatches: prev.totalMatches + (chunk.response_chunk?.stats?.match_count || 0),
}));
} catch (parseError) {
console.error('Error parsing chunk:', parseError);
}
}
}
setState(prev => ({ ...prev, isStreaming: false }));
} catch (error) {
if ((error as Error).name === 'AbortError') {
console.log('Stream aborted');
} else {
setState(prev => ({
...prev,
isStreaming: false,
error: error as Error,
}));
}
}
}, []);
const cancel = useCallback(() => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
abortControllerRef.current = null;
}
setState(prev => ({ ...prev, isStreaming: false }));
}, []);
const reset = useCallback(() => {
cancel();
setState({
chunks: [],
isStreaming: false,
error: null,
totalFiles: 0,
totalMatches: 0,
});
}, [cancel]);
return {
...state,
streamSearch,
cancel,
reset,
};
}

View file

@ -0,0 +1,174 @@
'use server';
import { NextRequest } from 'next/server';
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import * as path from 'path';
import type { ProtoGrpcType } from '@/proto/webserver';
import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest';
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
import { env } from '@sourcebot/shared';
import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
import { searchRequestSchema } from '@/features/search/schemas';
/**
* Create a gRPC client for the Zoekt webserver
*/
function createGrpcClient(): WebserverServiceClient {
// Path to proto files - these should match your monorepo structure
const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos');
const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto');
const packageDefinition = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: Number,
enums: String,
defaults: true,
oneofs: true,
includeDirs: [protoBasePath],
});
const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
// Extract host and port from ZOEKT_WEBSERVER_URL
const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL);
const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`;
return new proto.zoekt.webserver.v1.WebserverService(
grpcAddress,
grpc.credentials.createInsecure(),
{
'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB
'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB
}
);
}
/**
* POST handler for streaming search via SSE
*/
export const POST = async (request: NextRequest) => {
try {
// Parse and validate request body
const body = await request.json();
const parsed = await searchRequestSchema.safeParseAsync(body);
if (!parsed.success) {
return serviceErrorResponse(schemaValidationError(parsed.error));
}
const searchRequest: SearchRequest = {
query: {
and: {
children: [
{
regexp: {
regexp: parsed.data.query,
case_sensitive: true,
}
}
]
}
},
opts: {
chunk_matches: true,
num_context_lines: parsed.data.contextLines ?? 5,
total_max_match_count: parsed.data.matches,
},
};
// Create ReadableStream for SSE
const stream = new ReadableStream({
async start(controller) {
const client = createGrpcClient();
try {
const metadata = new grpc.Metadata();
metadata.add('x-sourcegraph-tenant-id', '1');
const streamRequest: StreamSearchRequest = {
request: searchRequest,
};
const grpcStream = client.StreamSearch(streamRequest, metadata);
// Handle incoming data chunks
grpcStream.on('data', (chunk: StreamSearchResponse__Output) => {
try {
// SSE format: "data: {json}\n\n"
const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
controller.enqueue(new TextEncoder().encode(sseData));
} catch (error) {
console.error('Error encoding chunk:', error);
}
});
// Handle stream completion
grpcStream.on('end', () => {
// Send completion signal
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
controller.close();
client.close();
});
// Handle errors
grpcStream.on('error', (error: grpc.ServiceError) => {
console.error('gRPC stream error:', error);
// Send error as SSE event
const errorData = `data: ${JSON.stringify({
error: {
code: error.code,
message: error.details || error.message,
}
})}\n\n`;
controller.enqueue(new TextEncoder().encode(errorData));
controller.close();
client.close();
});
} catch (error) {
console.error('Stream initialization error:', error);
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
const errorData = `data: ${JSON.stringify({
error: { message: errorMessage }
})}\n\n`;
controller.enqueue(new TextEncoder().encode(errorData));
controller.close();
client.close();
}
},
cancel() {
// Cleanup when client cancels the stream
console.log('SSE stream cancelled by client');
}
});
// Return streaming response with SSE headers
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable
},
});
} catch (error) {
console.error('Request handling error:', error);
return new Response(
JSON.stringify({
error: {
message: error instanceof Error ? error.message : 'Internal server error'
}
}),
{
status: 500,
headers: { 'Content-Type': 'application/json' },
}
);
}
};