From 898c9097db5c4d06c95fb85876cfb801eab5e007 Mon Sep 17 00:00:00 2001 From: bkellam Date: Wed, 19 Nov 2025 15:40:28 -0800 Subject: [PATCH] migrate blocking search over to grpc. Centralize everything in searchApi --- .../app/api/(server)/stream_search/route.ts | 546 +--------- packages/web/src/features/chat/tools.ts | 4 +- packages/web/src/features/codeNav/api.ts | 6 +- .../web/src/features/search/fileSourceApi.ts | 10 +- .../search/query.ts} | 29 +- packages/web/src/features/search/searchApi.ts | 949 ++++++++++-------- packages/web/src/features/search/types.ts | 4 +- .../web/src/features/search/zoektClient.ts | 34 - .../web/src/features/search/zoektSchema.ts | 135 --- packages/web/src/lib/posthogEvents.ts | 2 +- 10 files changed, 601 insertions(+), 1118 deletions(-) rename packages/web/src/{app/api/(server)/stream_search/transformer.ts => features/search/query.ts} (94%) delete mode 100644 packages/web/src/features/search/zoektClient.ts delete mode 100644 packages/web/src/features/search/zoektSchema.ts diff --git a/packages/web/src/app/api/(server)/stream_search/route.ts b/packages/web/src/app/api/(server)/stream_search/route.ts index 5331957b..2e09b078 100644 --- a/packages/web/src/app/api/(server)/stream_search/route.ts +++ b/packages/web/src/app/api/(server)/stream_search/route.ts @@ -1,540 +1,30 @@ 'use server'; -import { searchRequestSchema, SearchStats, SourceRange, StreamedSearchResponse } from '@/features/search/types'; -import { SINGLE_TENANT_ORG_ID } from '@/lib/constants'; +import { streamSearch } from '@/features/search/searchApi'; +import { searchRequestSchema } from '@/features/search/types'; import { schemaValidationError, serviceErrorResponse } from '@/lib/serviceError'; -import { prisma } from '@/prisma'; -import type { ProtoGrpcType } from '@/proto/webserver'; -import { FileMatch__Output } from '@/proto/zoekt/webserver/v1/FileMatch'; -import { Range__Output } from '@/proto/zoekt/webserver/v1/Range'; -import type { SearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest'; -import { SearchResponse__Output } from '@/proto/zoekt/webserver/v1/SearchResponse'; -import type { StreamSearchRequest } from '@/proto/zoekt/webserver/v1/StreamSearchRequest'; -import type { StreamSearchResponse__Output } from '@/proto/zoekt/webserver/v1/StreamSearchResponse'; -import type { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService'; -import * as grpc from '@grpc/grpc-js'; -import * as protoLoader from '@grpc/proto-loader'; -import * as Sentry from '@sentry/nextjs'; -import { PrismaClient, Repo } from '@sourcebot/db'; -import { parser as _parser } from '@sourcebot/query-language'; -import { createLogger, env } from '@sourcebot/shared'; +import { isServiceError } from '@/lib/utils'; import { NextRequest } from 'next/server'; -import * as path from 'path'; -import { transformToZoektQuery } from './transformer'; -const logger = createLogger('streamSearchApi'); - -/** - * Create a gRPC client for the Zoekt webserver - */ -function createGrpcClient(): WebserverServiceClient { - // Path to proto files - these should match your monorepo structure - const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos'); - const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto'); - - const packageDefinition = protoLoader.loadSync(protoPath, { - keepCase: true, - longs: Number, - enums: String, - defaults: true, - oneofs: true, - includeDirs: [protoBasePath], - }); - - const proto = grpc.loadPackageDefinition(packageDefinition) as unknown 
as ProtoGrpcType; - - // Extract host and port from ZOEKT_WEBSERVER_URL - const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL); - const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`; - - return new proto.zoekt.webserver.v1.WebserverService( - grpcAddress, - grpc.credentials.createInsecure(), - { - 'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB - 'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB - } - ); -} - -/** - * POST handler for streaming search via SSE - */ export const POST = async (request: NextRequest) => { - try { - // Parse and validate request body - const body = await request.json(); - const parsed = await searchRequestSchema.safeParseAsync(body); + const body = await request.json(); + const parsed = await searchRequestSchema.safeParseAsync(body); - if (!parsed.success) { - return serviceErrorResponse(schemaValidationError(parsed.error)); - } - - const { - query, - matches, - contextLines, - whole, - isRegexEnabled = false, - isCaseSensitivityEnabled = false, - } = parsed.data; - - const parser = _parser.configure({ - strict: true, - }); - - const tree = parser.parse(query); - const zoektQuery = await transformToZoektQuery({ - tree, - input: query, - isCaseSensitivityEnabled, - isRegexEnabled, - onExpandSearchContext: async (contextName: string) => { - const context = await prisma.searchContext.findUnique({ - where: { - name_orgId: { - name: contextName, - orgId: SINGLE_TENANT_ORG_ID, - } - }, - include: { - repos: true, - } - }); - - if (!context) { - throw new Error(`Search context "${contextName}" not found`); - } - - return context.repos.map((repo) => repo.name); - }, - }); - - console.log(JSON.stringify(zoektQuery, null, 2)); - - const searchRequest: SearchRequest = { - query: { - and: { - children: [ - zoektQuery, - { - branch: { - pattern: 'HEAD', - exact: true, - } - } - ] - } - }, - opts: { - chunk_matches: true, - max_match_display_count: matches, - total_max_match_count: matches + 1, - num_context_lines: contextLines, - whole: !!whole, - shard_max_match_count: -1, - max_wall_time: { - seconds: 0, - } - }, - }; - - // @nocheckin: this should be using the `prisma` instance from the auth context. - const stream = await createSSESearchStream(searchRequest, prisma); - - - // Return streaming response with SSE headers - return new Response(stream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache, no-transform', - 'Connection': 'keep-alive', - 'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable - }, - }); - } catch (error) { - console.error('Request handling error:', error); - return new Response( - JSON.stringify({ - error: { - message: error instanceof Error ? 
error.message : 'Internal server error' - } - }), - { - status: 500, - headers: { 'Content-Type': 'application/json' }, - } - ); + if (!parsed.success) { + return serviceErrorResponse(schemaValidationError(parsed.error)); } -}; -const createSSESearchStream = async (searchRequest: SearchRequest, prisma: PrismaClient): Promise => { - const client = createGrpcClient(); - let grpcStream: ReturnType | null = null; - let isStreamActive = true; - let pendingChunks = 0; - let accumulatedStats: SearchStats = { - actualMatchCount: 0, - totalMatchCount: 0, - duration: 0, - fileCount: 0, - filesSkipped: 0, - contentBytesLoaded: 0, - indexBytesLoaded: 0, - crashes: 0, - shardFilesConsidered: 0, - filesConsidered: 0, - filesLoaded: 0, - shardsScanned: 0, - shardsSkipped: 0, - shardsSkippedFilter: 0, - ngramMatches: 0, - ngramLookups: 0, - wait: 0, - matchTreeConstruction: 0, - matchTreeSearch: 0, - regexpsConsidered: 0, - flushReason: 0, - }; + const stream = await streamSearch(parsed.data); + if (isServiceError(stream)) { + return serviceErrorResponse(stream); + } - return new ReadableStream({ - async start(controller) { - const tryCloseController = () => { - if (!isStreamActive && pendingChunks === 0) { - const finalResponse: StreamedSearchResponse = { - type: 'final', - accumulatedStats, - isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount, - } - - controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`)); - controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')); - controller.close(); - client.close(); - logger.debug('SSE stream closed'); - } - }; - - try { - const metadata = new grpc.Metadata(); - - const streamRequest: StreamSearchRequest = { - request: searchRequest, - }; - - grpcStream = client.StreamSearch(streamRequest, metadata); - - // @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field - // which corresponds to the `id` in the Repo table. In order to efficiently fetch repository - // metadata when transforming (potentially thousands) of file matches, we aggregate a unique - // set of repository ids* and map them to their corresponding Repo record. - // - // *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`? - // A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was - // always undefined. To make this a non-breaking change, we fallback to using the repository's name - // (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in - // practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo', - // 'gitea.com/org/repo', etc.). - // - // Note: When a repository is re-indexed (every hour) this ID will be populated. - // @see: https://github.com/sourcebot-dev/zoekt/pull/6 - const getRepoIdForFile = (file: FileMatch__Output): string | number => { - return file.repository_id ?? file.repository; - } - - // `_reposMapCache` is used to cache repository metadata across all chunks. - // This reduces the number of database queries required to transform file matches. - const _reposMapCache = new Map(); - - // Creates a mapping between all repository ids in a given response - // chunk. The mapping allows us to efficiently lookup repository metadata. 
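The identifier fallback described in the note above can be shown in isolation. Below is a minimal sketch with hypothetical types and values (not part of this patch), illustrating why a single map keyed by `string | number` serves both identifier shapes:

// Sketch: shards built before the repo id was plumbed through report only a name,
// so lookups are keyed by `string | number` and both shapes hit the same map.
type RepoKey = string | number;

interface FileMatchLike {
    repository_id?: number;   // present once the repo has been re-indexed
    repository: string;       // always present, e.g. 'github.com/org/repo'
}

const getRepoKey = (file: FileMatchLike): RepoKey =>
    file.repository_id ?? file.repository;

const reposByKey = new Map<RepoKey, { id: number; name: string }>();
reposByKey.set(42, { id: 42, name: 'github.com/org/repo' });
reposByKey.set('github.com/org/repo', { id: 42, name: 'github.com/org/repo' });

// The numeric id wins when present; the name is only a fallback.
console.log(getRepoKey({ repository_id: 42, repository: 'github.com/org/repo' })); // 42
console.log(getRepoKey({ repository: 'github.com/org/repo' })); // 'github.com/org/repo'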
- const createReposMapForChunk = async (chunk: SearchResponse__Output): Promise> => { - const reposMap = new Map(); - await Promise.all(chunk.files.map(async (file) => { - const id = getRepoIdForFile(file); - - const repo = await (async () => { - // If it's in the cache, return the cached value. - if (_reposMapCache.has(id)) { - return _reposMapCache.get(id); - } - - // Otherwise, query the database for the record. - const repo = typeof id === 'number' ? - await prisma.repo.findUnique({ - where: { - id: id, - }, - }) : - await prisma.repo.findFirst({ - where: { - name: id, - }, - }); - - // If a repository is found, cache it for future lookups. - if (repo) { - _reposMapCache.set(id, repo); - } - - return repo; - })(); - - // Only add the repository to the map if it was found. - if (repo) { - reposMap.set(id, repo); - } - })); - - return reposMap; - } - - // Handle incoming data chunks - grpcStream.on('data', async (chunk: StreamSearchResponse__Output) => { - if (!isStreamActive) { - logger.debug('SSE stream closed, skipping chunk'); - return; - } - - // Track that we're processing a chunk - pendingChunks++; - - // grpcStream.on doesn't actually await on our handler, so we need to - // explicitly pause the stream here to prevent the stream from completing - // prior to our asynchronous work being completed. - grpcStream?.pause(); - - try { - if (!chunk.response_chunk) { - logger.warn('No response chunk received'); - return; - } - - const repoIdToRepoDBRecordMap = await createReposMapForChunk(chunk.response_chunk); - - const files = chunk.response_chunk.files.map((file) => { - const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name); - const repoId = getRepoIdForFile(file); - const repo = repoIdToRepoDBRecordMap.get(repoId); - - // This can happen if the user doesn't have access to the repository. - if (!repo) { - return undefined; - } - - // @todo: address "file_name might not be a valid UTF-8 string" warning. - const fileName = file.file_name.toString('utf-8'); - - const convertRange = (range: Range__Output): SourceRange => ({ - start: { - byteOffset: range.start?.byte_offset ?? 0, - column: range.start?.column ?? 0, - lineNumber: range.start?.line_number ?? 0, - }, - end: { - byteOffset: range.end?.byte_offset ?? 0, - column: range.end?.column ?? 0, - lineNumber: range.end?.line_number ?? 0, - } - }) - - return { - fileName: { - text: fileName, - matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [], - }, - repository: repo.name, - repositoryId: repo.id, - language: file.language, - // @todo: we will need to have a mechanism of forming the file's web url. - webUrl: '', - chunks: file.chunk_matches - .filter((chunk) => !chunk.file_name) // filter out filename chunks. - .map((chunk) => { - return { - content: chunk.content.toString('utf-8'), - matchRanges: chunk.ranges.map(convertRange), - contentStart: chunk.content_start ? { - byteOffset: chunk.content_start.byte_offset, - column: chunk.content_start.column, - lineNumber: chunk.content_start.line_number, - } : { - byteOffset: 0, - column: 0, - lineNumber: 0, - }, - symbols: chunk.symbol_info.map((symbol) => { - return { - symbol: symbol.sym, - kind: symbol.kind, - parent: symbol.parent ? { - symbol: symbol.parent, - kind: symbol.parent_kind, - } : undefined, - } - }) - } - }), - branches: file.branches, - content: file.content ? 
file.content.toString('utf-8') : undefined, - } - }).filter(file => file !== undefined); - - const actualMatchCount = files.reduce( - (acc, file) => - // Match count is the sum of the number of chunk matches and file name matches. - acc + file.chunks.reduce( - (acc, chunk) => acc + chunk.matchRanges.length, - 0, - ) + file.fileName.matchRanges.length, - 0, - ); - - const stats: SearchStats = { - actualMatchCount, - totalMatchCount: chunk.response_chunk.stats?.match_count ?? 0, - duration: chunk.response_chunk.stats?.duration?.nanos ?? 0, - fileCount: chunk.response_chunk.stats?.file_count ?? 0, - filesSkipped: chunk.response_chunk.stats?.files_skipped ?? 0, - contentBytesLoaded: chunk.response_chunk.stats?.content_bytes_loaded ?? 0, - indexBytesLoaded: chunk.response_chunk.stats?.index_bytes_loaded ?? 0, - crashes: chunk.response_chunk.stats?.crashes ?? 0, - shardFilesConsidered: chunk.response_chunk.stats?.shard_files_considered ?? 0, - filesConsidered: chunk.response_chunk.stats?.files_considered ?? 0, - filesLoaded: chunk.response_chunk.stats?.files_loaded ?? 0, - shardsScanned: chunk.response_chunk.stats?.shards_scanned ?? 0, - shardsSkipped: chunk.response_chunk.stats?.shards_skipped ?? 0, - shardsSkippedFilter: chunk.response_chunk.stats?.shards_skipped_filter ?? 0, - ngramMatches: chunk.response_chunk.stats?.ngram_matches ?? 0, - ngramLookups: chunk.response_chunk.stats?.ngram_lookups ?? 0, - wait: chunk.response_chunk.stats?.wait?.nanos ?? 0, - matchTreeConstruction: chunk.response_chunk.stats?.match_tree_construction?.nanos ?? 0, - matchTreeSearch: chunk.response_chunk.stats?.match_tree_search?.nanos ?? 0, - regexpsConsidered: chunk.response_chunk.stats?.regexps_considered ?? 0, - // @todo: handle this. - // flushReason: chunk.response_chunk.stats?.flush_reason ?? 0, - flushReason: 0 - } - - accumulatedStats = accumulateStats(accumulatedStats, stats); - - const response: StreamedSearchResponse = { - type: 'chunk', - files, - repositoryInfo: Array.from(repoIdToRepoDBRecordMap.values()).map((repo) => ({ - id: repo.id, - codeHostType: repo.external_codeHostType, - name: repo.name, - displayName: repo.displayName ?? undefined, - webUrl: repo.webUrl ?? undefined, - })), - stats - } - - const sseData = `data: ${JSON.stringify(response)}\n\n`; - controller.enqueue(new TextEncoder().encode(sseData)); - } catch (error) { - console.error('Error encoding chunk:', error); - } finally { - pendingChunks--; - grpcStream?.resume(); - - // @note: we were hitting "Controller is already closed" errors when calling - // `controller.enqueue` above for the last chunk. The reasoning was the event - // handler for 'end' was being invoked prior to the completion of the last chunk, - // resulting in the controller being closed prematurely. The workaround was to - // keep track of the number of pending chunks and only close the controller - // when there are no more chunks to process. We need to explicitly call - // `tryCloseController` since there _seems_ to be no ordering guarantees between - // the 'end' event handler and this callback. 
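The workaround described in the note above generalizes to any bridge from a callback-style emitter into a ReadableStream. A minimal, self-contained sketch of just the close-ordering logic, with invented names (the patch's real implementation is the surrounding code):

import { EventEmitter } from 'node:events';

// Sketch: an async 'data' handler may still be awaiting when 'end' fires, so the
// controller is closed only when BOTH the source has ended AND no chunk is still
// being processed. Requires Node 18+ for the global ReadableStream.
function sseStreamFromEmitter(source: EventEmitter): ReadableStream<Uint8Array> {
    let pendingChunks = 0;
    let ended = false;

    return new ReadableStream({
        start(controller) {
            const tryClose = () => {
                if (ended && pendingChunks === 0) {
                    controller.close();
                }
            };

            source.on('data', async (chunk: string) => {
                pendingChunks++;
                try {
                    // Stand-in for the real async transform work.
                    const payload = await Promise.resolve(JSON.stringify({ chunk }));
                    controller.enqueue(new TextEncoder().encode(`data: ${payload}\n\n`));
                } finally {
                    pendingChunks--;
                    tryClose(); // 'end' may have fired while we were awaiting.
                }
            });

            source.on('end', () => {
                ended = true;
                tryClose();
            });
        },
    });
}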
- tryCloseController(); - } - }); - - // Handle stream completion - grpcStream.on('end', () => { - if (!isStreamActive) { - return; - } - isStreamActive = false; - tryCloseController(); - }); - - // Handle errors - grpcStream.on('error', (error: grpc.ServiceError) => { - logger.error('gRPC stream error:', error); - Sentry.captureException(error); - - if (!isStreamActive) { - return; - } - isStreamActive = false; - - // Send error as SSE event - const errorData = `data: ${JSON.stringify({ - error: { - code: error.code, - message: error.details || error.message, - } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); - - controller.close(); - client.close(); - }); - } catch (error) { - logger.error('Stream initialization error:', error); - - const errorMessage = error instanceof Error ? error.message : 'Unknown error'; - const errorData = `data: ${JSON.stringify({ - error: { message: errorMessage } - })}\n\n`; - controller.enqueue(new TextEncoder().encode(errorData)); - - controller.close(); - client.close(); - } + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', // Disable nginx buffering if applicable }, - cancel() { - logger.warn('SSE stream cancelled by client'); - isStreamActive = false; - - // Cancel the gRPC stream to stop receiving data - if (grpcStream) { - grpcStream.cancel(); - } - - client.close(); - } }); -} - -const accumulateStats = (a: SearchStats, b: SearchStats): SearchStats => { - return { - actualMatchCount: a.actualMatchCount + b.actualMatchCount, - totalMatchCount: a.totalMatchCount + b.totalMatchCount, - duration: a.duration + b.duration, - fileCount: a.fileCount + b.fileCount, - filesSkipped: a.filesSkipped + b.filesSkipped, - contentBytesLoaded: a.contentBytesLoaded + b.contentBytesLoaded, - indexBytesLoaded: a.indexBytesLoaded + b.indexBytesLoaded, - crashes: a.crashes + b.crashes, - shardFilesConsidered: a.shardFilesConsidered + b.shardFilesConsidered, - filesConsidered: a.filesConsidered + b.filesConsidered, - filesLoaded: a.filesLoaded + b.filesLoaded, - shardsScanned: a.shardsScanned + b.shardsScanned, - shardsSkipped: a.shardsSkipped + b.shardsSkipped, - shardsSkippedFilter: a.shardsSkippedFilter + b.shardsSkippedFilter, - ngramMatches: a.ngramMatches + b.ngramMatches, - ngramLookups: a.ngramLookups + b.ngramLookups, - wait: a.wait + b.wait, - matchTreeConstruction: a.matchTreeConstruction + b.matchTreeConstruction, - matchTreeSearch: a.matchTreeSearch + b.matchTreeSearch, - regexpsConsidered: a.regexpsConsidered + b.regexpsConsidered, - ...(a.flushReason === 0 ? { - flushReason: b.flushReason - } : { - flushReason: a.flushReason, - }), - } -} \ No newline at end of file +}; diff --git a/packages/web/src/features/chat/tools.ts b/packages/web/src/features/chat/tools.ts index ab2b2ee6..94e33e73 100644 --- a/packages/web/src/features/chat/tools.ts +++ b/packages/web/src/features/chat/tools.ts @@ -180,10 +180,10 @@ Multiple expressions can be or'd together with or, negated with -, or grouped wi const response = await search({ query, matches: limit ?? 100, - // @todo: we can make this configurable. contextLines: 3, whole: false, - // @todo(mt): handle multi-tenancy. 
+ isCaseSensitivityEnabled: true, + isRegexEnabled: true, }); if (isServiceError(response)) { diff --git a/packages/web/src/features/codeNav/api.ts b/packages/web/src/features/codeNav/api.ts index d2912f82..ace85293 100644 --- a/packages/web/src/features/codeNav/api.ts +++ b/packages/web/src/features/codeNav/api.ts @@ -19,12 +19,14 @@ export const findSearchBasedSymbolReferences = async (props: FindRelatedSymbolsR revisionName = "HEAD", } = props; - const query = `\\b${symbolName}\\b rev:${revisionName} ${getExpandedLanguageFilter(language)} case:yes`; + const query = `\\b${symbolName}\\b rev:${revisionName} ${getExpandedLanguageFilter(language)}`; const searchResult = await search({ query, matches: MAX_REFERENCE_COUNT, contextLines: 0, + isCaseSensitivityEnabled: true, + isRegexEnabled: true, }); if (isServiceError(searchResult)) { @@ -49,6 +51,8 @@ export const findSearchBasedSymbolDefinitions = async (props: FindRelatedSymbols query, matches: MAX_REFERENCE_COUNT, contextLines: 0, + isCaseSensitivityEnabled: true, + isRegexEnabled: true, }); if (isServiceError(searchResult)) { diff --git a/packages/web/src/features/search/fileSourceApi.ts b/packages/web/src/features/search/fileSourceApi.ts index edc346aa..1cb887ec 100644 --- a/packages/web/src/features/search/fileSourceApi.ts +++ b/packages/web/src/features/search/fileSourceApi.ts @@ -1,5 +1,4 @@ import 'server-only'; -import escapeStringRegexp from "escape-string-regexp"; import { fileNotFound, ServiceError, unexpectedError } from "../../lib/serviceError"; import { FileSourceRequest, FileSourceResponse } from "./types"; import { isServiceError } from "../../lib/utils"; @@ -12,18 +11,17 @@ import { withOptionalAuthV2 } from "@/withAuthV2"; export const getFileSource = async ({ fileName, repository, branch }: FileSourceRequest): Promise => sew(() => withOptionalAuthV2(async () => { - const escapedFileName = escapeStringRegexp(fileName); - const escapedRepository = escapeStringRegexp(repository); - - let query = `file:${escapedFileName} repo:^${escapedRepository}$`; + let query = `file:${fileName} repo:^${repository}$`; if (branch) { - query = query.concat(` branch:${branch}`); + query = query.concat(` rev:${branch}`); } const searchResponse = await search({ query, matches: 1, whole: true, + isCaseSensitivityEnabled: true, + isRegexEnabled: true, }); if (isServiceError(searchResponse)) { diff --git a/packages/web/src/app/api/(server)/stream_search/transformer.ts b/packages/web/src/features/search/query.ts similarity index 94% rename from packages/web/src/app/api/(server)/stream_search/transformer.ts rename to packages/web/src/features/search/query.ts index 72a392a5..2c0a831f 100644 --- a/packages/web/src/app/api/(server)/stream_search/transformer.ts +++ b/packages/web/src/features/search/query.ts @@ -1,4 +1,4 @@ -import { Q } from '@/proto/zoekt/webserver/v1/Q'; +import { Q as ZoektGrpcQuery } from '@/proto/zoekt/webserver/v1/Q'; import { AndExpr, ArchivedExpr, @@ -21,6 +21,12 @@ import { Tree, VisibilityExpr, } from '@sourcebot/query-language'; +import { parser as _lezerQueryParser } from '@sourcebot/query-language'; + +const lezerQueryParser = _lezerQueryParser.configure({ + strict: true, +}); + type ArchivedValue = 'yes' | 'no' | 'only'; type VisibilityValue = 'public' | 'private' | 'any'; @@ -38,10 +44,11 @@ const isForkValue = (value: string): value is ForkValue => { return value === 'yes' || value === 'no' || value === 'only'; } -/** - * Transform a Lezer parse tree into a Zoekt gRPC query - */ -export const transformToZoektQuery = 
({ +export const parseQueryIntoLezerTree = (query: string): Tree => { + return lezerQueryParser.parse(query); +} + +export const transformLezerTreeToZoektGrpcQuery = async ({ tree, input, isCaseSensitivityEnabled, @@ -53,9 +60,9 @@ export const transformToZoektQuery = ({ isCaseSensitivityEnabled: boolean; isRegexEnabled: boolean; onExpandSearchContext: (contextName: string) => Promise; -}): Promise => { +}): Promise => { - const transformNode = async (node: SyntaxNode): Promise => { + const transformNode = async (node: SyntaxNode): Promise => { switch (node.type.id) { case Program: { // Program wraps the actual query - transform its child @@ -134,7 +141,7 @@ export const transformToZoektQuery = ({ } } - const transformPrefixExpr = async (node: SyntaxNode): Promise => { + const transformPrefixExpr = async (node: SyntaxNode): Promise => { // Find which specific prefix type this is const prefixNode = node.firstChild; if (!prefixNode) { @@ -207,13 +214,13 @@ export const transformToZoektQuery = ({ return { symbol: { expr: { - substring: { - pattern: value, + regexp: { + regexp: value, case_sensitive: isCaseSensitivityEnabled, file_name: false, content: true }, - query: "substring" + query: "regexp" } }, query: "symbol" diff --git a/packages/web/src/features/search/searchApi.ts b/packages/web/src/features/search/searchApi.ts index ee703759..974be894 100644 --- a/packages/web/src/features/search/searchApi.ts +++ b/packages/web/src/features/search/searchApi.ts @@ -1,71 +1,444 @@ -import 'server-only'; import { sew } from "@/actions"; +import { SINGLE_TENANT_ORG_ID } from '@/lib/constants'; +import type { ProtoGrpcType } from '@/proto/webserver'; +import { FileMatch__Output as ZoektGrpcFileMatch } from "@/proto/zoekt/webserver/v1/FileMatch"; +import { Range__Output as ZoektGrpcRange } from "@/proto/zoekt/webserver/v1/Range"; +import type { SearchRequest as ZoektGrpcSearchRequest } from '@/proto/zoekt/webserver/v1/SearchRequest'; +import { SearchResponse__Output as ZoektGrpcSearchResponse } from "@/proto/zoekt/webserver/v1/SearchResponse"; +import { StreamSearchRequest as ZoektGrpcStreamSearchRequest } from "@/proto/zoekt/webserver/v1/StreamSearchRequest"; +import { StreamSearchResponse__Output as ZoektGrpcStreamSearchResponse } from "@/proto/zoekt/webserver/v1/StreamSearchResponse"; +import { WebserverServiceClient } from '@/proto/zoekt/webserver/v1/WebserverService'; import { withOptionalAuthV2 } from "@/withAuthV2"; +import * as grpc from '@grpc/grpc-js'; +import * as protoLoader from '@grpc/proto-loader'; +import * as Sentry from '@sentry/nextjs'; import { PrismaClient, Repo } from "@sourcebot/db"; -import { base64Decode, createLogger } from "@sourcebot/shared"; -import { StatusCodes } from "http-status-codes"; -import { ErrorCode } from "../../lib/errorCodes"; -import { invalidZoektResponse, ServiceError } from "../../lib/serviceError"; -import { isServiceError, measure } from "../../lib/utils"; -import { SearchRequest, SearchResponse, SourceRange } from "./types"; -import { zoektFetch } from "./zoektClient"; -import { ZoektSearchResponse } from "./zoektSchema"; +import { createLogger, env } from "@sourcebot/shared"; +import path from 'path'; +import { parseQueryIntoLezerTree, transformLezerTreeToZoektGrpcQuery } from './query'; +import { RepositoryInfo, SearchRequest, SearchResponse, SearchResultFile, SearchStats, SourceRange, StreamedSearchResponse } from "./types"; +import { FlushReason as ZoektFlushReason } from "@/proto/zoekt/webserver/v1/FlushReason"; const logger = 
createLogger("searchApi");
-// List of supported query prefixes in zoekt.
-// @see : https://github.com/sourcebot-dev/zoekt/blob/main/query/parse.go#L417
-enum zoektPrefixes {
-    archived = "archived:",
-    branchShort = "b:",
-    branch = "branch:",
-    caseShort = "c:",
-    case = "case:",
-    content = "content:",
-    fileShort = "f:",
-    file = "file:",
-    fork = "fork:",
-    public = "public:",
-    repoShort = "r:",
-    repo = "repo:",
-    regex = "regex:",
-    lang = "lang:",
-    sym = "sym:",
-    typeShort = "t:",
-    type = "type:",
-    reposet = "reposet:",
+export const search = (searchRequest: SearchRequest) => sew(() =>
+    withOptionalAuthV2(async ({ prisma }) => {
+        const zoektSearchRequest = await createZoektSearchRequest({
+            searchRequest,
+            prisma,
+        });
+
+        logger.debug(`zoektSearchRequest: ${JSON.stringify(zoektSearchRequest, null, 2)}`);
+
+        return zoektSearch(zoektSearchRequest, prisma);
+    }));
+
+export const streamSearch = (searchRequest: SearchRequest) => sew(() =>
+    withOptionalAuthV2(async ({ prisma }) => {
+        const zoektSearchRequest = await createZoektSearchRequest({
+            searchRequest,
+            prisma,
+        });
+
+        return zoektStreamSearch(zoektSearchRequest, prisma);
+    }));
+
+
+const zoektSearch = async (searchRequest: ZoektGrpcSearchRequest, prisma: PrismaClient): Promise<SearchResponse> => {
+    const client = createGrpcClient();
+    const metadata = new grpc.Metadata();
+
+    return new Promise<SearchResponse>((resolve, reject) => {
+        client.Search(searchRequest, metadata, async (error, response) => {
+            if (error || !response) {
+                reject(error || new Error('No response received'));
+                return;
+            }
+
+            const reposMapCache = await createReposMapForChunk(response, new Map(), prisma);
+            const { stats, files, repositoryInfo } = await transformZoektSearchResponse(response, reposMapCache);
+
+            resolve({
+                stats,
+                files,
+                repositoryInfo,
+                isSearchExhaustive: stats.totalMatchCount <= stats.actualMatchCount,
+            } satisfies SearchResponse);
+        });
+    });
}
-const transformZoektQuery = async (query: string, orgId: number, prisma: PrismaClient): Promise<string | ServiceError> => {
-    const prevQueryParts = query.split(" ");
-    const newQueryParts = [];
+const zoektStreamSearch = async (searchRequest: ZoektGrpcSearchRequest, prisma: PrismaClient): Promise<ReadableStream> => {
+    const client = createGrpcClient();
+    let grpcStream: ReturnType<typeof client.StreamSearch> | null = null;
+    let isStreamActive = true;
+    let pendingChunks = 0;
+    let accumulatedStats: SearchStats = {
+        actualMatchCount: 0,
+        totalMatchCount: 0,
+        duration: 0,
+        fileCount: 0,
+        filesSkipped: 0,
+        contentBytesLoaded: 0,
+        indexBytesLoaded: 0,
+        crashes: 0,
+        shardFilesConsidered: 0,
+        filesConsidered: 0,
+        filesLoaded: 0,
+        shardsScanned: 0,
+        shardsSkipped: 0,
+        shardsSkippedFilter: 0,
+        ngramMatches: 0,
+        ngramLookups: 0,
+        wait: 0,
+        matchTreeConstruction: 0,
+        matchTreeSearch: 0,
+        regexpsConsidered: 0,
+        flushReason: ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED,
+    };
-    for (const part of prevQueryParts) {
+    return new ReadableStream({
+        async start(controller) {
+            const tryCloseController = () => {
+                if (!isStreamActive && pendingChunks === 0) {
+                    const finalResponse: StreamedSearchResponse = {
+                        type: 'final',
+                        accumulatedStats,
+                        isSearchExhaustive: accumulatedStats.totalMatchCount <= accumulatedStats.actualMatchCount,
+                    }
-        // Handle mapping `rev:` and `revision:` to `branch:`
-        if (part.match(/^-?(rev|revision):.+$/)) {
-            const isNegated = part.startsWith("-");
-            let revisionName = part.slice(part.indexOf(":") + 1);
+                    controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(finalResponse)}\n\n`));
+                    controller.enqueue(new
TextEncoder().encode('data: [DONE]\n\n')); + controller.close(); + client.close(); + logger.debug('SSE stream closed'); + } + }; - // Special case: `*` -> search all revisions. - // In zoekt, providing a blank string will match all branches. - // @see: https://github.com/sourcebot-dev/zoekt/blob/main/eval.go#L560-L562 - if (revisionName === "*") { - revisionName = ""; + try { + const metadata = new grpc.Metadata(); + + const streamRequest: ZoektGrpcStreamSearchRequest = { + request: searchRequest, + }; + + grpcStream = client.StreamSearch(streamRequest, metadata); + + // `_reposMapCache` is used to cache repository metadata across all chunks. + // This reduces the number of database queries required to transform file matches. + const _reposMapCache = new Map(); + + // Handle incoming data chunks + grpcStream.on('data', async (chunk: ZoektGrpcStreamSearchResponse) => { + if (!isStreamActive) { + logger.debug('SSE stream closed, skipping chunk'); + return; + } + + // Track that we're processing a chunk + pendingChunks++; + + // grpcStream.on doesn't actually await on our handler, so we need to + // explicitly pause the stream here to prevent the stream from completing + // prior to our asynchronous work being completed. + grpcStream?.pause(); + + try { + if (!chunk.response_chunk) { + logger.warn('No response chunk received'); + return; + } + + const reposMapCache = await createReposMapForChunk(chunk.response_chunk, _reposMapCache, prisma); + const { stats, files, repositoryInfo } = await transformZoektSearchResponse(chunk.response_chunk, reposMapCache); + + accumulatedStats = accumulateStats(accumulatedStats, stats); + + const response: StreamedSearchResponse = { + type: 'chunk', + files, + repositoryInfo, + stats + } + + const sseData = `data: ${JSON.stringify(response)}\n\n`; + controller.enqueue(new TextEncoder().encode(sseData)); + } catch (error) { + console.error('Error encoding chunk:', error); + } finally { + pendingChunks--; + grpcStream?.resume(); + + // @note: we were hitting "Controller is already closed" errors when calling + // `controller.enqueue` above for the last chunk. The reasoning was the event + // handler for 'end' was being invoked prior to the completion of the last chunk, + // resulting in the controller being closed prematurely. The workaround was to + // keep track of the number of pending chunks and only close the controller + // when there are no more chunks to process. We need to explicitly call + // `tryCloseController` since there _seems_ to be no ordering guarantees between + // the 'end' event handler and this callback. + tryCloseController(); + } + }); + + // Handle stream completion + grpcStream.on('end', () => { + if (!isStreamActive) { + return; + } + isStreamActive = false; + tryCloseController(); + }); + + // Handle errors + grpcStream.on('error', (error: grpc.ServiceError) => { + logger.error('gRPC stream error:', error); + Sentry.captureException(error); + + if (!isStreamActive) { + return; + } + isStreamActive = false; + + // Send error as SSE event + const errorData = `data: ${JSON.stringify({ + error: { + code: error.code, + message: error.details || error.message, + } + })}\n\n`; + controller.enqueue(new TextEncoder().encode(errorData)); + + controller.close(); + client.close(); + }); + } catch (error) { + logger.error('Stream initialization error:', error); + + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error'; + const errorData = `data: ${JSON.stringify({ + error: { message: errorMessage } + })}\n\n`; + controller.enqueue(new TextEncoder().encode(errorData)); + + controller.close(); + client.close(); } - newQueryParts.push(`${isNegated ? "-" : ""}${zoektPrefixes.branch}${revisionName}`); + }, + cancel() { + logger.warn('SSE stream cancelled by client'); + isStreamActive = false; + + // Cancel the gRPC stream to stop receiving data + if (grpcStream) { + grpcStream.cancel(); + } + + client.close(); + } + }); +} + +// Creates a mapping between all repository ids in a given response +// chunk. The mapping allows us to efficiently lookup repository metadata. +const createReposMapForChunk = async (chunk: ZoektGrpcSearchResponse, reposMapCache: Map, prisma: PrismaClient): Promise> => { + const reposMap = new Map(); + await Promise.all(chunk.files.map(async (file) => { + const id = getRepoIdForFile(file); + + const repo = await (async () => { + // If it's in the cache, return the cached value. + if (reposMapCache.has(id)) { + return reposMapCache.get(id); + } + + // Otherwise, query the database for the record. + const repo = typeof id === 'number' ? + await prisma.repo.findUnique({ + where: { + id: id, + }, + }) : + await prisma.repo.findFirst({ + where: { + name: id, + }, + }); + + // If a repository is found, cache it for future lookups. + if (repo) { + reposMapCache.set(id, repo); + } + + return repo; + })(); + + // Only add the repository to the map if it was found. + if (repo) { + reposMap.set(id, repo); + } + })); + + return reposMap; +} + +const transformZoektSearchResponse = async (response: ZoektGrpcSearchResponse, reposMapCache: Map): Promise<{ + stats: SearchStats, + files: SearchResultFile[], + repositoryInfo: RepositoryInfo[], +}> => { + const files = response.files.map((file) => { + const fileNameChunks = file.chunk_matches.filter((chunk) => chunk.file_name); + const repoId = getRepoIdForFile(file); + const repo = reposMapCache.get(repoId); + + // This can happen if the user doesn't have access to the repository. + if (!repo) { + return undefined; } - // Expand `context:` into `reposet:` atom. - else if (part.match(/^-?context:.+$/)) { - const isNegated = part.startsWith("-"); - const contextName = part.slice(part.indexOf(":") + 1); + // @todo: address "file_name might not be a valid UTF-8 string" warning. + const fileName = file.file_name.toString('utf-8'); + const convertRange = (range: ZoektGrpcRange): SourceRange => ({ + start: { + byteOffset: range.start?.byte_offset ?? 0, + column: range.start?.column ?? 0, + lineNumber: range.start?.line_number ?? 0, + }, + end: { + byteOffset: range.end?.byte_offset ?? 0, + column: range.end?.column ?? 0, + lineNumber: range.end?.line_number ?? 0, + } + }) + + return { + fileName: { + text: fileName, + matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].ranges.map(convertRange) : [], + }, + repository: repo.name, + repositoryId: repo.id, + language: file.language, + // @todo: we will need to have a mechanism of forming the file's web url. + webUrl: '', + chunks: file.chunk_matches + .filter((chunk) => !chunk.file_name) // filter out filename chunks. + .map((chunk) => { + return { + content: chunk.content.toString('utf-8'), + matchRanges: chunk.ranges.map(convertRange), + contentStart: chunk.content_start ? 
{ + byteOffset: chunk.content_start.byte_offset, + column: chunk.content_start.column, + lineNumber: chunk.content_start.line_number, + } : { + byteOffset: 0, + column: 0, + lineNumber: 0, + }, + symbols: chunk.symbol_info.map((symbol) => { + return { + symbol: symbol.sym, + kind: symbol.kind, + parent: symbol.parent ? { + symbol: symbol.parent, + kind: symbol.parent_kind, + } : undefined, + } + }) + } + }), + branches: file.branches, + content: file.content ? file.content.toString('utf-8') : undefined, + } + }).filter(file => file !== undefined); + + const actualMatchCount = files.reduce( + (acc, file) => + // Match count is the sum of the number of chunk matches and file name matches. + acc + file.chunks.reduce( + (acc, chunk) => acc + chunk.matchRanges.length, + 0, + ) + file.fileName.matchRanges.length, + 0, + ); + + const stats: SearchStats = { + actualMatchCount, + totalMatchCount: response.stats?.match_count ?? 0, + duration: response.stats?.duration?.nanos ?? 0, + fileCount: response.stats?.file_count ?? 0, + filesSkipped: response.stats?.files_skipped ?? 0, + contentBytesLoaded: response.stats?.content_bytes_loaded ?? 0, + indexBytesLoaded: response.stats?.index_bytes_loaded ?? 0, + crashes: response.stats?.crashes ?? 0, + shardFilesConsidered: response.stats?.shard_files_considered ?? 0, + filesConsidered: response.stats?.files_considered ?? 0, + filesLoaded: response.stats?.files_loaded ?? 0, + shardsScanned: response.stats?.shards_scanned ?? 0, + shardsSkipped: response.stats?.shards_skipped ?? 0, + shardsSkippedFilter: response.stats?.shards_skipped_filter ?? 0, + ngramMatches: response.stats?.ngram_matches ?? 0, + ngramLookups: response.stats?.ngram_lookups ?? 0, + wait: response.stats?.wait?.nanos ?? 0, + matchTreeConstruction: response.stats?.match_tree_construction?.nanos ?? 0, + matchTreeSearch: response.stats?.match_tree_search?.nanos ?? 0, + regexpsConsidered: response.stats?.regexps_considered ?? 0, + flushReason: response.stats?.flush_reason?.toString() ?? ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED, + } + + return { + files, + repositoryInfo: Array.from(reposMapCache.values()).map((repo) => ({ + id: repo.id, + codeHostType: repo.external_codeHostType, + name: repo.name, + displayName: repo.displayName ?? undefined, + webUrl: repo.webUrl ?? undefined, + })), + stats, + } +} + +// @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field +// which corresponds to the `id` in the Repo table. In order to efficiently fetch repository +// metadata when transforming (potentially thousands) of file matches, we aggregate a unique +// set of repository ids* and map them to their corresponding Repo record. +// +// *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`? +// A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was +// always undefined. To make this a non-breaking change, we fallback to using the repository's name +// (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in +// practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo', +// 'gitea.com/org/repo', etc.). +// +// Note: When a repository is re-indexed (every hour) this ID will be populated. +// @see: https://github.com/sourcebot-dev/zoekt/pull/6 +const getRepoIdForFile = (file: ZoektGrpcFileMatch): string | number => { + return file.repository_id ?? 
file.repository; +} + +const createZoektSearchRequest = async ({ + searchRequest, + prisma, +}: { + searchRequest: SearchRequest; + prisma: PrismaClient; +}) => { + const tree = parseQueryIntoLezerTree(searchRequest.query); + const zoektQuery = await transformLezerTreeToZoektGrpcQuery({ + tree, + input: searchRequest.query, + isCaseSensitivityEnabled: searchRequest.isCaseSensitivityEnabled ?? false, + isRegexEnabled: searchRequest.isRegexEnabled ?? false, + onExpandSearchContext: async (contextName: string) => { const context = await prisma.searchContext.findUnique({ where: { name_orgId: { name: contextName, - orgId, + orgId: SINGLE_TENANT_ORG_ID, } }, include: { @@ -73,357 +446,139 @@ const transformZoektQuery = async (query: string, orgId: number, prisma: PrismaC } }); - // If the context doesn't exist, return an error. if (!context) { - return { - errorCode: ErrorCode.SEARCH_CONTEXT_NOT_FOUND, - message: `Search context "${contextName}" not found`, - statusCode: StatusCodes.NOT_FOUND, - } satisfies ServiceError; + throw new Error(`Search context "${contextName}" not found`); } - const names = context.repos.map((repo) => repo.name); - newQueryParts.push(`${isNegated ? "-" : ""}${zoektPrefixes.reposet}${names.join(",")}`); - } + return context.repos.map((repo) => repo.name); + }, + }); - // no-op: add the original part to the new query parts. - else { - newQueryParts.push(part); - } - } - - return newQueryParts.join(" "); -} - -// Extracts a repository file URL from a zoekt template, branch, and file name. -const getFileWebUrl = (template: string, branch: string, fileName: string): string | undefined => { - // This is a hacky parser for templates generated by - // the go text/template package. Example template: - // {{URLJoinPath "https://github.com/sourcebot-dev/sourcebot" "blob" .Version .Path}} - - if (!template.match(/^{{URLJoinPath\s.*}}(\?.+)?$/)) { - return undefined; - } - - const url = - template.substring("{{URLJoinPath ".length, template.indexOf("}}")) - .split(" ") - .map((part) => { - // remove wrapping quotes - if (part.startsWith("\"")) part = part.substring(1); - if (part.endsWith("\"")) part = part.substring(0, part.length - 1); - // Replace variable references - if (part == ".Version") part = branch; - if (part == ".Path") part = fileName; - return part; - }) - .join("/"); - - const optionalQueryParams = - template.substring(template.indexOf("}}") + 2) - .replace("{{.Version}}", branch) - .replace("{{.Path}}", fileName); - - return encodeURI(url + optionalQueryParams); -} - -export const search = async ({ query, matches, contextLines, whole }: SearchRequest): Promise => sew(() => - withOptionalAuthV2(async ({ org, prisma }) => { - const transformedQuery = await transformZoektQuery(query, org.id, prisma); - if (isServiceError(transformedQuery)) { - return transformedQuery; - } - query = transformedQuery; - - const isBranchFilteringEnabled = ( - query.includes(zoektPrefixes.branch) || - query.includes(zoektPrefixes.branchShort) - ); - - // We only want to show matches for the default branch when - // the user isn't explicitly filtering by branch. - if (!isBranchFilteringEnabled) { - query = query.concat(` branch:HEAD`); - } - - const body = JSON.stringify({ - q: query, - // @see: https://github.com/sourcebot-dev/zoekt/blob/main/api.go#L892 - opts: { - ChunkMatches: true, - // @note: Zoekt has several different ways to limit a given search. 
The two that - // we care about are `MaxMatchDisplayCount` and `TotalMaxMatchCount`: - // - `MaxMatchDisplayCount` truncates the number of matches AFTER performing - // a search (specifically, after collating and sorting the results). The number of - // results returned by the API will be less than or equal to this value. - // - // - `TotalMaxMatchCount` truncates the number of matches DURING a search. The results - // returned by the API the API can be less than, equal to, or greater than this value. - // Why greater? Because this value is compared _after_ a given shard has finished - // being processed, the number of matches returned by the last shard may have exceeded - // this value. - // - // Let's define two variables: - // - `actualMatchCount` : The number of matches that are returned by the API. This is - // always less than or equal to `MaxMatchDisplayCount`. - // - `totalMatchCount` : The number of matches that zoekt found before it either - // 1) found all matches or 2) hit the `TotalMaxMatchCount` limit. This number is - // not bounded and can be less than, equal to, or greater than both `TotalMaxMatchCount` - // and `MaxMatchDisplayCount`. - // - // - // Our challenge is to determine whether or not the search returned all possible matches/ - // (it was exaustive) or if it was truncated. By setting the `TotalMaxMatchCount` to - // `MaxMatchDisplayCount + 1`, we can determine which of these occurred by comparing - // `totalMatchCount` to `MaxMatchDisplayCount`. - // - // if (totalMatchCount ≤ actualMatchCount): - // Search is EXHAUSTIVE (found all possible matches) - // Proof: totalMatchCount ≤ MaxMatchDisplayCount < TotalMaxMatchCount - // Therefore Zoekt stopped naturally, not due to limit - // - // if (totalMatchCount > actualMatchCount): - // Search is TRUNCATED (more matches exist) - // Proof: totalMatchCount > MaxMatchDisplayCount + 1 = TotalMaxMatchCount - // Therefore Zoekt hit the limit and stopped searching - // - MaxMatchDisplayCount: matches, - TotalMaxMatchCount: matches + 1, - NumContextLines: contextLines, - Whole: !!whole, - ShardMaxMatchCount: -1, - MaxWallTime: 0, // zoekt expects a duration in nanoseconds - } - }); - - let header: Record = {}; - header = { - "X-Tenant-ID": org.id.toString() - }; - - const { data: searchResponse, durationMs: fetchDurationMs } = await measure( - () => zoektFetch({ - path: "/api/search", - body, - header, - method: "POST", - }), - "zoekt_fetch", - false - ); - - if (!searchResponse.ok) { - return invalidZoektResponse(searchResponse); - } - - const transformZoektSearchResponse = async ({ Result }: ZoektSearchResponse): Promise => { - // @note (2025-05-12): in zoekt, repositories are identified by the `RepositoryID` field - // which corresponds to the `id` in the Repo table. In order to efficiently fetch repository - // metadata when transforming (potentially thousands) of file matches, we aggregate a unique - // set of repository ids* and map them to their corresponding Repo record. - // - // *Q: Why is `RepositoryID` optional? And why are we falling back to `Repository`? - // A: Prior to this change, the repository id was not plumbed into zoekt, so RepositoryID was - // always undefined. To make this a non-breaking change, we fallback to using the repository's name - // (`Repository`) as the identifier in these cases. This is not guaranteed to be unique, but in - // practice it is since the repository name includes the host and path (e.g., 'github.com/org/repo', - // 'gitea.com/org/repo', etc.). 
- // - // Note: When a repository is re-indexed (every hour) this ID will be populated. - // @see: https://github.com/sourcebot-dev/zoekt/pull/6 - const repoIdentifiers = new Set(Result.Files?.map((file) => file.RepositoryID ?? file.Repository) ?? []); - const repos = new Map(); - - (await prisma.repo.findMany({ - where: { - id: { - in: Array.from(repoIdentifiers).filter((id) => typeof id === "number"), - }, - orgId: org.id, - } - })).forEach(repo => repos.set(repo.id, repo)); - - (await prisma.repo.findMany({ - where: { - name: { - in: Array.from(repoIdentifiers).filter((id) => typeof id === "string"), - }, - orgId: org.id, - } - })).forEach(repo => repos.set(repo.name, repo)); - - const files = Result.Files?.map((file) => { - const fileNameChunks = file.ChunkMatches.filter((chunk) => chunk.FileName); - - const webUrl = (() => { - const template: string | undefined = Result.RepoURLs[file.Repository]; - if (!template) { - return undefined; + const zoektSearchRequest: ZoektGrpcSearchRequest = { + query: { + and: { + children: [ + zoektQuery, + // @todo: handle branch filtering. + { + branch: { + pattern: 'HEAD', + exact: true, + } } - - // If there are multiple branches pointing to the same revision of this file, it doesn't - // matter which branch we use here, so use the first one. - const branch = file.Branches && file.Branches.length > 0 ? file.Branches[0] : "HEAD"; - return getFileWebUrl(template, branch, file.FileName); - })(); - - const identifier = file.RepositoryID ?? file.Repository; - const repo = repos.get(identifier); - - // This can happen if the user doesn't have access to the repository. - if (!repo) { - return undefined; - } - - return { - fileName: { - text: file.FileName, - matchRanges: fileNameChunks.length === 1 ? fileNameChunks[0].Ranges.map((range) => ({ - start: { - byteOffset: range.Start.ByteOffset, - column: range.Start.Column, - lineNumber: range.Start.LineNumber, - }, - end: { - byteOffset: range.End.ByteOffset, - column: range.End.Column, - lineNumber: range.End.LineNumber, - } - })) : [], - }, - repository: repo.name, - repositoryId: repo.id, - webUrl: webUrl, - language: file.Language, - chunks: file.ChunkMatches - .filter((chunk) => !chunk.FileName) // Filter out filename chunks. - .map((chunk) => { - return { - content: base64Decode(chunk.Content), - matchRanges: chunk.Ranges.map((range) => ({ - start: { - byteOffset: range.Start.ByteOffset, - column: range.Start.Column, - lineNumber: range.Start.LineNumber, - }, - end: { - byteOffset: range.End.ByteOffset, - column: range.End.Column, - lineNumber: range.End.LineNumber, - } - }) satisfies SourceRange), - contentStart: { - byteOffset: chunk.ContentStart.ByteOffset, - column: chunk.ContentStart.Column, - lineNumber: chunk.ContentStart.LineNumber, - }, - symbols: chunk.SymbolInfo?.map((symbol) => { - return { - symbol: symbol.Sym, - kind: symbol.Kind, - parent: symbol.Parent.length > 0 ? { - symbol: symbol.Parent, - kind: symbol.ParentKind, - } : undefined, - } - }) ?? undefined, - } - }), - branches: file.Branches, - content: file.Content ? base64Decode(file.Content) : undefined, - } - }).filter((file) => file !== undefined) ?? []; - - const actualMatchCount = files.reduce( - (acc, file) => - // Match count is the sum of the number of chunk matches and file name matches. 
- acc + file.chunks.reduce( - (acc, chunk) => acc + chunk.matchRanges.length, - 0, - ) + file.fileName.matchRanges.length, - 0, - ); - - const totalMatchCount = Result.MatchCount; - const isSearchExhaustive = totalMatchCount <= actualMatchCount; - - return { - files, - repositoryInfo: Array.from(repos.values()).map((repo) => ({ - id: repo.id, - codeHostType: repo.external_codeHostType, - name: repo.name, - displayName: repo.displayName ?? undefined, - webUrl: repo.webUrl ?? undefined, - })), - isBranchFilteringEnabled, - isSearchExhaustive, - stats: { - actualMatchCount, - totalMatchCount, - duration: Result.Duration, - fileCount: Result.FileCount, - filesSkipped: Result.FilesSkipped, - contentBytesLoaded: Result.ContentBytesLoaded, - indexBytesLoaded: Result.IndexBytesLoaded, - crashes: Result.Crashes, - shardFilesConsidered: Result.ShardFilesConsidered, - filesConsidered: Result.FilesConsidered, - filesLoaded: Result.FilesLoaded, - shardsScanned: Result.ShardsScanned, - shardsSkipped: Result.ShardsSkipped, - shardsSkippedFilter: Result.ShardsSkippedFilter, - ngramMatches: Result.NgramMatches, - ngramLookups: Result.NgramLookups, - wait: Result.Wait, - matchTreeConstruction: Result.MatchTreeConstruction, - matchTreeSearch: Result.MatchTreeSearch, - regexpsConsidered: Result.RegexpsConsidered, - flushReason: Result.FlushReason, - } - } satisfies SearchResponse; - } - - const { data: rawZoektResponse, durationMs: parseJsonDurationMs } = await measure( - () => searchResponse.json(), - "parse_json", - false - ); - - // @note: We do not use zod parseAsync here since in cases where the - // response is large (> 40MB), there can be significant performance issues. - const zoektResponse = rawZoektResponse as ZoektSearchResponse; - - const { data: response, durationMs: transformZoektResponseDurationMs } = await measure( - () => transformZoektSearchResponse(zoektResponse), - "transform_zoekt_response", - false - ); - - const totalDurationMs = fetchDurationMs + parseJsonDurationMs + transformZoektResponseDurationMs; - - // Debug log: timing breakdown - const timings = [ - { name: "zoekt_fetch", duration: fetchDurationMs }, - { name: "parse_json", duration: parseJsonDurationMs }, - { name: "transform_zoekt_response", duration: transformZoektResponseDurationMs }, - ]; - - logger.debug(`Search timing breakdown (query: "${query}"):`); - timings.forEach(({ name, duration }) => { - const percentage = ((duration / totalDurationMs) * 100).toFixed(1); - const durationStr = duration.toFixed(2).padStart(8); - const percentageStr = percentage.padStart(5); - logger.debug(` ${name.padEnd(25)} ${durationStr}ms (${percentageStr}%)`); - }); - logger.debug(` ${"TOTAL".padEnd(25)} ${totalDurationMs.toFixed(2).padStart(8)}ms (100.0%)`); - - return { - ...response, - __debug_timings: { - zoekt_fetch: fetchDurationMs, - parse_json: parseJsonDurationMs, - transform_zoekt_response: transformZoektResponseDurationMs, + ] } - } satisfies SearchResponse; - })); + }, + opts: { + chunk_matches: true, + // @note: Zoekt has several different ways to limit a given search. The two that + // we care about are `MaxMatchDisplayCount` and `TotalMaxMatchCount`: + // - `MaxMatchDisplayCount` truncates the number of matches AFTER performing + // a search (specifically, after collating and sorting the results). The number of + // results returned by the API will be less than or equal to this value. + // + // - `TotalMaxMatchCount` truncates the number of matches DURING a search. 
The results
+            //   returned by the API can be less than, equal to, or greater than this value.
+            //   Why greater? Because this value is compared _after_ a given shard has finished
+            //   being processed, the number of matches returned by the last shard may have exceeded
+            //   this value.
+            //
+            // Let's define two variables:
+            // - `actualMatchCount` : The number of matches that are returned by the API. This is
+            //   always less than or equal to `MaxMatchDisplayCount`.
+            // - `totalMatchCount` : The number of matches that zoekt found before it either
+            //   1) found all matches or 2) hit the `TotalMaxMatchCount` limit. This number is
+            //   not bounded and can be less than, equal to, or greater than both `TotalMaxMatchCount`
+            //   and `MaxMatchDisplayCount`.
+            //
+            //
+            // Our challenge is to determine whether or not the search returned all possible matches
+            // (i.e., it was exhaustive) or if it was truncated. By setting the `TotalMaxMatchCount` to
+            // `MaxMatchDisplayCount + 1`, we can determine which of these occurred by comparing
+            // `totalMatchCount` to `MaxMatchDisplayCount`.
+            //
+            // if (totalMatchCount ≤ actualMatchCount):
+            //     Search is EXHAUSTIVE (found all possible matches)
+            //     Proof: totalMatchCount ≤ MaxMatchDisplayCount < TotalMaxMatchCount
+            //     Therefore Zoekt stopped naturally, not due to limit
+            //
+            // if (totalMatchCount > actualMatchCount):
+            //     Search is TRUNCATED (more matches exist)
+            //     Proof: totalMatchCount > MaxMatchDisplayCount + 1 = TotalMaxMatchCount
+            //     Therefore Zoekt hit the limit and stopped searching
+            //
+            max_match_display_count: searchRequest.matches,
+            total_max_match_count: searchRequest.matches + 1,
+            num_context_lines: searchRequest.contextLines ?? 0,
+            whole: !!searchRequest.whole,
+            shard_max_match_count: -1,
+            max_wall_time: {
+                seconds: 0,
+            }
+        },
+    };
+
+    return zoektSearchRequest;
+}
+
+const createGrpcClient = (): WebserverServiceClient => {
+    // Path to proto files - these should match your monorepo structure
+    const protoBasePath = path.join(process.cwd(), '../../vendor/zoekt/grpc/protos');
+    const protoPath = path.join(protoBasePath, 'zoekt/webserver/v1/webserver.proto');
+
+    const packageDefinition = protoLoader.loadSync(protoPath, {
+        keepCase: true,
+        longs: Number,
+        enums: String,
+        defaults: true,
+        oneofs: true,
+        includeDirs: [protoBasePath],
+    });
+
+    const proto = grpc.loadPackageDefinition(packageDefinition) as unknown as ProtoGrpcType;
+
+    // Extract host and port from ZOEKT_WEBSERVER_URL
+    const zoektUrl = new URL(env.ZOEKT_WEBSERVER_URL);
+    const grpcAddress = `${zoektUrl.hostname}:${zoektUrl.port}`;
+
+    return new proto.zoekt.webserver.v1.WebserverService(
+        grpcAddress,
+        grpc.credentials.createInsecure(),
+        {
+            'grpc.max_receive_message_length': 500 * 1024 * 1024, // 500MB
+            'grpc.max_send_message_length': 500 * 1024 * 1024, // 500MB
+        }
+    );
+}
+
+
+const accumulateStats = (a: SearchStats, b: SearchStats): SearchStats => {
+    return {
+        actualMatchCount: a.actualMatchCount + b.actualMatchCount,
+        totalMatchCount: a.totalMatchCount + b.totalMatchCount,
+        duration: a.duration + b.duration,
+        fileCount: a.fileCount + b.fileCount,
+        filesSkipped: a.filesSkipped + b.filesSkipped,
+        contentBytesLoaded: a.contentBytesLoaded + b.contentBytesLoaded,
+        indexBytesLoaded: a.indexBytesLoaded + b.indexBytesLoaded,
+        crashes: a.crashes + b.crashes,
+        shardFilesConsidered: a.shardFilesConsidered + b.shardFilesConsidered,
+        filesConsidered: a.filesConsidered + b.filesConsidered,
+        filesLoaded: a.filesLoaded + b.filesLoaded,
+        shardsScanned:
a.shardsScanned + b.shardsScanned,
+        shardsSkipped: a.shardsSkipped + b.shardsSkipped,
+        shardsSkippedFilter: a.shardsSkippedFilter + b.shardsSkippedFilter,
+        ngramMatches: a.ngramMatches + b.ngramMatches,
+        ngramLookups: a.ngramLookups + b.ngramLookups,
+        wait: a.wait + b.wait,
+        matchTreeConstruction: a.matchTreeConstruction + b.matchTreeConstruction,
+        matchTreeSearch: a.matchTreeSearch + b.matchTreeSearch,
+        regexpsConsidered: a.regexpsConsidered + b.regexpsConsidered,
+        // Capture the first non-unknown flush reason.
+        ...(a.flushReason === ZoektFlushReason.FLUSH_REASON_UNKNOWN_UNSPECIFIED ? {
+            flushReason: b.flushReason
+        } : {
+            flushReason: a.flushReason,
+        }),
+    }
+}
diff --git a/packages/web/src/features/search/types.ts b/packages/web/src/features/search/types.ts
index b46320dc..c4b49630 100644
--- a/packages/web/src/features/search/types.ts
+++ b/packages/web/src/features/search/types.ts
@@ -51,7 +51,7 @@ export const searchStatsSchema = z.object({
     matchTreeConstruction: z.number(), // Aggregate wall clock time spent constructing and pruning the match tree. This accounts for time such as lookups in the trigram index.
     matchTreeSearch: z.number(), // Aggregate wall clock time spent searching the match tree. This accounts for the bulk of search work done looking for matches.
     regexpsConsidered: z.number(), // Number of times regexp was called on files that we evaluated.
-    flushReason: z.number(), // FlushReason explains why results were flushed.
+    flushReason: z.string(), // FlushReason explains why results were flushed.
 });
 
 export type SearchStats = z.infer<typeof searchStatsSchema>;
@@ -96,9 +96,7 @@ export const searchResponseSchema = z.object({
     stats: searchStatsSchema,
     files: z.array(searchFileSchema),
     repositoryInfo: z.array(repositoryInfoSchema),
-    isBranchFilteringEnabled: z.boolean(),
     isSearchExhaustive: z.boolean(),
-    __debug_timings: z.record(z.string(), z.number()).optional(),
 });
 
 export type SearchResponse = z.infer<typeof searchResponseSchema>;
diff --git a/packages/web/src/features/search/zoektClient.ts b/packages/web/src/features/search/zoektClient.ts
deleted file mode 100644
index bd30fcd1..00000000
--- a/packages/web/src/features/search/zoektClient.ts
+++ /dev/null
@@ -1,34 +0,0 @@
-import { env } from "@sourcebot/shared";
-
-interface ZoektRequest {
-    path: string,
-    body: string,
-    method: string,
-    header?: Record<string, string>,
-    cache?: RequestCache,
-}
-
-export const zoektFetch = async ({
-    path,
-    body,
-    method,
-    header,
-    cache,
-}: ZoektRequest) => {
-    const response = await fetch(
-        new URL(path, env.ZOEKT_WEBSERVER_URL),
-        {
-            method,
-            headers: {
-                ...header,
-                "Content-Type": "application/json",
-            },
-            body,
-            cache,
-        }
-    );
-
-    // @todo : add metrics
-
-    return response;
-}
\ No newline at end of file
diff --git a/packages/web/src/features/search/zoektSchema.ts b/packages/web/src/features/search/zoektSchema.ts
deleted file mode 100644
index c4f37e38..00000000
--- a/packages/web/src/features/search/zoektSchema.ts
+++ /dev/null
@@ -1,135 +0,0 @@
-
-import { z } from "zod";
-
-// @see : https://github.com/sourcebot-dev/zoekt/blob/main/api.go#L212
-export const zoektLocationSchema = z.object({
-    // 0-based byte offset from the beginning of the file
-    ByteOffset: z.number(),
-    // 1-based line number from the beginning of the file
-    LineNumber: z.number(),
-    // 1-based column number (in runes) from the beginning of line
-    Column: z.number(),
-});
-
-export const zoektRangeSchema = z.object({
-    Start: zoektLocationSchema,
-    End: zoektLocationSchema,
-});
-
-// @see : 
https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L350
-export const zoektSearchResponseStats = {
-    ContentBytesLoaded: z.number(),
-    IndexBytesLoaded: z.number(),
-    Crashes: z.number(),
-    Duration: z.number(),
-    FileCount: z.number(),
-    ShardFilesConsidered: z.number(),
-    FilesConsidered: z.number(),
-    FilesLoaded: z.number(),
-    FilesSkipped: z.number(),
-    ShardsScanned: z.number(),
-    ShardsSkipped: z.number(),
-    ShardsSkippedFilter: z.number(),
-    MatchCount: z.number(),
-    NgramMatches: z.number(),
-    NgramLookups: z.number(),
-    Wait: z.number(),
-    MatchTreeConstruction: z.number(),
-    MatchTreeSearch: z.number(),
-    RegexpsConsidered: z.number(),
-    FlushReason: z.number(),
-}
-
-export const zoektSymbolSchema = z.object({
-    Sym: z.string(),
-    Kind: z.string(),
-    Parent: z.string(),
-    ParentKind: z.string(),
-});
-
-// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L497
-export const zoektSearchResponseSchema = z.object({
-    Result: z.object({
-        ...zoektSearchResponseStats,
-        Files: z.array(z.object({
-            FileName: z.string(),
-            Repository: z.string(),
-            RepositoryID: z.number().optional(),
-            Version: z.string().optional(),
-            Language: z.string(),
-            Branches: z.array(z.string()).optional(),
-            ChunkMatches: z.array(z.object({
-                Content: z.string(),
-                Ranges: z.array(zoektRangeSchema),
-                FileName: z.boolean(),
-                ContentStart: zoektLocationSchema,
-                Score: z.number(),
-                SymbolInfo: z.array(zoektSymbolSchema).nullable(),
-            })),
-            Checksum: z.string(),
-            Score: z.number(),
-            // Set if `whole` is true.
-            Content: z.string().optional(),
-        })).nullable(),
-        RepoURLs: z.record(z.string(), z.string()),
-    }),
-});
-
-export type ZoektSearchResponse = z.infer<typeof zoektSearchResponseSchema>;
-
-// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L728
-const zoektRepoStatsSchema = z.object({
-    Repos: z.number(),
-    Shards: z.number(),
-    Documents: z.number(),
-    IndexBytes: z.number(),
-    ContentBytes: z.number(),
-    NewLinesCount: z.number(),
-    DefaultBranchNewLinesCount: z.number(),
-    OtherBranchesNewLinesCount: z.number(),
-});
-
-// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L716
-const zoektIndexMetadataSchema = z.object({
-    IndexFormatVersion: z.number(),
-    IndexFeatureVersion: z.number(),
-    IndexMinReaderVersion: z.number(),
-    IndexTime: z.string(),
-    PlainASCII: z.boolean(),
-    LanguageMap: z.record(z.string(), z.number()),
-    ZoektVersion: z.string(),
-    ID: z.string(),
-});
-
-
-// @see : https://github.com/sourcebot-dev/zoekt/blob/3780e68cdb537d5a7ed2c84d9b3784f80c7c5d04/api.go#L555
-export const zoektRepositorySchema = z.object({
-    Name: z.string(),
-    URL: z.string(),
-    Source: z.string(),
-    Branches: z.array(z.object({
-        Name: z.string(),
-        Version: z.string(),
-    })).nullable(),
-    CommitURLTemplate: z.string(),
-    FileURLTemplate: z.string(),
-    LineFragmentTemplate: z.string(),
-    RawConfig: z.record(z.string(), z.string()).nullable(),
-    Rank: z.number(),
-    IndexOptions: z.string(),
-    HasSymbols: z.boolean(),
-    Tombstone: z.boolean(),
-    LatestCommitDate: z.string(),
-    FileTombstones: z.string().optional(),
-});
-
-export const zoektListRepositoriesResponseSchema = z.object({
-    List: z.object({
-        Repos: z.array(z.object({
-            Repository: zoektRepositorySchema,
-            IndexMetadata: zoektIndexMetadataSchema,
-            Stats: zoektRepoStatsSchema,
-        })),
-        Stats: zoektRepoStatsSchema,
-    })
-});
\ No newline at end of file
diff --git 
a/packages/web/src/lib/posthogEvents.ts b/packages/web/src/lib/posthogEvents.ts index 12acbc5d..126ecf45 100644 --- a/packages/web/src/lib/posthogEvents.ts +++ b/packages/web/src/lib/posthogEvents.ts @@ -25,7 +25,7 @@ export type PosthogEventMap = { matchTreeConstruction: number, matchTreeSearch: number, regexpsConsidered: number, - flushReason: number, + flushReason: string, fileLanguages: string[], isSearchExhaustive: boolean },
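
Note: the exhaustiveness check described in the searchApi.ts `opts` comment above reduces
to a single comparison. A minimal, self-contained sketch of that logic follows — the
standalone `isSearchExhaustive` helper and the example numbers are illustrative
assumptions, not code from this patch:

    // `matches` plays the role of MaxMatchDisplayCount; the request sets
    // TotalMaxMatchCount = matches + 1.
    const isSearchExhaustive = (
        actualMatchCount: number, // matches returned by the API (<= matches)
        totalMatchCount: number,  // matches Zoekt counted before it stopped
    ): boolean => {
        // If Zoekt counted no more matches than were returned, it stopped
        // naturally rather than at the TotalMaxMatchCount limit.
        return totalMatchCount <= actualMatchCount;
    };

    // Example, with matches = 100 (so TotalMaxMatchCount = 101):
    //   isSearchExhaustive(42, 42)   === true  -> all matches were found
    //   isSearchExhaustive(100, 101) === false -> more matches exist than were displayed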