From 963f6fd69ef30ccd8a44883aa36eb8253f13e65b Mon Sep 17 00:00:00 2001 From: bkellam Date: Fri, 10 Oct 2025 12:32:34 -0700 Subject: [PATCH] add indexing --- packages/backend/src/index.ts | 4 +- packages/backend/src/indexSyncer.ts | 225 ++++++++++++++++++---------- 2 files changed, 149 insertions(+), 80 deletions(-) diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 3850d07b..411de187 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -72,9 +72,9 @@ const connectionManager = new ConnectionManager(prisma, settings, redis); const repoManager = new RepoManager(prisma, settings, redis, promClient, context); const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis); const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis); -const indexSyncer = new IndexSyncer(prisma, settings, redis); +const indexSyncer = new IndexSyncer(prisma, settings, redis, context); -await repoManager.validateIndexedReposHaveShards(); +// await repoManager.validateIndexedReposHaveShards(); connectionManager.startScheduler(); // repoManager.startScheduler(); diff --git a/packages/backend/src/indexSyncer.ts b/packages/backend/src/indexSyncer.ts index 11a88009..aba95fbf 100644 --- a/packages/backend/src/indexSyncer.ts +++ b/packages/backend/src/indexSyncer.ts @@ -4,9 +4,13 @@ import * as Sentry from '@sentry/node'; import { PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db"; import { createLogger } from "@sourcebot/logger"; import express from 'express'; -import { BullBoardGroupMQAdapter, Queue, Worker } from "groupmq"; +import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq"; import { Redis } from 'ioredis'; -import { Settings } from "./types.js"; +import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js"; +import { getAuthCredentialsForRepo, getRepoPath, measure } from './utils.js'; +import { existsSync } from 'fs'; +import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from './git.js'; +import { indexGitRepository } from './zoekt.js'; const logger = createLogger('index-syncer'); @@ -14,7 +18,7 @@ type IndexSyncJob = { jobId: string; } -const JOB_TIMEOUT_MS = 1000 * 60; // 60 second timeout. +const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout export class IndexSyncer { private interval?: NodeJS.Timeout; @@ -25,6 +29,7 @@ export class IndexSyncer { private db: PrismaClient, private settings: Settings, redis: Redis, + private ctx: AppContext, ) { this.queue = new Queue({ redis, @@ -37,88 +42,19 @@ export class IndexSyncer { queue: this.queue, maxStalledCount: 1, stalledInterval: 1000, - handler: async (job) => { - const id = job.data.jobId; - const { repo } = await this.db.repoIndexingJob.update({ - where: { - id, - }, - data: { - status: RepoIndexingJobStatus.IN_PROGRESS, - }, - select: { - repo: true, - } - }); - - logger.info(`Running index job ${id} for repo ${repo.name}`); - - await new Promise(resolve => setTimeout(resolve, 1000 * 10)); - - return true; - }, - concurrency: 4, - }); - - this.worker.on('completed', async (job) => { - const { repo } = await this.db.repoIndexingJob.update({ - where: { - id: job.data.jobId, - }, - data: { - status: RepoIndexingJobStatus.COMPLETED, - repo: { - update: { - indexedAt: new Date(), - } - }, - completedAt: new Date(), - }, - select: { - repo: true, - } - }); - - logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`); - }); - - this.worker.on('failed', async (job) => { - const { repo } = await this.db.repoIndexingJob.update({ - where: { - id: job.data.jobId, - }, - data: { - status: RepoIndexingJobStatus.FAILED, - completedAt: new Date(), - errorMessage: job.failedReason, - }, - select: { - repo: true, - } - }); - - logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`); - }); - - this.worker.on('stalled', async (jobId) => { - const { repo } = await this.db.repoIndexingJob.update({ - where: { id: jobId }, - data: { - status: RepoIndexingJobStatus.FAILED, - completedAt: new Date(), - errorMessage: 'Job stalled', - }, - select: { repo: true } - }); - - logger.warn(`Job ${jobId} stalled for repo ${repo.name}`); + handler: this.runJob.bind(this), + concurrency: this.settings.maxRepoIndexingJobConcurrency, }); + this.worker.on('completed', this.onJobCompleted.bind(this)); + this.worker.on('failed', this.onJobFailed.bind(this)); + this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', async (error) => { Sentry.captureException(error); logger.error(`Index syncer worker error.`, error); }); + // @nocheckin const app = express(); const serverAdapter = new ExpressAdapter(); @@ -215,6 +151,139 @@ export class IndexSyncer { } } + private async runJob(job: ReservedJob) { + const id = job.data.jobId; + const { repo } = await this.db.repoIndexingJob.update({ + where: { + id, + }, + data: { + status: RepoIndexingJobStatus.IN_PROGRESS, + }, + select: { + repo: { + include: { + connections: { + include: { + connection: true, + } + } + } + } + } + }); + + await this.syncGitRepository(repo); + } + + private async syncGitRepository(repo: RepoWithConnections) { + const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx); + + const metadata = repoMetadataSchema.parse(repo.metadata); + + const credentials = await getAuthCredentialsForRepo(repo, this.db); + const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl; + const authHeader = credentials?.authHeader ?? undefined; + + if (existsSync(repoPath) && !isReadOnly) { + // @NOTE: in #483, we changed the cloning method s.t., we _no longer_ + // write the clone URL (which could contain a auth token) to the + // `remote.origin.url` entry. For the upgrade scenario, we want + // to unset this key since it is no longer needed, hence this line. + // This will no-op if the key is already unset. + // @see: https://github.com/sourcebot-dev/sourcebot/pull/483 + await unsetGitConfig(repoPath, ["remote.origin.url"]); + + logger.info(`Fetching ${repo.displayName}...`); + const { durationMs } = await measure(() => fetchRepository({ + cloneUrl: cloneUrlMaybeWithToken, + authHeader, + path: repoPath, + onProgress: ({ method, stage, progress }) => { + logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`) + } + })); + const fetchDuration_s = durationMs / 1000; + + process.stdout.write('\n'); + logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`); + + } else if (!isReadOnly) { + logger.info(`Cloning ${repo.displayName}...`); + + const { durationMs } = await measure(() => cloneRepository({ + cloneUrl: cloneUrlMaybeWithToken, + authHeader, + path: repoPath, + onProgress: ({ method, stage, progress }) => { + logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`) + } + })); + const cloneDuration_s = durationMs / 1000; + + process.stdout.write('\n'); + logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`); + } + + // Regardless of clone or fetch, always upsert the git config for the repo. + // This ensures that the git config is always up to date for whatever we + // have in the DB. + if (metadata.gitConfig && !isReadOnly) { + await upsertGitConfig(repoPath, metadata.gitConfig); + } + + logger.info(`Indexing ${repo.displayName}...`); + const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx)); + const indexDuration_s = durationMs / 1000; + logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`); + } + + private async onJobCompleted(job: Job) { + const { repo } = await this.db.repoIndexingJob.update({ + where: { id: job.data.jobId }, + data: { + status: RepoIndexingJobStatus.COMPLETED, + repo: { + update: { + indexedAt: new Date(), + } + }, + completedAt: new Date(), + }, + select: { repo: true } + }); + + logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`); + } + + private async onJobFailed(job: Job) { + const { repo } = await this.db.repoIndexingJob.update({ + where: { id: job.data.jobId }, + data: { + status: RepoIndexingJobStatus.FAILED, + completedAt: new Date(), + errorMessage: job.failedReason, + }, + select: { repo: true} + }); + + logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`); + } + + private async onJobStalled(jobId: string) { + const { repo } = await this.db.repoIndexingJob.update({ + where: { id: jobId }, + data: { + status: RepoIndexingJobStatus.FAILED, + completedAt: new Date(), + errorMessage: 'Job stalled', + }, + select: { repo: true } + }); + + logger.error(`Job ${jobId} stalled for repo ${repo.name}`); + } + public dispose() { if (this.interval) { clearInterval(this.interval);