Mirror of https://github.com/sourcebot-dev/sourcebot.git (synced 2025-12-11 20:05:25 +00:00)
* generate protobuf types
* stream PoC over SSE
* wip: make stream search API follow the existing schema; modify UI to support streaming
* fix scrolling issue
* Dockerfile
* wip on lezer parser grammar for query language
* add lezer tree -> grpc transformer
* remove spammy log message
* fix syntax highlighting by adding a module resolution for @lezer/common
* further wip on query language
* Add case sensitivity and regexp toggles
* Improved type safety / cleanup for query lang
* support search contexts
* update Dockerfile with query language package
* fix filter
* Add skeletons to filter panel when search is streaming
* add client-side caching
* improved cancellation handling
* add isSearchExhaustive flag to indicate when a search captured all results
* Add back posthog search_finished event
* remove zoekt tenant enforcement
* migrate blocking search over to grpc; centralize everything in searchApi
* branch handling
* plumb file webUrl
* add repo_sets filter for repositories a user has access to
* refactor a bunch of stuff + add support for passing in Query IR to search API
* refactor
* dev README
* wip on better error handling
* error handling for stream path
* update mcp
* changelog wip
* type fix
* style
* Support rev:* wildcard
* changelog
* changelog nit
* feedback
* fix build
* update docs and remove unneeded test file
288 lines · No EOL · 10 KiB · TypeScript
'use client';

import { RepositoryInfo, SearchRequest, SearchResultFile, SearchStats, StreamedSearchResponse } from '@/features/search';
import { ServiceErrorException } from '@/lib/serviceError';
import { isServiceError } from '@/lib/utils';
import * as Sentry from '@sentry/nextjs';
import { useCallback, useEffect, useRef, useState } from 'react';

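// Shape of a completed search kept in the client-side cache.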
interface CacheEntry {
    files: SearchResultFile[];
    repoInfo: Record<number, RepositoryInfo>;
    numMatches: number;
    timeToSearchCompletionMs: number;
    timeToFirstSearchResultMs: number;
    timestamp: number;
    isExhaustive: boolean;
}

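// Module-level cache: shared by every instance of the hook and kept for the
// lifetime of the page. Entries expire after CACHE_TTL (5 minutes).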
const searchCache = new Map<string, CacheEntry>();
const CACHE_TTL = 5 * 60 * 1000;

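// The cache key includes every request parameter that can affect results, so
// any change to the query or its options maps to a distinct cache entry.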
const createCacheKey = (params: SearchRequest): string => {
    return JSON.stringify({
        query: params.query,
        matches: params.matches,
        contextLines: params.contextLines,
        whole: params.whole,
        isRegexEnabled: params.isRegexEnabled,
        isCaseSensitivityEnabled: params.isCaseSensitivityEnabled,
    });
};

const isCacheValid = (entry: CacheEntry): boolean => {
    return Date.now() - entry.timestamp < CACHE_TTL;
};

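// Streams search results from the server over SSE, accumulating files and
// repository metadata into state as each chunk arrives.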
export const useStreamedSearch = ({ query, matches, contextLines, whole, isRegexEnabled, isCaseSensitivityEnabled }: SearchRequest) => {
    const [state, setState] = useState<{
        isStreaming: boolean,
        isExhaustive: boolean,
        error: Error | null,
        files: SearchResultFile[],
        repoInfo: Record<number, RepositoryInfo>,
        timeToSearchCompletionMs: number,
        timeToFirstSearchResultMs: number,
        numMatches: number,
        stats?: SearchStats,
    }>({
        isStreaming: false,
        isExhaustive: false,
        error: null,
        files: [],
        repoInfo: {},
        timeToSearchCompletionMs: 0,
        timeToFirstSearchResultMs: 0,
        numMatches: 0,
        stats: undefined,
    });

    const abortControllerRef = useRef<AbortController | null>(null);

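    // Aborts the in-flight request (if any) and marks the stream as stopped.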
    const cancel = useCallback(() => {
        if (abortControllerRef.current) {
            abortControllerRef.current.abort();
            abortControllerRef.current = null;
        }
        setState(prev => ({
            ...prev,
            isStreaming: false,
        }));
    }, []);

    useEffect(() => {
        const search = async () => {
            const startTime = performance.now();

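            // Abort any in-flight search before starting a new one.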
            if (abortControllerRef.current) {
                abortControllerRef.current.abort();
            }
            abortControllerRef.current = new AbortController();

            const cacheKey = createCacheKey({
                query,
                matches,
                contextLines,
                whole,
                isRegexEnabled,
                isCaseSensitivityEnabled,
            });

            // Check if we have a valid cached result. If so, use it.
            const cachedEntry = searchCache.get(cacheKey);
            if (cachedEntry && isCacheValid(cachedEntry)) {
                console.debug('Using cached search results');
                setState({
                    isStreaming: false,
                    isExhaustive: cachedEntry.isExhaustive,
                    error: null,
                    files: cachedEntry.files,
                    repoInfo: cachedEntry.repoInfo,
                    timeToSearchCompletionMs: cachedEntry.timeToSearchCompletionMs,
                    timeToFirstSearchResultMs: cachedEntry.timeToFirstSearchResultMs,
                    numMatches: cachedEntry.numMatches,
                });
                return;
            }

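            // No cache hit: reset state before streaming fresh results.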
            setState({
                isStreaming: true,
                isExhaustive: false,
                error: null,
                files: [],
                repoInfo: {},
                timeToSearchCompletionMs: 0,
                timeToFirstSearchResultMs: 0,
                numMatches: 0,
            });

            try {
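                // Kick off the search request; the response body is an SSE stream.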
                const response = await fetch('/api/stream_search', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                    },
                    body: JSON.stringify({
                        query,
                        matches,
                        contextLines,
                        whole,
                        isRegexEnabled,
                        isCaseSensitivityEnabled,
                    }),
                    signal: abortControllerRef.current.signal,
                });

                if (!response.ok) {
                    // Check if this is a service error response
                    const contentType = response.headers.get('content-type');
                    if (contentType?.includes('application/json')) {
                        const errorData = await response.json();
                        if (isServiceError(errorData)) {
                            throw new ServiceErrorException(errorData);
                        }
                    }
                    throw new Error(`HTTP error! status: ${response.status}`);
                }

                if (!response.body) {
                    throw new Error('No response body');
                }

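                // Read the stream incrementally, decoding bytes into a text buffer.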
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                let buffer = '';
                let numMessagesProcessed = 0;

                while (true as boolean) {
                    const { done, value } = await reader.read();

                    if (done) {
                        break;
                    }

                    // Decode the chunk and add to buffer
                    buffer += decoder.decode(value, { stream: true });

                    // Process complete SSE messages (separated by \n\n)
                    const messages = buffer.split('\n\n');

                    // Keep the last element (potentially incomplete message) in the buffer for the next chunk.
                    // Stream chunks can split messages mid-way, so we only process complete messages.
                    buffer = messages.pop() || '';

                    for (const message of messages) {
                        if (!message.trim()) {
                            continue;
                        }

                        // SSE messages start with "data: "
                        const dataMatch = message.match(/^data: (.+)$/);
                        if (!dataMatch) {
                            continue;
                        }

                        const data = dataMatch[1];

                        // Check for completion signal
                        if (data === '[DONE]') {
                            break;
                        }

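                        // Each message is a JSON-encoded StreamedSearchResponse:
                        // 'chunk' carries incremental results, 'final' ends the
                        // search with summary stats, and 'error' aborts it.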
                        const response: StreamedSearchResponse = JSON.parse(data);
                        const isFirstMessage = numMessagesProcessed === 0;
                        switch (response.type) {
                            case 'chunk':
                                setState(prev => ({
                                    ...prev,
                                    files: [
                                        ...prev.files,
                                        ...response.files
                                    ],
                                    repoInfo: {
                                        ...prev.repoInfo,
                                        ...response.repositoryInfo.reduce((acc, repo) => {
                                            acc[repo.id] = repo;
                                            return acc;
                                        }, {} as Record<number, RepositoryInfo>),
                                    },
                                    numMatches: prev.numMatches + response.stats.actualMatchCount,
                                    ...(isFirstMessage ? {
                                        timeToFirstSearchResultMs: performance.now() - startTime,
                                    } : {}),
                                }));
                                break;
                            case 'final':
                                setState(prev => ({
                                    ...prev,
                                    isExhaustive: response.isSearchExhaustive,
                                    stats: response.accumulatedStats,
                                    ...(isFirstMessage ? {
                                        timeToFirstSearchResultMs: performance.now() - startTime,
                                    } : {}),
                                }));
                                break;
                            case 'error':
                                throw new ServiceErrorException(response.error);
                        }

                        numMessagesProcessed++;
                    }
                }

                const timeToSearchCompletionMs = performance.now() - startTime;
                setState(prev => {
                    // Cache the final results after the stream has completed.
                    searchCache.set(cacheKey, {
                        files: prev.files,
                        repoInfo: prev.repoInfo,
                        isExhaustive: prev.isExhaustive,
                        numMatches: prev.numMatches,
                        timeToFirstSearchResultMs: prev.timeToFirstSearchResultMs,
                        timeToSearchCompletionMs,
                        timestamp: Date.now(),
                    });
                    return {
                        ...prev,
                        timeToSearchCompletionMs,
                        isStreaming: false,
                    }
                });

            } catch (error) {
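                // Aborts are expected when a newer search supersedes this one
                // (or on unmount); don't surface them as errors.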
                if ((error as Error).name === 'AbortError') {
                    return;
                }

                console.error(error);
                Sentry.captureException(error);
                const timeToSearchCompletionMs = performance.now() - startTime;
                setState(prev => ({
                    ...prev,
                    isStreaming: false,
                    timeToSearchCompletionMs,
                    error: error instanceof Error ? error : null,
                }));
            }
        }

        search();

        return () => {
            cancel();
        }
    }, [
        query,
        matches,
        contextLines,
        whole,
        isRegexEnabled,
        isCaseSensitivityEnabled,
        cancel,
    ]);

    return {
        ...state,
        cancel,
    };
}
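
For reference, a minimal sketch of how a client component might consume this hook. The component, its prop, the request values, and the relative import path are illustrative assumptions; only the hook's parameters and return shape come from the file above.

// Hypothetical usage sketch (not part of this file): the import path,
// component, and request values below are assumptions for illustration.
import { useStreamedSearch } from './useStreamedSearch';

export const SearchResults = ({ query }: { query: string }) => {
    const { files, isStreaming, isExhaustive, numMatches, error, cancel } = useStreamedSearch({
        query,
        matches: 1000,
        contextLines: 3,
        whole: false,
        isRegexEnabled: false,
        isCaseSensitivityEnabled: false,
    });

    if (error) {
        return <p>Search failed: {error.message}</p>;
    }

    return (
        <div>
            {/* While streaming, results accumulate and the search can be cancelled. */}
            {isStreaming && <button onClick={cancel}>Cancel</button>}
            <p>
                {numMatches} matches across {files.length} files
                {isExhaustive ? '' : ' (results may be truncated)'}
            </p>
        </div>
    );
};

Because results are cached at module level, re-running the same query within the 5-minute TTL resolves synchronously from the cache instead of re-opening a stream.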