Mirror of https://github.com/sourcebot-dev/sourcebot.git
Synced 2025-12-12 20:35:24 +00:00

migrate blocking search over to grpc. Centralize everything in searchApi

parent afcc30dca4
commit 898c9097db

10 changed files with 601 additions and 1118 deletions

@@ -1,540 +1,30 @@
'use server';
|
||||
|
||||
import { searchRequestSchema, SearchStats, SourceRange, StreamedSearchResponse } from '@/features/search/types';
|
||||
import { SINGLE_TENANT_ORG_ID } from '@/lib/constants';
|
||||
import { streamSearch } from '@/features/search/searchApi';
|
||||
import { searchRequestSchema } from '@/features/search/types';
|
||||
import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError';
|
||||
import { prisma } from '@/prisma';
|
||||
import type { ProtoGrpcType } from '@/proto/webserver';
|
||||
import { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch';
|
||||
import { Range__Output } from '@/proto/zoekt/webserver/v1/Range';
|
||||
import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
|
||||
import { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse';
|
||||
import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest';
|
||||
import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse';
|
||||
import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
|
||||
import * as grpc from '@grpc/grpc-js';
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
import * as Sentry from '@sentry/nextjs';
|
||||
import { PrismaClient, Repo } from '@sourcebot/db';
|
||||
import { parser as _parser } from '@sourcebot/query-language';
|
||||
import { createLogger, env } from '@sourcebot/shared';
|
||||
import { isServiceError } from '@/lib/utils';
|
||||
import { NextRequest } from 'next/server';
|
||||
import * as path from 'path';
|
||||
import { transformToZoektQuery } from './transformer';
|
||||
|
||||
const logger = createLogger('streamSearchApi');
|
||||
|
||||
/**
|
||||
* Create a gRPC client for the Zoekt webserver
|
||||
*/
|
||||
function createGrpcClient(): WebserverServiceClient {
|
||||
// Path to proto files - these should match your monorepo structure
|
||||
const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos');
|
||||
const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto');
|
||||
|
||||
const packageDefinition = protoLoader.loadSync(protoPath, {
|
||||
keepCase: true,
|
||||
longs: Number,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true,
|
||||
includeDirs: [protoBasePath],
|
||||
});
|
||||
|
||||
const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
|
||||
|
||||
// Extract host and port from ZOEKT_WEBSERVER_URL
|
||||
const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL);
|
||||
const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`;
|
||||
|
||||
return new proto.zoekt.webserver.v1.WebserverService(
|
||||
grpcAddress,
|
||||
grpc.credentials.createInsecure(),
|
||||
{
|
||||
'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB
|
||||
'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB
|
||||
}
|
||||
);
|
||||
}
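For illustration only (not part of this diff): a client returned by createGrpcClient() can also serve a single blocking, unary call. The sketch below assumes the generated stub exposes the callback-based Search(request, metadata, callback) method used elsewhere in this change, and simply wraps it in a Promise.

const exampleUnarySearch = (searchRequest: SearchRequest): Promise<SearchResponse__Output> => {
    const client = createGrpcClient();
    const metadata = new grpc.Metadata();
    return new Promise((resolve, reject) => {
        client.Search(searchRequest, metadata, (error, response) => {
            // Release the underlying channel once the call settles.
            client.close();
            if (error || !response) {
                reject(error ?? new Error('No response received'));
                return;
            }
            resolve(response);
        });
    });
};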
|
||||
|
||||
/**
|
||||
* POST handler for streaming search via SSE
|
||||
*/
|
||||
export const POST = async (request: NextRequest) => {
|
||||
try {
|
||||
// Parse and validate request body
|
||||
const body = await request.json();
|
||||
const parsed = await searchRequestSchema.safeParseAsync(body);
|
||||
const body = await request.json();
|
||||
const parsed = await searchRequestSchema.safeParseAsync(body);
|
||||
|
||||
if (!parsed.success) {
|
||||
return serviceErrorResponse(schemaValidationError(parsed.error));
|
||||
}
|
||||
|
||||
const {
|
||||
query,
|
||||
matches,
|
||||
contextLines,
|
||||
whole,
|
||||
isRegexEnabled = false,
|
||||
isCaseSensitivityEnabled = false,
|
||||
} = parsed.data;
|
||||
|
||||
const parser = _parser.configure({
|
||||
strict: true,
|
||||
});
|
||||
|
||||
const tree = parser.parse(query);
|
||||
const zoektQuery = await transformToZoektQuery({
|
||||
tree,
|
||||
input: query,
|
||||
isCaseSensitivityEnabled,
|
||||
isRegexEnabled,
|
||||
onExpandSearchContext: async (contextName: string) => {
|
||||
const context = await prisma.searchContext.findUnique({
|
||||
where: {
|
||||
name_orgId: {
|
||||
name: contextName,
|
||||
orgId: SINGLE_TENANT_ORG_ID,
|
||||
}
|
||||
},
|
||||
include: {
|
||||
repos: true,
|
||||
}
|
||||
});
|
||||
|
||||
if (!context) {
|
||||
throw new Error(`Search context "${contextName}" not found`);
|
||||
}
|
||||
|
||||
return context.repos.map((repo) => repo.name);
|
||||
},
|
||||
});
|
||||
|
||||
console.log(JSON.stringify(zoektQuery, null, 2));
|
||||
|
||||
const searchRequest: SearchRequest = {
|
||||
query: {
|
||||
and: {
|
||||
children: [
|
||||
zoektQuery,
|
||||
{
|
||||
branch: {
|
||||
pattern: 'HEAD',
|
||||
exact: true,
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
opts: {
|
||||
chunk_matches: true,
|
||||
max_match_display_count: matches,
|
||||
total_max_match_count: matches + 1,
|
||||
num_context_lines: contextLines,
|
||||
whole: !!whole,
|
||||
shard_max_match_count: -1,
|
||||
max_wall_time: {
|
||||
seconds: 0,
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
// @nocheckin: this should be using the `prisma` instance from the auth context.
|
||||
const stream = await createSSESearchStream(searchRequest, prisma);
|
||||
|
||||
|
||||
// Return streaming response with SSE headers
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache, no-transform',
|
||||
'Connection': 'keep-alive',
|
||||
'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Request handling error:', error);
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
error: {
|
||||
message: error instanceof Error ? error.message : 'Internal server error'
|
||||
}
|
||||
}),
|
||||
{
|
||||
status: 500,
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
}
|
||||
);
|
||||
if (!parsed.success) {
|
||||
return serviceErrorResponse(schemaValidationError(parsed.error));
|
||||
}
|
||||
};
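For illustration only (not part of this diff): a browser-side caller can consume the SSE stream produced by this handler by reading the response body and splitting on blank lines. The sketch below assumes the wire format shown above (one `data: <JSON-encoded StreamedSearchResponse>` event per chunk, terminated by `data: [DONE]`) and uses a hypothetical route path.

const consumeStreamSearch = async (
    request: unknown,
    onEvent: (event: StreamedSearchResponse) => void,
) => {
    // '/api/stream-search' is a placeholder; substitute the actual route path.
    const res = await fetch('/api/stream-search', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(request),
    });
    const reader = res.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // SSE events are delimited by a blank line.
        const events = buffer.split('\n\n');
        buffer = events.pop() ?? '';
        for (const event of events) {
            if (!event.startsWith('data: ')) continue;
            const payload = event.slice('data: '.length);
            if (payload === '[DONE]') return;
            onEvent(JSON.parse(payload) as StreamedSearchResponse);
        }
    }
};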
|
||||
|
||||
const createSSESearchStream = async (searchRequest: SearchRequest, prisma: PrismaClient): Promise<ReadableStream> => {
|
||||
const client = createGrpcClient();
|
||||
let grpcStream: ReturnType<WebserverServiceClient['StreamSearch']> | null = null;
|
||||
let isStreamActive = true;
|
||||
let pendingChunks = 0;
|
||||
let accumulatedStats: SearchStats = {
|
||||
actualMatchCount: 0,
|
||||
totalMatchCount: 0,
|
||||
duration: 0,
|
||||
fileCount: 0,
|
||||
filesSkipped: 0,
|
||||
contentBytesLoaded: 0,
|
||||
indexBytesLoaded: 0,
|
||||
crashes: 0,
|
||||
shardFilesConsidered: 0,
|
||||
filesConsidered: 0,
|
||||
filesLoaded: 0,
|
||||
shardsScanned: 0,
|
||||
shardsSkipped: 0,
|
||||
shardsSkippedFilter: 0,
|
||||
ngramMatches: 0,
|
||||
ngramLookups: 0,
|
||||
wait: 0,
|
||||
matchTreeConstruction: 0,
|
||||
matchTreeSearch: 0,
|
||||
regexpsConsidered: 0,
|
||||
flushReason: 0,
|
||||
};
|
||||
const stream = await streamSearch(parsed.data);
|
||||
if (isServiceError(stream)) {
|
||||
return serviceErrorResponse(stream);
|
||||
}
|
||||
|
||||
return new ReadableStream({
|
||||
async start(controller) {
|
||||
const tryCloseController = () => {
|
||||
if (!isStreamActive && pendingChunks === 0) {
|
||||
const finalResponse: StreamedSearchResponse = {
|
||||
type: 'final',
|
||||
accumulatedStats,
|
||||
isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount,
|
||||
}
|
||||
|
||||
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`));
|
||||
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
|
||||
controller.close();
|
||||
client.close();
|
||||
logger.debug('SSE stream closed');
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const metadata = new grpc.Metadata();
|
||||
|
||||
const streamRequest: StreamSearchRequest = {
|
||||
request: searchRequest,
|
||||
};
|
||||
|
||||
grpcStream = client.StreamSearch(streamRequest, metadata);
|
||||
|
||||
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
|
||||
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
|
||||
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique
|
||||
// set of repository ids* and map them to their corresponding Repo record.
|
||||
//
|
||||
// *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`?
|
||||
// A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was
|
||||
// always undefined. To make this a non-breaking change, we fallback to using the repository's name
|
||||
// (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in
|
||||
// practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo',
|
||||
// 'gitea.com/org/repo', etc.).
|
||||
//
|
||||
// Note: When a repository is re-indexed (every hour) this ID will be populated.
|
||||
// @see: https://github.com/sourcebot-dev/zoekt/pull/6
|
||||
const getRepoIdForFile = (file: FileMatch__Output): string | number => {
|
||||
return file.repository_id ?? file.repository;
|
||||
}
|
||||
|
||||
// `_reposMapCache` is used to cache repository metadata across all chunks.
|
||||
// This reduces the number of database queries required to transform file matches.
|
||||
const _reposMapCache = new Map<string | number, Repo>();
|
||||
|
||||
// Creates a mapping between all repository ids in a given response
|
||||
// chunk. The mapping allows us to efficiently lookup repository metadata.
|
||||
const createReposMapForChunk = async (chunk: SearchResponse__Output): Promise<Map<string | number, Repo>> => {
|
||||
const reposMap = new Map<string | number, Repo>();
|
||||
await Promise.all(chunk.files.map(async (file) => {
|
||||
const id = getRepoIdForFile(file);
|
||||
|
||||
const repo = await (async () => {
|
||||
// If it's in the cache, return the cached value.
|
||||
if (_reposMapCache.has(id)) {
|
||||
return _reposMapCache.get(id);
|
||||
}
|
||||
|
||||
// Otherwise, query the database for the record.
|
||||
const repo = typeof id === 'number' ?
|
||||
await prisma.repo.findUnique({
|
||||
where: {
|
||||
id: id,
|
||||
},
|
||||
}) :
|
||||
await prisma.repo.findFirst({
|
||||
where: {
|
||||
name: id,
|
||||
},
|
||||
});
|
||||
|
||||
// If a repository is found, cache it for future lookups.
|
||||
if (repo) {
|
||||
_reposMapCache.set(id, repo);
|
||||
}
|
||||
|
||||
return repo;
|
||||
})();
|
||||
|
||||
// Only add the repository to the map if it was found.
|
||||
if (repo) {
|
||||
reposMap.set(id, repo);
|
||||
}
|
||||
}));
|
||||
|
||||
return reposMap;
|
||||
}
|
||||
|
||||
// Handle incoming data chunks
|
||||
grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => {
|
||||
if (!isStreamActive) {
|
||||
logger.debug('SSE stream closed, skipping chunk');
|
||||
return;
|
||||
}
|
||||
|
||||
// Track that we're processing a chunk
|
||||
pendingChunks++;
|
||||
|
||||
// grpcStream.on doesn't actually await on our handler, so we need to
|
||||
// explicitly pause the stream here to prevent the stream from completing
|
||||
// prior to our asynchronous work being completed.
|
||||
grpcStream?.pause();
|
||||
|
||||
try {
|
||||
if (!chunk.response_chunk) {
|
||||
logger.warn('No response chunk received');
|
||||
return;
|
||||
}
|
||||
|
||||
const repoIdToRepoDBRecordMap = await createReposMapForChunk(chunk.response_chunk);
|
||||
|
||||
const files = chunk.response_chunk.files.map((file) => {
|
||||
const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name);
|
||||
const repoId = getRepoIdForFile(file);
|
||||
const repo = repoIdToRepoDBRecordMap.get(repoId);
|
||||
|
||||
// This can happen if the user doesn't have access to the repository.
|
||||
if (!repo) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// @todo: address "file_name might not be a valid UTF-8 string" warning.
|
||||
const fileName = file.file_name.toString('utf-8');
|
||||
|
||||
const convertRange = (range: Range__Output): SourceRange => ({
|
||||
start: {
|
||||
byteOffset: range.start?.byte_offset ?? 0,
|
||||
column: range.start?.column ?? 0,
|
||||
lineNumber: range.start?.line_number ?? 0,
|
||||
},
|
||||
end: {
|
||||
byteOffset: range.end?.byte_offset ?? 0,
|
||||
column: range.end?.column ?? 0,
|
||||
lineNumber: range.end?.line_number ?? 0,
|
||||
}
|
||||
})
|
||||
|
||||
return {
|
||||
fileName: {
|
||||
text: fileName,
|
||||
matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [],
|
||||
},
|
||||
repository: repo.name,
|
||||
repositoryId: repo.id,
|
||||
language: file.language,
|
||||
// @todo: we will need to have a mechanism of forming the file's web url.
|
||||
webUrl: '',
|
||||
chunks: file.chunk_matches
|
||||
.filter((chunk) => !chunk.file_name) // filter out filename chunks.
|
||||
.map((chunk) => {
|
||||
return {
|
||||
content: chunk.content.toString('utf-8'),
|
||||
matchRanges: chunk.ranges.map(convertRange),
|
||||
contentStart: chunk.content_start ? {
|
||||
byteOffset: chunk.content_start.byte_offset,
|
||||
column: chunk.content_start.column,
|
||||
lineNumber: chunk.content_start.line_number,
|
||||
} : {
|
||||
byteOffset: 0,
|
||||
column: 0,
|
||||
lineNumber: 0,
|
||||
},
|
||||
symbols: chunk.symbol_info.map((symbol) => {
|
||||
return {
|
||||
symbol: symbol.sym,
|
||||
kind: symbol.kind,
|
||||
parent: symbol.parent ? {
|
||||
symbol: symbol.parent,
|
||||
kind: symbol.parent_kind,
|
||||
} : undefined,
|
||||
}
|
||||
})
|
||||
}
|
||||
}),
|
||||
branches: file.branches,
|
||||
content: file.content ? file.content.toString('utf-8') : undefined,
|
||||
}
|
||||
}).filter(file => file !== undefined);
|
||||
|
||||
const actualMatchCount = files.reduce(
|
||||
(acc, file) =>
|
||||
// Match count is the sum of the number of chunk matches and file name matches.
|
||||
acc + file.chunks.reduce(
|
||||
(acc, chunk) => acc + chunk.matchRanges.length,
|
||||
0,
|
||||
) + file.fileName.matchRanges.length,
|
||||
0,
|
||||
);
|
||||
|
||||
const stats: SearchStats = {
|
||||
actualMatchCount,
|
||||
totalMatchCount: chunk.response_chunk.stats?.match_count ?? 0,
|
||||
duration: chunk.response_chunk.stats?.duration?.nanos ?? 0,
|
||||
fileCount: chunk.response_chunk.stats?.file_count ?? 0,
|
||||
filesSkipped: chunk.response_chunk.stats?.files_skipped ?? 0,
|
||||
contentBytesLoaded: chunk.response_chunk.stats?.content_bytes_loaded ?? 0,
|
||||
indexBytesLoaded: chunk.response_chunk.stats?.index_bytes_loaded ?? 0,
|
||||
crashes: chunk.response_chunk.stats?.crashes ?? 0,
|
||||
shardFilesConsidered: chunk.response_chunk.stats?.shard_files_considered ?? 0,
|
||||
filesConsidered: chunk.response_chunk.stats?.files_considered ?? 0,
|
||||
filesLoaded: chunk.response_chunk.stats?.files_loaded ?? 0,
|
||||
shardsScanned: chunk.response_chunk.stats?.shards_scanned ?? 0,
|
||||
shardsSkipped: chunk.response_chunk.stats?.shards_skipped ?? 0,
|
||||
shardsSkippedFilter: chunk.response_chunk.stats?.shards_skipped_filter ?? 0,
|
||||
ngramMatches: chunk.response_chunk.stats?.ngram_matches ?? 0,
|
||||
ngramLookups: chunk.response_chunk.stats?.ngram_lookups ?? 0,
|
||||
wait: chunk.response_chunk.stats?.wait?.nanos ?? 0,
|
||||
matchTreeConstruction: chunk.response_chunk.stats?.match_tree_construction?.nanos ?? 0,
|
||||
matchTreeSearch: chunk.response_chunk.stats?.match_tree_search?.nanos ?? 0,
|
||||
regexpsConsidered: chunk.response_chunk.stats?.regexps_considered ?? 0,
|
||||
// @todo: handle this.
|
||||
// flushReason: chunk.response_chunk.stats?.flush_reason ?? 0,
|
||||
flushReason: 0
|
||||
}
|
||||
|
||||
accumulatedStats = accumulateStats(accumulatedStats, stats);
|
||||
|
||||
const response: StreamedSearchResponse = {
|
||||
type: 'chunk',
|
||||
files,
|
||||
repositoryInfo: Array.from(repoIdToRepoDBRecordMap.values()).map((repo) => ({
|
||||
id: repo.id,
|
||||
codeHostType: repo.external_codeHostType,
|
||||
name: repo.name,
|
||||
displayName: repo.displayName ?? undefined,
|
||||
webUrl: repo.webUrl ?? undefined,
|
||||
})),
|
||||
stats
|
||||
}
|
||||
|
||||
const sseData = `data: ${JSON.stringify(response)}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(sseData));
|
||||
} catch (error) {
|
||||
console.error('Error encoding chunk:', error);
|
||||
} finally {
|
||||
pendingChunks--;
|
||||
grpcStream?.resume();
|
||||
|
||||
// @note: we were hitting "Controller is already closed" errors when calling
|
||||
// `controller.enqueue` above for the last chunk. The reasoning was the event
|
||||
// handler for 'end' was being invoked prior to the completion of the last chunk,
|
||||
// resulting in the controller being closed prematurely. The workaround was to
|
||||
// keep track of the number of pending chunks and only close the controller
|
||||
// when there are no more chunks to process. We need to explicitly call
|
||||
// `tryCloseController` since there _seems_ to be no ordering guarantees between
|
||||
// the 'end' event handler and this callback.
|
||||
tryCloseController();
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stream completion
|
||||
grpcStream.on('end', () => {
|
||||
if (!isStreamActive) {
|
||||
return;
|
||||
}
|
||||
isStreamActive = false;
|
||||
tryCloseController();
|
||||
});
|
||||
|
||||
// Handle errors
|
||||
grpcStream.on('error', (error: grpc.ServiceError) => {
|
||||
logger.error('gRPC stream error:', error);
|
||||
Sentry.captureException(error);
|
||||
|
||||
if (!isStreamActive) {
|
||||
return;
|
||||
}
|
||||
isStreamActive = false;
|
||||
|
||||
// Send error as SSE event
|
||||
const errorData = `data: ${JSON.stringify({
|
||||
error: {
|
||||
code: error.code,
|
||||
message: error.details || error.message,
|
||||
}
|
||||
})}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(errorData));
|
||||
|
||||
controller.close();
|
||||
client.close();
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Stream initialization error:', error);
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
const errorData = `data: ${JSON.stringify({
|
||||
error: { message: errorMessage }
|
||||
})}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(errorData));
|
||||
|
||||
controller.close();
|
||||
client.close();
|
||||
}
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache, no-transform',
|
||||
'Connection': 'keep-alive',
|
||||
'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable
|
||||
},
|
||||
cancel() {
|
||||
logger.warn('SSE stream cancelled by client');
|
||||
isStreamActive = false;
|
||||
|
||||
// Cancel the gRPC stream to stop receiving data
|
||||
if (grpcStream) {
|
||||
grpcStream.cancel();
|
||||
}
|
||||
|
||||
client.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const accumulateStats = (a: SearchStats, b: SearchStats): SearchStats => {
|
||||
return {
|
||||
actualMatchCount: a.actualMatchCount + b.actualMatchCount,
|
||||
totalMatchCount: a.totalMatchCount + b.totalMatchCount,
|
||||
duration: a.duration + b.duration,
|
||||
fileCount: a.fileCount + b.fileCount,
|
||||
filesSkipped: a.filesSkipped + b.filesSkipped,
|
||||
contentBytesLoaded: a.contentBytesLoaded + b.contentBytesLoaded,
|
||||
indexBytesLoaded: a.indexBytesLoaded + b.indexBytesLoaded,
|
||||
crashes: a.crashes + b.crashes,
|
||||
shardFilesConsidered: a.shardFilesConsidered + b.shardFilesConsidered,
|
||||
filesConsidered: a.filesConsidered + b.filesConsidered,
|
||||
filesLoaded: a.filesLoaded + b.filesLoaded,
|
||||
shardsScanned: a.shardsScanned + b.shardsScanned,
|
||||
shardsSkipped: a.shardsSkipped + b.shardsSkipped,
|
||||
shardsSkippedFilter: a.shardsSkippedFilter + b.shardsSkippedFilter,
|
||||
ngramMatches: a.ngramMatches + b.ngramMatches,
|
||||
ngramLookups: a.ngramLookups + b.ngramLookups,
|
||||
wait: a.wait + b.wait,
|
||||
matchTreeConstruction: a.matchTreeConstruction + b.matchTreeConstruction,
|
||||
matchTreeSearch: a.matchTreeSearch + b.matchTreeSearch,
|
||||
regexpsConsidered: a.regexpsConsidered + b.regexpsConsidered,
|
||||
...(a.flushReason === 0 ? {
|
||||
flushReason: b.flushReason
|
||||
} : {
|
||||
flushReason: a.flushReason,
|
||||
}),
|
||||
}
|
||||
}
|
||||
};
@@ -180,10 +180,10 @@ Multiple expressions can be or'd together with or, negated with -, or grouped wi
const response = await search({
|
||||
query,
|
||||
matches: limit ?? 100,
|
||||
// @todo: we can make this configurable.
|
||||
contextLines: 3,
|
||||
whole: false,
|
||||
// @todo(mt): handle multi-tenancy.
|
||||
isCaseSensitivityEnabled: true,
|
||||
isRegexEnabled: true,
|
||||
});
|
||||
|
||||
if (isServiceError(response)) {
@@ -19,12 +19,14 @@ export const findSearchBasedSymbolReferences = async (props: FindRelatedSymbolsR
revisionName = "HEAD",
|
||||
} = props;
|
||||
|
||||
const query = `\\b${symbolName}\\b rev:${revisionName} ${getExpandedLanguageFilter(language)} case:yes`;
|
||||
const query = `\\b${symbolName}\\b rev:${revisionName} ${getExpandedLanguageFilter(language)}`;
|
||||
|
||||
const searchResult = await search({
|
||||
query,
|
||||
matches: MAX_REFERENCE_COUNT,
|
||||
contextLines: 0,
|
||||
isCaseSensitivityEnabled: true,
|
||||
isRegexEnabled: true,
|
||||
});
|
||||
|
||||
if (isServiceError(searchResult)) {
@@ -49,6 +51,8 @@ export const findSearchBasedSymbolDefinitions = async (props: FindRelatedSymbols
query,
|
||||
matches: MAX_REFERENCE_COUNT,
|
||||
contextLines: 0,
|
||||
isCaseSensitivityEnabled: true,
|
||||
isRegexEnabled: true,
|
||||
});
|
||||
|
||||
if (isServiceError(searchResult)) {
@@ -1,5 +1,4 @@
import 'server-only';
|
||||
import escapeStringRegexp from "escape-string-regexp";
|
||||
import { fileNotFound, ServiceError, unexpectedError } from "../../lib/serviceError";
|
||||
import { FileSourceRequest, FileSourceResponse } from "./types";
|
||||
import { isServiceError } from "../../lib/utils";
@@ -12,18 +11,17 @@ import { withOptionalAuthV2 } from "@/withAuthV2";
export const getFileSource = async ({ fileName, repository, branch }: FileSourceRequest): Promise<FileSourceResponse | ServiceError> => sew(() =>
|
||||
withOptionalAuthV2(async () => {
|
||||
const escapedFileName = escapeStringRegexp(fileName);
|
||||
const escapedRepository = escapeStringRegexp(repository);
|
||||
|
||||
let query = `file:${escapedFileName} repo:^${escapedRepository}$`;
|
||||
let query = `file:${fileName} repo:^${repository}$`;
|
||||
if (branch) {
|
||||
query = query.concat(` branch:${branch}`);
|
||||
query = query.concat(` rev:${branch}`);
|
||||
}
|
||||
|
||||
const searchResponse = await search({
|
||||
query,
|
||||
matches: 1,
|
||||
whole: true,
|
||||
isCaseSensitivityEnabled: true,
|
||||
isRegexEnabled: true,
|
||||
});
|
||||
|
||||
if (isServiceError(searchResponse)) {
@@ -1,4 +1,4 @@
import { Q } from '@/proto/zoekt/webserver/v1/Q';
|
||||
import { Q as ZoektGrpcQuery } from '@/proto/zoekt/webserver/v1/Q';
|
||||
import {
|
||||
AndExpr,
|
||||
ArchivedExpr,
@@ -21,6 +21,12 @@ import {
Tree,
|
||||
VisibilityExpr,
|
||||
} from '@sourcebot/query-language';
|
||||
import { parser as _lezerQueryParser } from '@sourcebot/query-language';
|
||||
|
||||
const lezerQueryParser = _lezerQueryParser.configure({
|
||||
strict: true,
|
||||
});
|
||||
|
||||
|
||||
type ArchivedValue = 'yes' | 'no' | 'only';
|
||||
type VisibilityValue = 'public' | 'private' | 'any';
@@ -38,10 +44,11 @@ const isForkValue = (value: string): value is ForkValue => {
return value === 'yes' || value === 'no' || value === 'only';
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform a Lezer parse tree into a Zoekt gRPC query
|
||||
*/
|
||||
export const transformToZoektQuery = ({
|
||||
export const parseQueryIntoLezerTree = (query: string): Tree => {
|
||||
return lezerQueryParser.parse(query);
|
||||
}
|
||||
|
||||
export const transformLezerTreeToZoektGrpcQuery = async ({
|
||||
tree,
|
||||
input,
|
||||
isCaseSensitivityEnabled,
@@ -53,9 +60,9 @@ export const transformToZoektQuery = ({
isCaseSensitivityEnabled: boolean;
|
||||
isRegexEnabled: boolean;
|
||||
onExpandSearchContext: (contextName: string) => Promise<string[]>;
|
||||
}): Promise<Q> => {
|
||||
}): Promise<ZoektGrpcQuery> => {
|
||||
|
||||
const transformNode = async (node: SyntaxNode): Promise<Q> => {
|
||||
const transformNode = async (node: SyntaxNode): Promise<ZoektGrpcQuery> => {
|
||||
switch (node.type.id) {
|
||||
case Program: {
|
||||
// Program wraps the actual query - transform its child
@@ -134,7 +141,7 @@ export const transformToZoektQuery = ({
}
|
||||
}
|
||||
|
||||
const transformPrefixExpr = async (node: SyntaxNode): Promise<Q> => {
|
||||
const transformPrefixExpr = async (node: SyntaxNode): Promise<ZoektGrpcQuery> => {
|
||||
// Find which specific prefix type this is
|
||||
const prefixNode = node.firstChild;
|
||||
if (!prefixNode) {
@@ -207,13 +214,13 @@ export const transformToZoektQuery = ({
return {
|
||||
symbol: {
|
||||
expr: {
|
||||
substring: {
|
||||
pattern: value,
|
||||
regexp: {
|
||||
regexp: value,
|
||||
case_sensitive: isCaseSensitivityEnabled,
|
||||
file_name: false,
|
||||
content: true
|
||||
},
|
||||
query: "substring"
|
||||
query: "regexp"
|
||||
}
|
||||
},
|
||||
query: "symbol"
@@ -1,71 +1,444 @@
import 'server-only';
|
||||
import { sew } from "@/actions";
|
||||
import { SINGLE_TENANT_ORG_ID } from '@/lib/constants';
|
||||
import type { ProtoGrpcType } from '@/proto/webserver';
|
||||
import { FileMatch__Output as ZoektGrpcFileMatch } from "@/proto/zoekt/webserver/v1/FileMatch";
|
||||
import { Range__Output as ZoektGrpcRange } from "@/proto/zoekt/webserver/v1/Range";
|
||||
import type { SearchRequest as ZoektGrpcSearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest';
|
||||
import { SearchResponse__Output as ZoektGrpcSearchResponse } from "@/proto/zoekt/webserver/v1/SearchResponse";
|
||||
import { StreamSearchRequest as ZoektGrpcStreamSearchRequest } from "@/proto/zoekt/webserver/v1/StreamSearchRequest";
|
||||
import { StreamSearchResponse__Output as ZoektGrpcStreamSearchResponse } from "@/proto/zoekt/webserver/v1/StreamSearchResponse";
|
||||
import { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService';
|
||||
import { withOptionalAuthV2 } from "@/withAuthV2";
|
||||
import * as grpc from '@grpc/grpc-js';
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
import * as Sentry from '@sentry/nextjs';
|
||||
import { PrismaClient, Repo } from "@sourcebot/db";
|
||||
import { base64Decode, createLogger } from "@sourcebot/shared";
|
||||
import { StatusCodes } from "http-status-codes";
|
||||
import { ErrorCode } from "../../lib/errorCodes";
|
||||
import { invalidZoektResponse, ServiceError } from "../../lib/serviceError";
|
||||
import { isServiceError, measure } from "../../lib/utils";
|
||||
import { SearchRequest, SearchResponse, SourceRange } from "./types";
|
||||
import { zoektFetch } from "./zoektClient";
|
||||
import { ZoektSearchResponse } from "./zoektSchema";
|
||||
import { createLogger, env } from "@sourcebot/shared";
|
||||
import path from 'path';
|
||||
import { parseQueryIntoLezerTree, transformLezerTreeToZoektGrpcQuery } from './query';
|
||||
import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchResponse } from "./types";
|
||||
import { FlushReason as ZoektFlushReason } from "@/proto/zoekt/webserver/v1/FlushReason";
|
||||
|
||||
const logger = createLogger("searchApi");
|
||||
|
||||
// List of supported query prefixes in zoekt.
|
||||
// @see : https://github.com/sourcebot-dev/zoekt/blob/main/query/parse.go#L417
|
||||
enum zoektPrefixes {
|
||||
archived = "archived:",
|
||||
branchShort = "b:",
|
||||
branch = "branch:",
|
||||
caseShort = "c:",
|
||||
case = "case:",
|
||||
content = "content:",
|
||||
fileShort = "f:",
|
||||
file = "file:",
|
||||
fork = "fork:",
|
||||
public = "public:",
|
||||
repoShort = "r:",
|
||||
repo = "repo:",
|
||||
regex = "regex:",
|
||||
lang = "lang:",
|
||||
sym = "sym:",
|
||||
typeShort = "t:",
|
||||
type = "type:",
|
||||
reposet = "reposet:",
|
||||
export const search = (searchRequest: SearchRequest) => sew(() =>
|
||||
withOptionalAuthV2(async ({ prisma }) => {
|
||||
const zoektSearchRequest = await createZoektSearchRequest({
|
||||
searchRequest,
|
||||
prisma,
|
||||
});
|
||||
|
||||
console.debug('zoektSearchRequest:', JSON.stringify(zoektSearchRequest, null, 2));
|
||||
|
||||
return zoektSearch(zoektSearchRequest, prisma);
|
||||
}));
|
||||
|
||||
export const streamSearch = (searchRequest: SearchRequest) => sew(() =>
|
||||
withOptionalAuthV2(async ({ prisma }) => {
|
||||
const zoektSearchRequest = await createZoektSearchRequest({
|
||||
searchRequest,
|
||||
prisma,
|
||||
});
|
||||
|
||||
return zoektStreamSearch(zoektSearchRequest, prisma);
|
||||
}));
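For illustration only (not part of this diff): with `search` and `streamSearch` centralized here, callers pass a `SearchRequest` and check for a `ServiceError` before using the result. A minimal sketch, assuming `search` resolves to either a `SearchResponse` or a `ServiceError` as the `sew`/`withOptionalAuthV2` wrappers above suggest; the query string is illustrative.

const exampleSearchUsage = async () => {
    const response = await search({
        query: 'createGrpcClient repo:^github\\.com/sourcebot-dev/sourcebot$',
        matches: 100,
        contextLines: 3,
        whole: false,
        isCaseSensitivityEnabled: true,
        isRegexEnabled: true,
    });

    if (isServiceError(response)) {
        // Propagate or log the structured error.
        logger.error(`search failed: ${response.message}`);
        return;
    }

    logger.debug(`matched ${response.stats.actualMatchCount} results in ${response.files.length} files`);
};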
|
||||
|
||||
|
||||
const zoektSearch = async (searchRequest: ZoektGrpcSearchRequest, prisma: PrismaClient): Promise<SearchResponse> => {
|
||||
const client = createGrpcClient();
|
||||
const metadata = new grpc.Metadata();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
client.Search(searchRequest, metadata, async (error, response) => {
|
||||
if (error || !response) {
|
||||
reject(error || new Error('No response received'));
|
||||
return;
|
||||
}
|
||||
|
||||
const reposMapCache = await createReposMapForChunk(response, new Map<string | number, Repo>(), prisma);
|
||||
const { stats, files, repositoryInfo } = await transformZoektSearchResponse(response, reposMapCache);
|
||||
|
||||
resolve({
|
||||
stats,
|
||||
files,
|
||||
repositoryInfo,
|
||||
isSearchExhaustive: stats.actualMatchCount <= stats.totalMatchCount,
|
||||
} satisfies SearchResponse);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const transformZoektQuery = async (query: string, orgId: number, prisma: PrismaClient): Promise<string | ServiceError> => {
|
||||
const prevQueryParts = query.split(" ");
|
||||
const newQueryParts = [];
|
||||
const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, prisma: PrismaClient): Promise<ReadableStream> => {
|
||||
const client = createGrpcClient();
|
||||
let grpcStream: ReturnType<WebserverServiceClient['StreamSearch']> | null = null;
|
||||
let isStreamActive = true;
|
||||
let pendingChunks = 0;
|
||||
let accumulatedStats: SearchStats = {
|
||||
actualMatchCount: 0,
|
||||
totalMatchCount: 0,
|
||||
duration: 0,
|
||||
fileCount: 0,
|
||||
filesSkipped: 0,
|
||||
contentBytesLoaded: 0,
|
||||
indexBytesLoaded: 0,
|
||||
crashes: 0,
|
||||
shardFilesConsidered: 0,
|
||||
filesConsidered: 0,
|
||||
filesLoaded: 0,
|
||||
shardsScanned: 0,
|
||||
shardsSkipped: 0,
|
||||
shardsSkippedFilter: 0,
|
||||
ngramMatches: 0,
|
||||
ngramLookups: 0,
|
||||
wait: 0,
|
||||
matchTreeConstruction: 0,
|
||||
matchTreeSearch: 0,
|
||||
regexpsConsidered: 0,
|
||||
flushReason: ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED,
|
||||
};
|
||||
|
||||
for (const part of prevQueryParts) {
|
||||
return new ReadableStream({
|
||||
async start(controller) {
|
||||
const tryCloseController = () => {
|
||||
if (!isStreamActive && pendingChunks === 0) {
|
||||
const finalResponse: StreamedSearchResponse = {
|
||||
type: 'final',
|
||||
accumulatedStats,
|
||||
isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount,
|
||||
}
|
||||
|
||||
// Handle mapping `rev:` and `revision:` to `branch:`
|
||||
if (part.match(/^-?(rev|revision):.+$/)) {
|
||||
const isNegated = part.startsWith("-");
|
||||
let revisionName = part.slice(part.indexOf(":") + 1);
|
||||
controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`));
|
||||
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'));
|
||||
controller.close();
|
||||
client.close();
|
||||
logger.debug('SSE stream closed');
|
||||
}
|
||||
};
|
||||
|
||||
// Special case: `*` -> search all revisions.
|
||||
// In zoekt, providing a blank string will match all branches.
|
||||
// @see: https://github.com/sourcebot-dev/zoekt/blob/main/eval.go#L560-L562
|
||||
if (revisionName === "*") {
|
||||
revisionName = "";
|
||||
try {
|
||||
const metadata = new grpc.Metadata();
|
||||
|
||||
const streamRequest: ZoektGrpcStreamSearchRequest = {
|
||||
request: searchRequest,
|
||||
};
|
||||
|
||||
grpcStream = client.StreamSearch(streamRequest, metadata);
|
||||
|
||||
// `_reposMapCache` is used to cache repository metadata across all chunks.
|
||||
// This reduces the number of database queries required to transform file matches.
|
||||
const _reposMapCache = new Map<string | number, Repo>();
|
||||
|
||||
// Handle incoming data chunks
|
||||
grpcStream.on('data', async (chunk: ZoektGrpcStreamSearchResponse) => {
|
||||
if (!isStreamActive) {
|
||||
logger.debug('SSE stream closed, skipping chunk');
|
||||
return;
|
||||
}
|
||||
|
||||
// Track that we're processing a chunk
|
||||
pendingChunks++;
|
||||
|
||||
// grpcStream.on doesn't actually await on our handler, so we need to
|
||||
// explicitly pause the stream here to prevent the stream from completing
|
||||
// prior to our asynchronous work being completed.
|
||||
grpcStream?.pause();
|
||||
|
||||
try {
|
||||
if (!chunk.response_chunk) {
|
||||
logger.warn('No response chunk received');
|
||||
return;
|
||||
}
|
||||
|
||||
const reposMapCache = await createReposMapForChunk(chunk.response_chunk, _reposMapCache, prisma);
|
||||
const { stats, files, repositoryInfo } = await transformZoektSearchResponse(chunk.response_chunk, reposMapCache);
|
||||
|
||||
accumulatedStats = accumulateStats(accumulatedStats, stats);
|
||||
|
||||
const response: StreamedSearchResponse = {
|
||||
type: 'chunk',
|
||||
files,
|
||||
repositoryInfo,
|
||||
stats
|
||||
}
|
||||
|
||||
const sseData = `data: ${JSON.stringify(response)}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(sseData));
|
||||
} catch (error) {
|
||||
console.error('Error encoding chunk:', error);
|
||||
} finally {
|
||||
pendingChunks--;
|
||||
grpcStream?.resume();
|
||||
|
||||
// @note: we were hitting "Controller is already closed" errors when calling
|
||||
// `controller.enqueue` above for the last chunk. The reasoning was the event
|
||||
// handler for 'end' was being invoked prior to the completion of the last chunk,
|
||||
// resulting in the controller being closed prematurely. The workaround was to
|
||||
// keep track of the number of pending chunks and only close the controller
|
||||
// when there are no more chunks to process. We need to explicitly call
|
||||
// `tryCloseController` since there _seems_ to be no ordering guarantees between
|
||||
// the 'end' event handler and this callback.
|
||||
tryCloseController();
|
||||
}
|
||||
});
|
||||
|
||||
// Handle stream completion
|
||||
grpcStream.on('end', () => {
|
||||
if (!isStreamActive) {
|
||||
return;
|
||||
}
|
||||
isStreamActive = false;
|
||||
tryCloseController();
|
||||
});
|
||||
|
||||
// Handle errors
|
||||
grpcStream.on('error', (error: grpc.ServiceError) => {
|
||||
logger.error('gRPC stream error:', error);
|
||||
Sentry.captureException(error);
|
||||
|
||||
if (!isStreamActive) {
|
||||
return;
|
||||
}
|
||||
isStreamActive = false;
|
||||
|
||||
// Send error as SSE event
|
||||
const errorData = `data: ${JSON.stringify({
|
||||
error: {
|
||||
code: error.code,
|
||||
message: error.details || error.message,
|
||||
}
|
||||
})}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(errorData));
|
||||
|
||||
controller.close();
|
||||
client.close();
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Stream initialization error:', error);
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
const errorData = `data: ${JSON.stringify({
|
||||
error: { message: errorMessage }
|
||||
})}\n\n`;
|
||||
controller.enqueue(new TextEncoder().encode(errorData));
|
||||
|
||||
controller.close();
|
||||
client.close();
|
||||
}
|
||||
newQueryParts.push(`${isNegated ? "-" : ""}${zoektPrefixes.branch}${revisionName}`);
|
||||
},
|
||||
cancel() {
|
||||
logger.warn('SSE stream cancelled by client');
|
||||
isStreamActive = false;
|
||||
|
||||
// Cancel the gRPC stream to stop receiving data
|
||||
if (grpcStream) {
|
||||
grpcStream.cancel();
|
||||
}
|
||||
|
||||
client.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Creates a mapping between all repository ids in a given response
|
||||
// chunk. The mapping allows us to efficiently lookup repository metadata.
|
||||
const createReposMapForChunk = async (chunk: ZoektGrpcSearchResponse, reposMapCache: Map<string | number, Repo>, prisma: PrismaClient): Promise<Map<string | number, Repo>> => {
|
||||
const reposMap = new Map<string | number, Repo>();
|
||||
await Promise.all(chunk.files.map(async (file) => {
|
||||
const id = getRepoIdForFile(file);
|
||||
|
||||
const repo = await (async () => {
|
||||
// If it's in the cache, return the cached value.
|
||||
if (reposMapCache.has(id)) {
|
||||
return reposMapCache.get(id);
|
||||
}
|
||||
|
||||
// Otherwise, query the database for the record.
|
||||
const repo = typeof id === 'number' ?
|
||||
await prisma.repo.findUnique({
|
||||
where: {
|
||||
id: id,
|
||||
},
|
||||
}) :
|
||||
await prisma.repo.findFirst({
|
||||
where: {
|
||||
name: id,
|
||||
},
|
||||
});
|
||||
|
||||
// If a repository is found, cache it for future lookups.
|
||||
if (repo) {
|
||||
reposMapCache.set(id, repo);
|
||||
}
|
||||
|
||||
return repo;
|
||||
})();
|
||||
|
||||
// Only add the repository to the map if it was found.
|
||||
if (repo) {
|
||||
reposMap.set(id, repo);
|
||||
}
|
||||
}));
|
||||
|
||||
return reposMap;
|
||||
}
|
||||
|
||||
const transformZoektSearchResponse = async (response: ZoektGrpcSearchResponse, reposMapCache: Map<string | number, Repo>): Promise<{
|
||||
stats: SearchStats,
|
||||
files: SearchResultFile[],
|
||||
repositoryInfo: RepositoryInfo[],
|
||||
}> => {
|
||||
const files = response.files.map((file) => {
|
||||
const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name);
|
||||
const repoId = getRepoIdForFile(file);
|
||||
const repo = reposMapCache.get(repoId);
|
||||
|
||||
// This can happen if the user doesn't have access to the repository.
|
||||
if (!repo) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Expand `context:` into `reposet:` atom.
|
||||
else if (part.match(/^-?context:.+$/)) {
|
||||
const isNegated = part.startsWith("-");
|
||||
const contextName = part.slice(part.indexOf(":") + 1);
|
||||
// @todo: address "file_name might not be a valid UTF-8 string" warning.
|
||||
const fileName = file.file_name.toString('utf-8');
|
||||
|
||||
const convertRange = (range: ZoektGrpcRange): SourceRange => ({
|
||||
start: {
|
||||
byteOffset: range.start?.byte_offset ?? 0,
|
||||
column: range.start?.column ?? 0,
|
||||
lineNumber: range.start?.line_number ?? 0,
|
||||
},
|
||||
end: {
|
||||
byteOffset: range.end?.byte_offset ?? 0,
|
||||
column: range.end?.column ?? 0,
|
||||
lineNumber: range.end?.line_number ?? 0,
|
||||
}
|
||||
})
|
||||
|
||||
return {
|
||||
fileName: {
|
||||
text: fileName,
|
||||
matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [],
|
||||
},
|
||||
repository: repo.name,
|
||||
repositoryId: repo.id,
|
||||
language: file.language,
|
||||
// @todo: we will need to have a mechanism of forming the file's web url.
|
||||
webUrl: '',
|
||||
chunks: file.chunk_matches
|
||||
.filter((chunk) => !chunk.file_name) // filter out filename chunks.
|
||||
.map((chunk) => {
|
||||
return {
|
||||
content: chunk.content.toString('utf-8'),
|
||||
matchRanges: chunk.ranges.map(convertRange),
|
||||
contentStart: chunk.content_start ? {
|
||||
byteOffset: chunk.content_start.byte_offset,
|
||||
column: chunk.content_start.column,
|
||||
lineNumber: chunk.content_start.line_number,
|
||||
} : {
|
||||
byteOffset: 0,
|
||||
column: 0,
|
||||
lineNumber: 0,
|
||||
},
|
||||
symbols: chunk.symbol_info.map((symbol) => {
|
||||
return {
|
||||
symbol: symbol.sym,
|
||||
kind: symbol.kind,
|
||||
parent: symbol.parent ? {
|
||||
symbol: symbol.parent,
|
||||
kind: symbol.parent_kind,
|
||||
} : undefined,
|
||||
}
|
||||
})
|
||||
}
|
||||
}),
|
||||
branches: file.branches,
|
||||
content: file.content ? file.content.toString('utf-8') : undefined,
|
||||
}
|
||||
}).filter(file => file !== undefined);
|
||||
|
||||
const actualMatchCount = files.reduce(
|
||||
(acc, file) =>
|
||||
// Match count is the sum of the number of chunk matches and file name matches.
|
||||
acc + file.chunks.reduce(
|
||||
(acc, chunk) => acc + chunk.matchRanges.length,
|
||||
0,
|
||||
) + file.fileName.matchRanges.length,
|
||||
0,
|
||||
);
|
||||
|
||||
const stats: SearchStats = {
|
||||
actualMatchCount,
|
||||
totalMatchCount: response.stats?.match_count ?? 0,
|
||||
duration: response.stats?.duration?.nanos ?? 0,
|
||||
fileCount: response.stats?.file_count ?? 0,
|
||||
filesSkipped: response.stats?.files_skipped ?? 0,
|
||||
contentBytesLoaded: response.stats?.content_bytes_loaded ?? 0,
|
||||
indexBytesLoaded: response.stats?.index_bytes_loaded ?? 0,
|
||||
crashes: response.stats?.crashes ?? 0,
|
||||
shardFilesConsidered: response.stats?.shard_files_considered ?? 0,
|
||||
filesConsidered: response.stats?.files_considered ?? 0,
|
||||
filesLoaded: response.stats?.files_loaded ?? 0,
|
||||
shardsScanned: response.stats?.shards_scanned ?? 0,
|
||||
shardsSkipped: response.stats?.shards_skipped ?? 0,
|
||||
shardsSkippedFilter: response.stats?.shards_skipped_filter ?? 0,
|
||||
ngramMatches: response.stats?.ngram_matches ?? 0,
|
||||
ngramLookups: response.stats?.ngram_lookups ?? 0,
|
||||
wait: response.stats?.wait?.nanos ?? 0,
|
||||
matchTreeConstruction: response.stats?.match_tree_construction?.nanos ?? 0,
|
||||
matchTreeSearch: response.stats?.match_tree_search?.nanos ?? 0,
|
||||
regexpsConsidered: response.stats?.regexps_considered ?? 0,
|
||||
flushReason: response.stats?.flush_reason?.toString() ?? ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED,
|
||||
}
|
||||
|
||||
return {
|
||||
files,
|
||||
repositoryInfo: Array.from(reposMapCache.values()).map((repo) => ({
|
||||
id: repo.id,
|
||||
codeHostType: repo.external_codeHostType,
|
||||
name: repo.name,
|
||||
displayName: repo.displayName ?? undefined,
|
||||
webUrl: repo.webUrl ?? undefined,
|
||||
})),
|
||||
stats,
|
||||
}
|
||||
}
|
||||
|
||||
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
|
||||
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
|
||||
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique
|
||||
// set of repository ids* and map them to their corresponding Repo record.
|
||||
//
|
||||
// *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`?
|
||||
// A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was
|
||||
// always undefined. To make this a non-breaking change, we fallback to using the repository's name
|
||||
// (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in
|
||||
// practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo',
|
||||
// 'gitea.com/org/repo', etc.).
|
||||
//
|
||||
// Note: When a repository is re-indexed (every hour) this ID will be populated.
|
||||
// @see: https://github.com/sourcebot-dev/zoekt/pull/6
|
||||
const getRepoIdForFile = (file: ZoektGrpcFileMatch): string | number => {
|
||||
return file.repository_id ?? file.repository;
|
||||
}
|
||||
|
||||
const createZoektSearchRequest = async ({
|
||||
searchRequest,
|
||||
prisma,
|
||||
}: {
|
||||
searchRequest: SearchRequest;
|
||||
prisma: PrismaClient;
|
||||
}) => {
|
||||
const tree = parseQueryIntoLezerTree(searchRequest.query);
|
||||
const zoektQuery = await transformLezerTreeToZoektGrpcQuery({
|
||||
tree,
|
||||
input: searchRequest.query,
|
||||
isCaseSensitivityEnabled: searchRequest.isCaseSensitivityEnabled ?? false,
|
||||
isRegexEnabled: searchRequest.isRegexEnabled ?? false,
|
||||
onExpandSearchContext: async (contextName: string) => {
|
||||
const context = await prisma.searchContext.findUnique({
|
||||
where: {
|
||||
name_orgId: {
|
||||
name: contextName,
|
||||
orgId,
|
||||
orgId: SINGLE_TENANT_ORG_ID,
|
||||
}
|
||||
},
|
||||
include: {
@@ -73,357 +446,139 @@ const transformZoektQuery = async (query: string, orgId: number, prisma: PrismaC
}
|
||||
});
|
||||
|
||||
// If the context doesn't exist, return an error.
|
||||
if (!context) {
|
||||
return {
|
||||
errorCode: ErrorCode.SEARCH_CONTEXT_NOT_FOUND,
|
||||
message: `Search context "${contextName}" not found`,
|
||||
statusCode: StatusCodes.NOT_FOUND,
|
||||
} satisfies ServiceError;
|
||||
throw new Error(`Search context "${contextName}" not found`);
|
||||
}
|
||||
|
||||
const names = context.repos.map((repo) => repo.name);
|
||||
newQueryParts.push(`${isNegated ? "-" : ""}${zoektPrefixes.reposet}${names.join(",")}`);
|
||||
}
|
||||
return context.repos.map((repo) => repo.name);
|
||||
},
|
||||
});
|
||||
|
||||
// no-op: add the original part to the new query parts.
|
||||
else {
|
||||
newQueryParts.push(part);
|
||||
}
|
||||
}
|
||||
|
||||
return newQueryParts.join(" ");
|
||||
}
|
||||
|
||||
// Extracts a repository file URL from a zoekt template, branch, and file name.
|
||||
const getFileWebUrl = (template: string, branch: string, fileName: string): string | undefined => {
|
||||
// This is a hacky parser for templates generated by
|
||||
// the go text/template package. Example template:
|
||||
// {{URLJoinPath "https://github.com/sourcebot-dev/sourcebot" "blob" .Version .Path}}
|
||||
|
||||
if (!template.match(/^{{URLJoinPath\s.*}}(\?.+)?$/)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const url =
|
||||
template.substring("{{URLJoinPath ".length, template.indexOf("}}"))
|
||||
.split(" ")
|
||||
.map((part) => {
|
||||
// remove wrapping quotes
|
||||
if (part.startsWith("\"")) part = part.substring(1);
|
||||
if (part.endsWith("\"")) part = part.substring(0, part.length - 1);
|
||||
// Replace variable references
|
||||
if (part == ".Version") part = branch;
|
||||
if (part == ".Path") part = fileName;
|
||||
return part;
|
||||
})
|
||||
.join("/");
|
||||
|
||||
const optionalQueryParams =
|
||||
template.substring(template.indexOf("}}") + 2)
|
||||
.replace("{{.Version}}", branch)
|
||||
.replace("{{.Path}}", fileName);
|
||||
|
||||
return encodeURI(url + optionalQueryParams);
|
||||
}
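For illustration only (not part of this diff), a worked example of the template parsing above, using hypothetical branch and path values:

const exampleFileUrl = getFileWebUrl(
    '{{URLJoinPath "https://github.com/sourcebot-dev/sourcebot" "blob" .Version .Path}}',
    'main',
    'packages/web/src/features/search/searchApi.ts',
);
// => 'https://github.com/sourcebot-dev/sourcebot/blob/main/packages/web/src/features/search/searchApi.ts'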
|
||||
|
||||
export const search = async ({ query, matches, contextLines, whole }: SearchRequest): Promise<SearchResponse | ServiceError> => sew(() =>
|
||||
withOptionalAuthV2(async ({ org, prisma }) => {
|
||||
const transformedQuery = await transformZoektQuery(query, org.id, prisma);
|
||||
if (isServiceError(transformedQuery)) {
|
||||
return transformedQuery;
|
||||
}
|
||||
query = transformedQuery;
|
||||
|
||||
const isBranchFilteringEnabled = (
|
||||
query.includes(zoektPrefixes.branch) ||
|
||||
query.includes(zoektPrefixes.branchShort)
|
||||
);
|
||||
|
||||
// We only want to show matches for the default branch when
|
||||
// the user isn't explicitly filtering by branch.
|
||||
if (!isBranchFilteringEnabled) {
|
||||
query = query.concat(` branch:HEAD`);
|
||||
}
|
||||
|
||||
const body = JSON.stringify({
|
||||
q: query,
|
||||
// @see: https://github.com/sourcebot-dev/zoekt/blob/main/api.go#L892
|
||||
opts: {
|
||||
ChunkMatches: true,
|
||||
// @note: Zoekt has several different ways to limit a given search. The two that
|
||||
// we care about are `MaxMatchDisplayCount` and `TotalMaxMatchCount`:
|
||||
// - `MaxMatchDisplayCount` truncates the number of matches AFTER performing
|
||||
// a search (specifically, after collating and sorting the results). The number of
|
||||
// results returned by the API will be less than or equal to this value.
|
||||
//
|
||||
// - `TotalMaxMatchCount` truncates the number of matches DURING a search. The results
|
||||
// returned by the API can be less than, equal to, or greater than this value.
|
||||
// Why greater? Because this value is compared _after_ a given shard has finished
|
||||
// being processed, the number of matches returned by the last shard may have exceeded
|
||||
// this value.
|
||||
//
|
||||
// Let's define two variables:
|
||||
// - `actualMatchCount` : The number of matches that are returned by the API. This is
|
||||
// always less than or equal to `MaxMatchDisplayCount`.
|
||||
// - `totalMatchCount` : The number of matches that zoekt found before it either
|
||||
// 1) found all matches or 2) hit the `TotalMaxMatchCount` limit. This number is
|
||||
// not bounded and can be less than, equal to, or greater than both `TotalMaxMatchCount`
|
||||
// and `MaxMatchDisplayCount`.
|
||||
//
|
||||
//
|
||||
// Our challenge is to determine whether or not the search returned all possible matches
// (it was exhaustive) or if it was truncated. By setting the `TotalMaxMatchCount` to
|
||||
// `MaxMatchDisplayCount + 1`, we can determine which of these occurred by comparing
|
||||
// `totalMatchCount` to `MaxMatchDisplayCount`.
|
||||
//
|
||||
// if (totalMatchCount ≤ actualMatchCount):
|
||||
// Search is EXHAUSTIVE (found all possible matches)
|
||||
// Proof: totalMatchCount ≤ MaxMatchDisplayCount < TotalMaxMatchCount
|
||||
// Therefore Zoekt stopped naturally, not due to limit
|
||||
//
|
||||
// if (totalMatchCount > actualMatchCount):
|
||||
// Search is TRUNCATED (more matches exist)
|
||||
// Proof: totalMatchCount > MaxMatchDisplayCount + 1 = TotalMaxMatchCount
|
||||
// Therefore Zoekt hit the limit and stopped searching
|
||||
//
|
||||
MaxMatchDisplayCount: matches,
|
||||
TotalMaxMatchCount: matches + 1,
|
||||
NumContextLines: contextLines,
|
||||
Whole: !!whole,
|
||||
ShardMaxMatchCount: -1,
|
||||
MaxWallTime: 0, // zoekt expects a duration in nanoseconds
|
||||
}
|
||||
});
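For illustration only (not part of this diff), a worked example of the exhaustiveness check described above, with illustrative numbers:

// Suppose matches = 100, so MaxMatchDisplayCount = 100 and TotalMaxMatchCount = 101.
//
// - If the index holds only 42 matches, zoekt stops naturally: totalMatchCount = 42,
//   actualMatchCount = 42, and 42 <= 42, so the search is exhaustive.
//
// - If the index holds thousands of matches, zoekt stops once it crosses the limit of 101
//   (say totalMatchCount = 130), the displayed results are capped at actualMatchCount <= 100,
//   and 130 > 100, so the search is reported as truncated.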
|
||||
|
||||
let header: Record<string, string> = {};
|
||||
header = {
|
||||
"X-Tenant-ID": org.id.toString()
|
||||
};
|
||||
|
||||
const { data: searchResponse, durationMs: fetchDurationMs } = await measure(
|
||||
() => zoektFetch({
|
||||
path: "/api/search",
|
||||
body,
|
||||
header,
|
||||
method: "POST",
|
||||
}),
|
||||
"zoekt_fetch",
|
||||
false
|
||||
);
|
||||
|
||||
if (!searchResponse.ok) {
|
||||
return invalidZoektResponse(searchResponse);
|
||||
}
|
||||
|
||||
const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse): Promise<SearchResponse> => {
|
||||
// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field
|
||||
// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository
|
||||
// metadata when transforming (potentially thousands) of file matches, we aggregate a unique
|
||||
// set of repository ids* and map them to their corresponding Repo record.
|
||||
//
|
||||
// *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`?
|
||||
// A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was
|
||||
// always undefined. To make this a non-breaking change, we fallback to using the repository's name
|
||||
// (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in
|
||||
// practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo',
|
||||
// 'gitea.com/org/repo', etc.).
|
||||
//
|
||||
// Note: When a repository is re-indexed (every hour) this ID will be populated.
|
||||
// @see: https://github.com/sourcebot-dev/zoekt/pull/6
|
||||
const repoIdentifiers = new Set(Result.Files?.map((file) => file.RepositoryID ?? file.Repository) ?? []);
|
||||
const repos = new Map<string | number, Repo>();
|
||||
|
||||
(await prisma.repo.findMany({
|
||||
where: {
|
||||
id: {
|
||||
in: Array.from(repoIdentifiers).filter((id) => typeof id === "number"),
|
||||
},
|
||||
orgId: org.id,
|
||||
}
|
||||
})).forEach(repo => repos.set(repo.id, repo));
|
||||
|
||||
(await prisma.repo.findMany({
|
||||
where: {
|
||||
name: {
|
||||
in: Array.from(repoIdentifiers).filter((id) => typeof id === "string"),
|
||||
},
|
||||
orgId: org.id,
|
||||
}
|
||||
})).forEach(repo => repos.set(repo.name, repo));
|
||||
|
||||
const files = Result.Files?.map((file) => {
|
||||
const fileNameChunks = file.ChunkMatches.filter((chunk) => chunk.FileName);
|
||||
|
||||
const webUrl = (() => {
|
||||
const template: string | undefined = Result.RepoURLs[file.Repository];
|
||||
if (!template) {
|
||||
return undefined;
|
||||
const zoektSearchRequest: ZoektGrpcSearchRequest = {
|
||||
query: {
|
||||
and: {
|
||||
children: [
|
||||
zoektQuery,
|
||||
// @todo: handle branch filtering.
|
||||
{
|
||||
branch: {
|
||||
pattern: 'HEAD',
|
||||
exact: true,
|
||||
}
|
||||
}
|
||||
|
||||
// If there are multiple branches pointing to the same revision of this file, it doesn't
|
||||
// matter which branch we use here, so use the first one.
|
||||
const branch = file.Branches && file.Branches.length > 0 ? file.Branches[0] : "HEAD";
|
||||
return getFileWebUrl(template, branch, file.FileName);
|
||||
})();
|
||||
|
||||
        const identifier = file.RepositoryID ?? file.Repository;
        const repo = repos.get(identifier);

        // This can happen if the user doesn't have access to the repository.
        if (!repo) {
            return undefined;
        }

        return {
            fileName: {
                text: file.FileName,
                matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].Ranges.map((range) => ({
                    start: {
                        byteOffset: range.Start.ByteOffset,
                        column: range.Start.Column,
                        lineNumber: range.Start.LineNumber,
                    },
                    end: {
                        byteOffset: range.End.ByteOffset,
                        column: range.End.Column,
                        lineNumber: range.End.LineNumber,
                    }
                })) : [],
            },
            repository: repo.name,
            repositoryId: repo.id,
            webUrl: webUrl,
            language: file.Language,
            chunks: file.ChunkMatches
                .filter((chunk) => !chunk.FileName) // Filter out filename chunks.
                .map((chunk) => {
                    return {
                        content: base64Decode(chunk.Content),
                        matchRanges: chunk.Ranges.map((range) => ({
                            start: {
                                byteOffset: range.Start.ByteOffset,
                                column: range.Start.Column,
                                lineNumber: range.Start.LineNumber,
                            },
                            end: {
                                byteOffset: range.End.ByteOffset,
                                column: range.End.Column,
                                lineNumber: range.End.LineNumber,
                            }
                        }) satisfies SourceRange),
                        contentStart: {
                            byteOffset: chunk.ContentStart.ByteOffset,
                            column: chunk.ContentStart.Column,
                            lineNumber: chunk.ContentStart.LineNumber,
                        },
                        symbols: chunk.SymbolInfo?.map((symbol) => {
                            return {
                                symbol: symbol.Sym,
                                kind: symbol.Kind,
                                parent: symbol.Parent.length > 0 ? {
                                    symbol: symbol.Parent,
                                    kind: symbol.ParentKind,
                                } : undefined,
                            }
                        }) ?? undefined,
                    }
                }),
            branches: file.Branches,
            content: file.Content ? base64Decode(file.Content) : undefined,
        }
    }).filter((file) => file !== undefined) ?? [];

    const actualMatchCount = files.reduce(
        (acc, file) =>
            // Match count is the sum of the number of chunk matches and file name matches.
            acc + file.chunks.reduce(
                (acc, chunk) => acc + chunk.matchRanges.length,
                0,
            ) + file.fileName.matchRanges.length,
        0,
    );

    const totalMatchCount = Result.MatchCount;
    const isSearchExhaustive = totalMatchCount <= actualMatchCount;

    return {
        files,
        repositoryInfo: Array.from(repos.values()).map((repo) => ({
            id: repo.id,
            codeHostType: repo.external_codeHostType,
            name: repo.name,
            displayName: repo.displayName ?? undefined,
            webUrl: repo.webUrl ?? undefined,
        })),
        isBranchFilteringEnabled,
        isSearchExhaustive,
        stats: {
            actualMatchCount,
            totalMatchCount,
            duration: Result.Duration,
            fileCount: Result.FileCount,
            filesSkipped: Result.FilesSkipped,
            contentBytesLoaded: Result.ContentBytesLoaded,
            indexBytesLoaded: Result.IndexBytesLoaded,
            crashes: Result.Crashes,
            shardFilesConsidered: Result.ShardFilesConsidered,
            filesConsidered: Result.FilesConsidered,
            filesLoaded: Result.FilesLoaded,
            shardsScanned: Result.ShardsScanned,
            shardsSkipped: Result.ShardsSkipped,
            shardsSkippedFilter: Result.ShardsSkippedFilter,
            ngramMatches: Result.NgramMatches,
            ngramLookups: Result.NgramLookups,
            wait: Result.Wait,
            matchTreeConstruction: Result.MatchTreeConstruction,
            matchTreeSearch: Result.MatchTreeSearch,
            regexpsConsidered: Result.RegexpsConsidered,
            flushReason: Result.FlushReason,
        }
    } satisfies SearchResponse;
}
    const { data: rawZoektResponse, durationMs: parseJsonDurationMs } = await measure(
        () => searchResponse.json(),
        "parse_json",
        false
    );

    // @note: We do not use zod parseAsync here since in cases where the
    // response is large (> 40MB), there can be significant performance issues.
    const zoektResponse = rawZoektResponse as ZoektSearchResponse;

    const { data: response, durationMs: transformZoektResponseDurationMs } = await measure(
        () => transformZoektSearchResponse(zoektResponse),
        "transform_zoekt_response",
        false
    );

    const totalDurationMs = fetchDurationMs + parseJsonDurationMs + transformZoektResponseDurationMs;

    // Debug log: timing breakdown
    const timings = [
        { name: "zoekt_fetch", duration: fetchDurationMs },
        { name: "parse_json", duration: parseJsonDurationMs },
        { name: "transform_zoekt_response", duration: transformZoektResponseDurationMs },
    ];

    logger.debug(`Search timing breakdown (query: "${query}"):`);
    timings.forEach(({ name, duration }) => {
        const percentage = ((duration / totalDurationMs) * 100).toFixed(1);
        const durationStr = duration.toFixed(2).padStart(8);
        const percentageStr = percentage.padStart(5);
        logger.debug(`  ${name.padEnd(25)} ${durationStr}ms (${percentageStr}%)`);
    });
    logger.debug(`  ${"TOTAL".padEnd(25)} ${totalDurationMs.toFixed(2).padStart(8)}ms (100.0%)`);

    return {
        ...response,
        __debug_timings: {
            zoekt_fetch: fetchDurationMs,
            parse_json: parseJsonDurationMs,
            transform_zoekt_response: transformZoektResponseDurationMs,
        }
    } satisfies SearchResponse;
}));
    const zoektSearchRequest: ZoektGrpcSearchRequest = {
        query: {
            and: {
                children: [
                    zoektQuery,
                    // @todo: handle branch filtering.
                    {
                        branch: {
                            pattern: 'HEAD',
                            exact: true,
                        }
                    }
                ],
            },
        },
        opts: {
            chunk_matches: true,
            // @note: Zoekt has several different ways to limit a given search. The two that
            // we care about are `MaxMatchDisplayCount` and `TotalMaxMatchCount`:
            // - `MaxMatchDisplayCount` truncates the number of matches AFTER performing
            //   a search (specifically, after collating and sorting the results). The number of
            //   results returned by the API will be less than or equal to this value.
            //
            // - `TotalMaxMatchCount` truncates the number of matches DURING a search. The results
            //   returned by the API can be less than, equal to, or greater than this value.
            //   Why greater? Because this value is compared _after_ a given shard has finished
            //   being processed, the number of matches returned by the last shard may have exceeded
            //   this value.
            //
            // Let's define two variables:
            // - `actualMatchCount` : The number of matches that are returned by the API. This is
            //   always less than or equal to `MaxMatchDisplayCount`.
            // - `totalMatchCount` : The number of matches that zoekt found before it either
            //   1) found all matches or 2) hit the `TotalMaxMatchCount` limit. This number is
            //   not bounded and can be less than, equal to, or greater than both `TotalMaxMatchCount`
            //   and `MaxMatchDisplayCount`.
            //
            // Our challenge is to determine whether the search returned all possible matches
            // (it was exhaustive) or if it was truncated. By setting the `TotalMaxMatchCount` to
            // `MaxMatchDisplayCount + 1`, we can determine which of these occurred by comparing
            // `totalMatchCount` to `MaxMatchDisplayCount`.
            //
            // if (totalMatchCount ≤ actualMatchCount):
            //     Search is EXHAUSTIVE (found all possible matches)
            //     Proof: totalMatchCount ≤ MaxMatchDisplayCount < TotalMaxMatchCount
            //     Therefore Zoekt stopped naturally, not due to limit
            //
            // if (totalMatchCount > actualMatchCount):
            //     Search is TRUNCATED (more matches exist)
            //     Proof: totalMatchCount > MaxMatchDisplayCount + 1 = TotalMaxMatchCount
            //     Therefore Zoekt hit the limit and stopped searching
            //
            max_match_display_count: searchRequest.matches,
            total_max_match_count: searchRequest.matches + 1,
            num_context_lines: searchRequest.contextLines ?? 0,
            whole: !!searchRequest.whole,
            shard_max_match_count: -1,
            max_wall_time: {
                seconds: 0,
            }
        },
    };

    return zoektSearchRequest;
}
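The limit bookkeeping described in the comment above reduces to a single comparison once the response has been transformed; the transform code performs it inline when computing `isSearchExhaustive`. A minimal sketch of that check, with an illustrative helper name:

// Sketch: with total_max_match_count set to max_match_display_count + 1, a search is
// exhaustive exactly when the total match count Zoekt reports still fits within the
// number of matches it actually returned.
const isExhaustiveSearch = (actualMatchCount: number, totalMatchCount: number): boolean =>
    totalMatchCount <= actualMatchCount;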
const createGrpcClient = (): WebserverServiceClient => {
    // Path to proto files - these should match your monorepo structure
    const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos');
    const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto');

    const packageDefinition = protoLoader.loadSync(protoPath, {
        keepCase: true,
        longs: Number,
        enums: String,
        defaults: true,
        oneofs: true,
        includeDirs: [protoBasePath],
    });

    const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;

    // Extract host and port from ZOEKT_WEBSERVER_URL
    const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL);
    const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`;

    return new proto.zoekt.webserver.v1.WebserverService(
        grpcAddress,
        grpc.credentials.createInsecure(),
        {
            'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB
            'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB
        }
    );
}
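A client generated this way exposes callback-style RPC methods, so a blocking search can be awaited by wrapping the call in a Promise. A minimal sketch under that assumption (the `searchOnce` wrapper and its error message are illustrative, and the `SearchResponse__Output` type is assumed to come from the generated proto bindings):

// Sketch only: await the callback-based Search RPC exposed by the generated client.
// Assumes `request` is a ZoektGrpcSearchRequest built by the request builder above.
const searchOnce = (request: ZoektGrpcSearchRequest): Promise<SearchResponse__Output> =>
    new Promise((resolve, reject) => {
        const client = createGrpcClient();
        client.Search(request, (error, response) => {
            if (error || !response) {
                reject(error ?? new Error('empty response from zoekt'));
                return;
            }
            resolve(response);
        });
    });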
const accumulateStats = (a: SearchStats, b: SearchStats): SearchStats => {
    return {
        actualMatchCount: a.actualMatchCount + b.actualMatchCount,
        totalMatchCount: a.totalMatchCount + b.totalMatchCount,
        duration: a.duration + b.duration,
        fileCount: a.fileCount + b.fileCount,
        filesSkipped: a.filesSkipped + b.filesSkipped,
        contentBytesLoaded: a.contentBytesLoaded + b.contentBytesLoaded,
        indexBytesLoaded: a.indexBytesLoaded + b.indexBytesLoaded,
        crashes: a.crashes + b.crashes,
        shardFilesConsidered: a.shardFilesConsidered + b.shardFilesConsidered,
        filesConsidered: a.filesConsidered + b.filesConsidered,
        filesLoaded: a.filesLoaded + b.filesLoaded,
        shardsScanned: a.shardsScanned + b.shardsScanned,
        shardsSkipped: a.shardsSkipped + b.shardsSkipped,
        shardsSkippedFilter: a.shardsSkippedFilter + b.shardsSkippedFilter,
        ngramMatches: a.ngramMatches + b.ngramMatches,
        ngramLookups: a.ngramLookups + b.ngramLookups,
        wait: a.wait + b.wait,
        matchTreeConstruction: a.matchTreeConstruction + b.matchTreeConstruction,
        matchTreeSearch: a.matchTreeSearch + b.matchTreeSearch,
        regexpsConsidered: a.regexpsConsidered + b.regexpsConsidered,
        // Capture the first non-unknown flush reason.
        ...(a.flushReason === ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED ? {
            flushReason: b.flushReason
        } : {
            flushReason: a.flushReason,
        }),
    }
}
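Because accumulateStats is a pure pairwise merge, aggregating the stats of a streamed search is a straightforward fold over the per-chunk stats. A usage sketch (the `chunkStats` array and zero-initialized `zeroStats` value are illustrative):

// Sketch: fold per-chunk stats from a streamed search into a single SearchStats total.
// `zeroStats` is assumed to have every counter set to zero and flushReason set to
// ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED.
const totalStats = chunkStats.reduce(
    (acc, next) => accumulateStats(acc, next),
    zeroStats,
);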
@@ -51,7 +51,7 @@ export const searchStatsSchema = z.object({
    matchTreeConstruction: z.number(), // Aggregate wall clock time spent constructing and pruning the match tree. This accounts for time such as lookups in the trigram index.
    matchTreeSearch: z.number(), // Aggregate wall clock time spent searching the match tree. This accounts for the bulk of search work done looking for matches.
    regexpsConsidered: z.number(), // Number of times regexp was called on files that we evaluated.
    flushReason: z.number(), // FlushReason explains why results were flushed.
    flushReason: z.string(), // FlushReason explains why results were flushed.
});
export type SearchStats = z.infer<typeof searchStatsSchema>;
@@ -96,9 +96,7 @@ export const searchResponseSchema = z.object({
    stats: searchStatsSchema,
    files: z.array(searchFileSchema),
    repositoryInfo: z.array(repositoryInfoSchema),
    isBranchFilteringEnabled: z.boolean(),
    isSearchExhaustive: z.boolean(),
    __debug_timings: z.record(z.string(), z.number()).optional(),
});
export type SearchResponse = z.infer<typeof searchResponseSchema>;
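Since the response shape is described by a zod schema, callers can validate an untyped payload against it before use. A small sketch (the `payload` value is illustrative):

// Sketch: validate an arbitrary payload against the search response schema.
const parsed = searchResponseSchema.safeParse(payload);
if (parsed.success) {
    console.log(`received ${parsed.data.files.length} files across ${parsed.data.repositoryInfo.length} repositories`);
}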
@@ -1,34 +0,0 @@
import { env } from "@sourcebot/shared";

interface ZoektRequest {
    path: string,
    body: string,
    method: string,
    header?: Record<string, string>,
    cache?: RequestCache,
}

export const zoektFetch = async ({
    path,
    body,
    method,
    header,
    cache,
}: ZoektRequest) => {
    const response = await fetch(
        new URL(path, env.ZOEKT_WEBSERVER_URL),
        {
            method,
            headers: {
                ...header,
                "Content-Type": "application/json",
            },
            body,
            cache,
        }
    );

    // @todo : add metrics

    return response;
}
@@ -1,135 +0,0 @@
import { z } from "zod";

// @see : https://github.com/sourcebot-dev/zoekt/blob/main/api.go#L212
export const zoektLocationSchema = z.object({
    // 0-based byte offset from the beginning of the file
    ByteOffset: z.number(),
    // 1-based line number from the beginning of the file
    LineNumber: z.number(),
    // 1-based column number (in runes) from the beginning of line
    Column: z.number(),
});

export const zoektRangeSchema = z.object({
    Start: zoektLocationSchema,
    End: zoektLocationSchema,
});

// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L350
export const zoektSearchResponseStats = {
    ContentBytesLoaded: z.number(),
    IndexBytesLoaded: z.number(),
    Crashes: z.number(),
    Duration: z.number(),
    FileCount: z.number(),
    ShardFilesConsidered: z.number(),
    FilesConsidered: z.number(),
    FilesLoaded: z.number(),
    FilesSkipped: z.number(),
    ShardsScanned: z.number(),
    ShardsSkipped: z.number(),
    ShardsSkippedFilter: z.number(),
    MatchCount: z.number(),
    NgramMatches: z.number(),
    NgramLookups: z.number(),
    Wait: z.number(),
    MatchTreeConstruction: z.number(),
    MatchTreeSearch: z.number(),
    RegexpsConsidered: z.number(),
    FlushReason: z.number(),
}

export const zoektSymbolSchema = z.object({
    Sym: z.string(),
    Kind: z.string(),
    Parent: z.string(),
    ParentKind: z.string(),
});

// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L497
export const zoektSearchResponseSchema = z.object({
    Result: z.object({
        ...zoektSearchResponseStats,
        Files: z.array(z.object({
            FileName: z.string(),
            Repository: z.string(),
            RepositoryID: z.number().optional(),
            Version: z.string().optional(),
            Language: z.string(),
            Branches: z.array(z.string()).optional(),
            ChunkMatches: z.array(z.object({
                Content: z.string(),
                Ranges: z.array(zoektRangeSchema),
                FileName: z.boolean(),
                ContentStart: zoektLocationSchema,
                Score: z.number(),
                SymbolInfo: z.array(zoektSymbolSchema).nullable(),
            })),
            Checksum: z.string(),
            Score: z.number(),
            // Set if `whole` is true.
            Content: z.string().optional(),
        })).nullable(),
        RepoURLs: z.record(z.string(), z.string()),
    }),
});

export type ZoektSearchResponse = z.infer<typeof zoektSearchResponseSchema>;

// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L728
const zoektRepoStatsSchema = z.object({
    Repos: z.number(),
    Shards: z.number(),
    Documents: z.number(),
    IndexBytes: z.number(),
    ContentBytes: z.number(),
    NewLinesCount: z.number(),
    DefaultBranchNewLinesCount: z.number(),
    OtherBranchesNewLinesCount: z.number(),
});

// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L716
const zoektIndexMetadataSchema = z.object({
    IndexFormatVersion: z.number(),
    IndexFeatureVersion: z.number(),
    IndexMinReaderVersion: z.number(),
    IndexTime: z.string(),
    PlainASCII: z.boolean(),
    LanguageMap: z.record(z.string(), z.number()),
    ZoektVersion: z.string(),
    ID: z.string(),
});

// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L555
export const zoektRepositorySchema = z.object({
    Name: z.string(),
    URL: z.string(),
    Source: z.string(),
    Branches: z.array(z.object({
        Name: z.string(),
        Version: z.string(),
    })).nullable(),
    CommitURLTemplate: z.string(),
    FileURLTemplate: z.string(),
    LineFragmentTemplate: z.string(),
    RawConfig: z.record(z.string(), z.string()).nullable(),
    Rank: z.number(),
    IndexOptions: z.string(),
    HasSymbols: z.boolean(),
    Tombstone: z.boolean(),
    LatestCommitDate: z.string(),
    FileTombstones: z.string().optional(),
});

export const zoektListRepositoriesResponseSchema = z.object({
    List: z.object({
        Repos: z.array(z.object({
            Repository: zoektRepositorySchema,
            IndexMetadata: zoektIndexMetadataSchema,
            Stats: zoektRepoStatsSchema,
        })),
        Stats: zoektRepoStatsSchema,
    })
});
@@ -25,7 +25,7 @@ export type PosthogEventMap = {
    matchTreeConstruction: number,
    matchTreeSearch: number,
    regexpsConsidered: number,
    flushReason: number,
    flushReason: string,
    fileLanguages: string[],
    isSearchExhaustive: boolean
},