From 5fe554e7da1dbfcd5b23cf40bb5fdd819d826c1d Mon Sep 17 00:00:00 2001
From: bkellam
Date: Wed, 15 Oct 2025 11:23:34 -0700
Subject: [PATCH] renames + add abortSignal

---
 packages/backend/src/git.ts                 |  61 +-
 packages/backend/src/index.ts               |  14 +-
 packages/backend/src/repoCompileUtils.ts    |   4 +-
 .../{indexSyncer.ts => repoIndexManager.ts} |  75 ++-
 packages/backend/src/repoManager.ts         | 566 ------------------
 packages/backend/src/zoekt.ts               |   4 +-
 6 files changed, 118 insertions(+), 606 deletions(-)
 rename packages/backend/src/{indexSyncer.ts => repoIndexManager.ts} (87%)
 delete mode 100644 packages/backend/src/repoManager.ts

diff --git a/packages/backend/src/git.ts b/packages/backend/src/git.ts
index e78b7db6..21f30462 100644
--- a/packages/backend/src/git.ts
+++ b/packages/backend/src/git.ts
@@ -10,7 +10,7 @@ type onProgressFn = (event: SimpleGitProgressEvent) => void;
  * Creates a simple-git client that has its working directory
  * set to the given path.
  */
-const createGitClientForPath = (path: string, onProgress?: onProgressFn) => {
+const createGitClientForPath = (path: string, onProgress?: onProgressFn, signal?: AbortSignal) => {
     if (!existsSync(path)) {
         throw new Error(`Path ${path} does not exist`);
     }
@@ -19,6 +19,7 @@
 
     const git = simpleGit({
         progress: onProgress,
+        abort: signal,
     })
     .env({
         ...process.env,
@@ -48,17 +49,19 @@ export const cloneRepository = async (
         cloneUrl,
         authHeader,
         path,
         onProgress,
+        signal,
     }: {
         cloneUrl: string,
         authHeader?: string,
         path: string,
         onProgress?: onProgressFn
+        signal?: AbortSignal
     }
 ) => {
     try {
         await mkdir(path, { recursive: true });
 
-        const git = createGitClientForPath(path, onProgress);
+        const git = createGitClientForPath(path, onProgress, signal);
 
         const cloneArgs = [
             "--bare",
@@ -67,7 +70,11 @@
 
         await git.clone(cloneUrl, path, cloneArgs);
 
-        await unsetGitConfig(path, ["remote.origin.url"]);
+        await unsetGitConfig({
+            path,
+            keys: ["remote.origin.url"],
+            signal,
+        });
     } catch (error: unknown) {
         const baseLog = `Failed to clone repository: ${path}`;
 
@@ -88,15 +95,17 @@ export const fetchRepository = async (
         cloneUrl,
         authHeader,
         path,
         onProgress,
+        signal,
     }: {
         cloneUrl: string,
         authHeader?: string,
         path: string,
-        onProgress?: onProgressFn
+        onProgress?: onProgressFn,
+        signal?: AbortSignal
     }
 ) => {
     try {
-        const git = createGitClientForPath(path, onProgress);
+        const git = createGitClientForPath(path, onProgress, signal);
 
         if (authHeader) {
             await git.addConfig("http.extraHeader", authHeader);
@@ -137,8 +146,19 @@
  * that do not exist yet. It will _not_ remove any existing keys that are not
 * present in gitConfig.
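+ *
+ * A minimal usage sketch against the new object-style signature (the path and
+ * config key below are illustrative values, not taken from this patch):
+ * @example
+ * await upsertGitConfig({
+ *     path: '/repos/my-repo.git',
+ *     gitConfig: { 'zoekt.web-url': 'https://example.com/my-repo' },
+ * });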
 */
-export const upsertGitConfig = async (path: string, gitConfig: Record<string, string>, onProgress?: onProgressFn) => {
-    const git = createGitClientForPath(path, onProgress);
+export const upsertGitConfig = async (
+    {
+        path,
+        gitConfig,
+        onProgress,
+        signal,
+    }: {
+        path: string,
+        gitConfig: Record<string, string>,
+        onProgress?: onProgressFn,
+        signal?: AbortSignal
+    }) => {
+    const git = createGitClientForPath(path, onProgress, signal);
 
     try {
         for (const [key, value] of Object.entries(gitConfig)) {
@@ -157,8 +177,19 @@
-export const unsetGitConfig = async (path: string, keys: string[], onProgress?: onProgressFn) => {
-    const git = createGitClientForPath(path, onProgress);
+export const unsetGitConfig = async (
+    {
+        path,
+        keys,
+        onProgress,
+        signal,
+    }: {
+        path: string,
+        keys: string[],
+        onProgress?: onProgressFn,
+        signal?: AbortSignal
+    }) => {
+    const git = createGitClientForPath(path, onProgress, signal);
 
     try {
         const configList = await git.listConfig();
@@ -181,8 +212,16 @@
 /**
  * Returns true if `path` is the _root_ of a git repository.
  */
-export const isPathAValidGitRepoRoot = async (path: string, onProgress?: onProgressFn) => {
-    const git = createGitClientForPath(path, onProgress);
+export const isPathAValidGitRepoRoot = async ({
+    path,
+    onProgress,
+    signal,
+}: {
+    path: string,
+    onProgress?: onProgressFn,
+    signal?: AbortSignal
+}) => {
+    const git = createGitClientForPath(path, onProgress, signal);
 
     try {
         return git.checkIsRepo(CheckRepoActions.IS_REPO_ROOT);
diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts
index d8319272..78a1ce6c 100644
--- a/packages/backend/src/index.ts
+++ b/packages/backend/src/index.ts
@@ -11,9 +11,8 @@ import { DEFAULT_SETTINGS, INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
 import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
 import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
 import { env } from "./env.js";
-import { IndexSyncer } from "./indexSyncer.js";
+import { RepoIndexManager } from "./repoIndexManager.js";
 import { PromClient } from './promClient.js';
-import { RepoManager } from './repoManager.js';
 
 const logger = createLogger('backend-entrypoint');
 
@@ -60,16 +59,12 @@ const promClient = new PromClient();
 
 const settings = await getSettings(env.CONFIG_PATH);
 const connectionManager = new ConnectionManager(prisma, settings, redis);
-const repoManager = new RepoManager(prisma, settings, redis, promClient);
 const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
 const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
-const indexSyncer = new IndexSyncer(prisma, settings, redis);
-
-// await repoManager.validateIndexedReposHaveShards();
+const repoIndexManager = new RepoIndexManager(prisma, settings, redis);
 
 connectionManager.startScheduler();
-// repoManager.startScheduler();
-indexSyncer.startScheduler();
+repoIndexManager.startScheduler();
 
 if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && !hasEntitlement('permission-syncing')) {
     logger.error('Permission syncing is not supported in current plan.
Please contact team@sourcebot.dev for assistance.'); @@ -88,8 +83,7 @@ const cleanup = async (signal: string) => { try { await Promise.race([ Promise.all([ - indexSyncer.dispose(), - repoManager.dispose(), + repoIndexManager.dispose(), connectionManager.dispose(), repoPermissionSyncer.dispose(), userPermissionSyncer.dispose(), diff --git a/packages/backend/src/repoCompileUtils.ts b/packages/backend/src/repoCompileUtils.ts index 098f39c9..80d80ebe 100644 --- a/packages/backend/src/repoCompileUtils.ts +++ b/packages/backend/src/repoCompileUtils.ts @@ -497,7 +497,9 @@ export const compileGenericGitHostConfig_file = async ( }; await Promise.all(repoPaths.map(async (repoPath) => { - const isGitRepo = await isPathAValidGitRepoRoot(repoPath); + const isGitRepo = await isPathAValidGitRepoRoot({ + path: repoPath, + }); if (!isGitRepo) { logger.warn(`Skipping ${repoPath} - not a git repository.`); notFound.repos.push(repoPath); diff --git a/packages/backend/src/indexSyncer.ts b/packages/backend/src/repoIndexManager.ts similarity index 87% rename from packages/backend/src/indexSyncer.ts rename to packages/backend/src/repoIndexManager.ts index 7380618e..e296220d 100644 --- a/packages/backend/src/indexSyncer.ts +++ b/packages/backend/src/repoIndexManager.ts @@ -12,7 +12,7 @@ import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js"; import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js'; import { indexGitRepository } from './zoekt.js'; -const LOG_TAG = 'index-syncer'; +const LOG_TAG = 'repo-index-manager'; const logger = createLogger(LOG_TAG); const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`); @@ -25,7 +25,18 @@ type JobPayload = { const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout -export class IndexSyncer { +/** + * Manages the lifecycle of repository data on disk, including git working copies + * and search index shards. Handles both indexing operations (cloning/fetching repos + * and building search indexes) and cleanup operations (removing orphaned repos and + * their associated data). + * + * Uses a job queue system to process indexing and cleanup tasks asynchronously, + * with configurable concurrency limits and retry logic. Automatically schedules + * re-indexing of repos based on configured intervals and manages garbage collection + * of repos that are no longer connected to any source. 
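+ *
+ * Typical wiring, mirroring the index.ts changes earlier in this patch (a
+ * sketch for illustration):
+ * @example
+ * const repoIndexManager = new RepoIndexManager(prisma, settings, redis);
+ * repoIndexManager.startScheduler();
+ * // ...and on shutdown:
+ * await repoIndexManager.dispose();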
+ */
+export class RepoIndexManager {
     private interval?: NodeJS.Timeout;
     private queue: Queue<JobPayload>;
     private worker: Worker<JobPayload>;
@@ -37,7 +48,7 @@
     ) {
         this.queue = new Queue<JobPayload>({
             redis,
-            namespace: 'index-sync-queue',
+            namespace: 'repo-index-queue',
             jobTimeoutMs: JOB_TIMEOUT_MS,
             maxAttempts: 3,
             logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -210,6 +221,7 @@
         const logger = createJobLogger(id);
         logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
 
+
         const { repo, type: jobType } = await this.db.repoJob.update({
             where: {
                 id,
@@ -231,14 +243,28 @@
             }
         });
 
-        if (jobType === RepoJobType.INDEX) {
-            await this.indexRepository(repo, logger);
-        } else if (jobType === RepoJobType.CLEANUP) {
-            await this.cleanupRepository(repo, logger);
+        const abortController = new AbortController();
+        const signalHandler = () => {
+            logger.info(`Received shutdown signal, aborting...`);
+            abortController.abort(); // signals every in-flight operation holding this controller's signal to cancel
+        };
+
+        process.on('SIGTERM', signalHandler);
+        process.on('SIGINT', signalHandler);
+
+        try {
+            if (jobType === RepoJobType.INDEX) {
+                await this.indexRepository(repo, logger, abortController.signal);
+            } else if (jobType === RepoJobType.CLEANUP) {
+                await this.cleanupRepository(repo, logger);
+            }
+        } finally {
+            process.off('SIGTERM', signalHandler);
+            process.off('SIGINT', signalHandler);
         }
     }
 
-    private async indexRepository(repo: RepoWithConnections, logger: Logger) {
+    private async indexRepository(repo: RepoWithConnections, logger: Logger, signal: AbortSignal) {
         const { path: repoPath, isReadOnly } = getRepoPath(repo);
 
         const metadata = repoMetadataSchema.parse(repo.metadata);
@@ -250,9 +276,16 @@
         // If the repo path exists but it is not a valid git repository root, this indicates
         // that the repository is in a bad state. To fix, we remove the directory and perform
        // a fresh clone.
-        if (existsSync(repoPath) && !(await isPathAValidGitRepoRoot(repoPath)) && !isReadOnly) {
-            logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
-            await rm(repoPath, { recursive: true, force: true });
+        if (existsSync(repoPath)) {
+            const isValidGitRepo = await isPathAValidGitRepoRoot({
+                path: repoPath,
+                signal,
+            });
+
+            if (!isValidGitRepo && !isReadOnly) {
+                logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
+                await rm(repoPath, { recursive: true, force: true });
+            }
         }
 
         if (existsSync(repoPath) && !isReadOnly) {
             // to unset this key since it is no longer needed, hence this line.
             // This will no-op if the key is already unset.
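             // (Roughly the effect of running `git config --unset remote.origin.url`
             // inside the repo; CLI equivalent shown for illustration.)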
// @see: https://github.com/sourcebot-dev/sourcebot/pull/483 - await unsetGitConfig(repoPath, ["remote.origin.url"]); + await unsetGitConfig({ + path: repoPath, + keys: ["remote.origin.url"], + signal, + }); logger.info(`Fetching ${repo.name} (id: ${repo.id})...`); const { durationMs } = await measure(() => fetchRepository({ @@ -271,7 +308,8 @@ export class IndexSyncer { path: repoPath, onProgress: ({ method, stage, progress }) => { logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`) - } + }, + signal, })); const fetchDuration_s = durationMs / 1000; @@ -287,7 +325,8 @@ export class IndexSyncer { path: repoPath, onProgress: ({ method, stage, progress }) => { logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`) - } + }, + signal })); const cloneDuration_s = durationMs / 1000; @@ -299,11 +338,15 @@ export class IndexSyncer { // This ensures that the git config is always up to date for whatever we // have in the DB. if (metadata.gitConfig && !isReadOnly) { - await upsertGitConfig(repoPath, metadata.gitConfig); + await upsertGitConfig({ + path: repoPath, + gitConfig: metadata.gitConfig, + signal, + }); } logger.info(`Indexing ${repo.name} (id: ${repo.id})...`); - const { durationMs } = await measure(() => indexGitRepository(repo, this.settings)); + const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, signal)); const indexDuration_s = durationMs / 1000; logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`); } diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts deleted file mode 100644 index 570e2b15..00000000 --- a/packages/backend/src/repoManager.ts +++ /dev/null @@ -1,566 +0,0 @@ -import * as Sentry from "@sentry/node"; -import { PrismaClient, Repo, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db"; -import { createLogger } from "@sourcebot/logger"; -import { Job, Queue, Worker } from 'bullmq'; -import { existsSync, promises, readdirSync } from 'fs'; -import { Redis } from 'ioredis'; -import { INDEX_CACHE_DIR } from "./constants.js"; -import { env } from './env.js'; -import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from "./git.js"; -import { PromClient } from './promClient.js'; -import { RepoWithConnections, Settings, repoMetadataSchema } from "./types.js"; -import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from "./utils.js"; -import { indexGitRepository } from "./zoekt.js"; - -const REPO_INDEXING_QUEUE = 'repoIndexingQueue'; -const REPO_GC_QUEUE = 'repoGarbageCollectionQueue'; - -type RepoIndexingPayload = { - repo: RepoWithConnections, -} - -type RepoGarbageCollectionPayload = { - repo: Repo, -} - -const logger = createLogger('repo-manager'); - -export class RepoManager { - private indexWorker: Worker; - private indexQueue: Queue; - private gcWorker: Worker; - private gcQueue: Queue; - private interval?: NodeJS.Timeout; - - constructor( - private db: PrismaClient, - private settings: Settings, - redis: Redis, - private promClient: PromClient, - ) { - // Repo indexing - this.indexQueue = new Queue(REPO_INDEXING_QUEUE, { - connection: redis, - }); - this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), { - connection: redis, - concurrency: this.settings.maxRepoIndexingJobConcurrency, - }); - this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this)); - this.indexWorker.on('failed', 
this.onIndexJobFailed.bind(this));
-
-        // Garbage collection
-        this.gcQueue = new Queue(REPO_GC_QUEUE, {
-            connection: redis,
-        });
-        this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
-            connection: redis,
-            concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency,
-        });
-        this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
-        this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
-    }
-
-    public startScheduler() {
-        logger.debug('Starting scheduler');
-        this.interval = setInterval(async () => {
-            await this.fetchAndScheduleRepoIndexing();
-            await this.fetchAndScheduleRepoGarbageCollection();
-            await this.fetchAndScheduleRepoTimeouts();
-        }, this.settings.reindexRepoPollingIntervalMs);
-    }
-
-    ///////////////////////////
-    // Repo indexing
-    ///////////////////////////
-
-    private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
-        await this.db.$transaction(async (tx) => {
-            await tx.repo.updateMany({
-                where: { id: { in: repos.map(repo => repo.id) } },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
-            });
-
-            const reposByOrg = repos.reduce<Record<string, RepoWithConnections[]>>((acc, repo) => {
-                if (!acc[repo.orgId]) {
-                    acc[repo.orgId] = [];
-                }
-                acc[repo.orgId].push(repo);
-                return acc;
-            }, {});
-
-            for (const orgId in reposByOrg) {
-                const orgRepos = reposByOrg[orgId];
-                // Set priority based on number of repos (more repos = lower priority)
-                // This helps prevent large orgs from overwhelming the indexQueue
-                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
-
-                await this.indexQueue.addBulk(orgRepos.map(repo => ({
-                    name: 'repoIndexJob',
-                    data: { repo },
-                    opts: {
-                        priority: priority,
-                        removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
-                        removeOnFail: env.REDIS_REMOVE_ON_FAIL,
-                    },
-                })));
-
-                // Increment pending jobs counter for each repo added
-                orgRepos.forEach(repo => {
-                    this.promClient.pendingRepoIndexingJobs.inc({ repo: repo.id.toString() });
-                });
-
-                logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`);
-            }
-
-
-        }).catch((err: unknown) => {
-            logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
-        });
-    }
-
-
-    private async fetchAndScheduleRepoIndexing() {
-        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
-        const repos = await this.db.repo.findMany({
-            where: {
-                OR: [
-                    // "NEW" is really a misnomer here - it just means that the repo needs to be indexed
-                    // immediately. In most cases, this will be because the repo was just created and
-                    // is indeed "new". However, it could also be that a "retry" was requested on a failed
-                    // index. So, we don't want to block on the indexedAt timestamp here.
-                    {
-                        repoIndexingStatus: RepoIndexingStatus.NEW,
-                    },
-                    // When the repo has already been indexed, we only want to reindex if the reindexing
-                    // interval has elapsed (or if the date isn't set for some reason).
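-                    // For example (illustrative numbers): with reindexIntervalMs = 86_400_000 (24h),
-                    // a repo last indexed 25 hours ago falls before thresholdDate and is re-queued.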
-                    {
-                        AND: [
-                            { repoIndexingStatus: RepoIndexingStatus.INDEXED },
-                            {
-                                OR: [
-                                    { indexedAt: null },
-                                    { indexedAt: { lt: thresholdDate } },
-                                ]
-                            }
-                        ]
-                    }
-                ]
-            },
-            include: {
-                connections: {
-                    include: {
-                        connection: true
-                    }
-                }
-            }
-        });
-
-        if (repos.length > 0) {
-            await this.scheduleRepoIndexingBulk(repos);
-        }
-    }
-
-    private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
-        const { path: repoPath, isReadOnly } = getRepoPath(repo);
-
-        const metadata = repoMetadataSchema.parse(repo.metadata);
-
-        // If the repo was already in the indexing state, this job was likely killed and picked up again. As a result,
-        // to ensure the repo state is valid, we delete the repo if it exists so we get a fresh clone
-        if (repoAlreadyInIndexingState && existsSync(repoPath) && !isReadOnly) {
-            logger.info(`Deleting repo directory ${repoPath} during sync because it was already in the indexing state`);
-            await promises.rm(repoPath, { recursive: true, force: true });
-        }
-
-        const credentials = await getAuthCredentialsForRepo(repo, this.db);
-        const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
-        const authHeader = credentials?.authHeader ?? undefined;
-
-        if (existsSync(repoPath) && !isReadOnly) {
-            // @NOTE: in #483, we changed the cloning method s.t., we _no longer_
-            // write the clone URL (which could contain an auth token) to the
-            // `remote.origin.url` entry. For the upgrade scenario, we want
-            // to unset this key since it is no longer needed, hence this line.
-            // This will no-op if the key is already unset.
-            // @see: https://github.com/sourcebot-dev/sourcebot/pull/483
-            await unsetGitConfig(repoPath, ["remote.origin.url"]);
-
-            logger.info(`Fetching ${repo.displayName}...`);
-            const { durationMs } = await measure(() => fetchRepository({
-                cloneUrl: cloneUrlMaybeWithToken,
-                authHeader,
-                path: repoPath,
-                onProgress: ({ method, stage, progress }) => {
-                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
-                }
-            }));
-            const fetchDuration_s = durationMs / 1000;
-
-            process.stdout.write('\n');
-            logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`);
-
-        } else if (!isReadOnly) {
-            logger.info(`Cloning ${repo.displayName}...`);
-
-            const { durationMs } = await measure(() => cloneRepository({
-                cloneUrl: cloneUrlMaybeWithToken,
-                authHeader,
-                path: repoPath,
-                onProgress: ({ method, stage, progress }) => {
-                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
-                }
-            }));
-            const cloneDuration_s = durationMs / 1000;
-
-            process.stdout.write('\n');
-            logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`);
-        }
-
-        // Regardless of clone or fetch, always upsert the git config for the repo.
-        // This ensures that the git config is always up to date for whatever we
-        // have in the DB.
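-        // (e.g. metadata.gitConfig might hold entries such as { 'zoekt.web-url': '...' };
-        // hypothetical key, shown for illustration.)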
- if (metadata.gitConfig && !isReadOnly) { - await upsertGitConfig(repoPath, metadata.gitConfig); - } - - logger.info(`Indexing ${repo.displayName}...`); - const { durationMs } = await measure(() => indexGitRepository(repo, this.settings)); - const indexDuration_s = durationMs / 1000; - logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`); - } - - private async runIndexJob(job: Job) { - logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.displayName}`); - const repo = job.data.repo as RepoWithConnections; - - // We have to use the existing repo object to get the repoIndexingStatus because the repo object - // inside the job is unchanged from when it was added to the queue. - const existingRepo = await this.db.repo.findUnique({ - where: { - id: repo.id, - }, - }); - if (!existingRepo) { - logger.error(`Repo ${repo.id} not found`); - const e = new Error(`Repo ${repo.id} not found`); - Sentry.captureException(e); - throw e; - } - const repoAlreadyInIndexingState = existingRepo.repoIndexingStatus === RepoIndexingStatus.INDEXING; - - - await this.db.repo.update({ - where: { - id: repo.id, - }, - data: { - repoIndexingStatus: RepoIndexingStatus.INDEXING, - } - }); - this.promClient.activeRepoIndexingJobs.inc(); - this.promClient.pendingRepoIndexingJobs.dec({ repo: repo.id.toString() }); - - let attempts = 0; - const maxAttempts = 3; - - while (attempts < maxAttempts) { - try { - await this.syncGitRepository(repo, repoAlreadyInIndexingState); - break; - } catch (error) { - Sentry.captureException(error); - - attempts++; - this.promClient.repoIndexingReattemptsTotal.inc(); - if (attempts === maxAttempts) { - logger.error(`Failed to sync repository ${repo.name} (id: ${repo.id}) after ${maxAttempts} attempts. Error: ${error}`); - throw error; - } - - const sleepDuration = (env.REPO_SYNC_RETRY_BASE_SLEEP_SECONDS * 1000) * Math.pow(2, attempts - 1); - logger.error(`Failed to sync repository ${repo.name} (id: ${repo.id}), attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... 
Error: ${error}`); - await new Promise(resolve => setTimeout(resolve, sleepDuration)); - } - } - } - - private async onIndexJobCompleted(job: Job) { - logger.info(`Repo index job for repo ${job.data.repo.displayName} (id: ${job.data.repo.id}, jobId: ${job.id}) completed`); - this.promClient.activeRepoIndexingJobs.dec(); - this.promClient.repoIndexingSuccessTotal.inc(); - - await this.db.repo.update({ - where: { - id: job.data.repo.id, - }, - data: { - indexedAt: new Date(), - repoIndexingStatus: RepoIndexingStatus.INDEXED, - } - }); - } - - private async onIndexJobFailed(job: Job | undefined, err: unknown) { - logger.info(`Repo index job for repo ${job?.data.repo.displayName} (id: ${job?.data.repo.id}, jobId: ${job?.id}) failed with error: ${err}`); - Sentry.captureException(err, { - tags: { - repoId: job?.data.repo.id, - jobId: job?.id, - queue: REPO_INDEXING_QUEUE, - } - }); - - if (job) { - this.promClient.activeRepoIndexingJobs.dec(); - this.promClient.repoIndexingFailTotal.inc(); - - await this.db.repo.update({ - where: { - id: job.data.repo.id, - }, - data: { - repoIndexingStatus: RepoIndexingStatus.FAILED, - } - }) - } - } - - /////////////////////////// - // Repo garbage collection - /////////////////////////// - - private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) { - await this.db.$transaction(async (tx) => { - await tx.repo.updateMany({ - where: { id: { in: repos.map(repo => repo.id) } }, - data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE } - }); - - await this.gcQueue.addBulk(repos.map(repo => ({ - name: 'repoGarbageCollectionJob', - data: { repo }, - opts: { - removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE, - removeOnFail: env.REDIS_REMOVE_ON_FAIL, - } - }))); - - logger.info(`Added ${repos.length} jobs to gcQueue`); - }); - } - - private async fetchAndScheduleRepoGarbageCollection() { - //////////////////////////////////// - // Get repos with no connections - //////////////////////////////////// - - - const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs); - const reposWithNoConnections = await this.db.repo.findMany({ - where: { - repoIndexingStatus: { - in: [ - RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition) - RepoIndexingStatus.FAILED, - ] - }, - connections: { - none: {} - }, - OR: [ - { indexedAt: null }, - { indexedAt: { lt: thresholdDate } } - ] - }, - }); - if (reposWithNoConnections.length > 0) { - logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`); - } - - //////////////////////////////////// - // Get inactive org repos - //////////////////////////////////// - const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); - const inactiveOrgRepos = await this.db.repo.findMany({ - where: { - org: { - stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE, - stripeLastUpdatedAt: { - lt: sevenDaysAgo - } - }, - OR: [ - { indexedAt: null }, - { indexedAt: { lt: thresholdDate } } - ] - } - }); - - if (inactiveOrgRepos.length > 0) { - logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`); - } - - const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos]; - if (reposToDelete.length > 0) { - await this.scheduleRepoGarbageCollectionBulk(reposToDelete); - } - } - - private async runGarbageCollectionJob(job: Job) { - 
logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.displayName} (id: ${job.data.repo.id})`); - this.promClient.activeRepoGarbageCollectionJobs.inc(); - - const repo = job.data.repo as Repo; - await this.db.repo.update({ - where: { - id: repo.id - }, - data: { - repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING - } - }); - - // delete cloned repo - const { path: repoPath, isReadOnly } = getRepoPath(repo); - if (existsSync(repoPath) && !isReadOnly) { - logger.info(`Deleting repo directory ${repoPath}`); - await promises.rm(repoPath, { recursive: true, force: true }); - } - - // delete shards - const shardPrefix = getShardPrefix(repo.orgId, repo.id); - const files = readdirSync(INDEX_CACHE_DIR).filter(file => file.startsWith(shardPrefix)); - for (const file of files) { - const filePath = `${INDEX_CACHE_DIR}/${file}`; - logger.info(`Deleting shard file ${filePath}`); - await promises.rm(filePath, { force: true }); - } - } - - private async onGarbageCollectionJobCompleted(job: Job) { - logger.info(`Garbage collection job ${job.id} completed`); - this.promClient.activeRepoGarbageCollectionJobs.dec(); - this.promClient.repoGarbageCollectionSuccessTotal.inc(); - - await this.db.repo.delete({ - where: { - id: job.data.repo.id - } - }); - } - - private async onGarbageCollectionJobFailed(job: Job | undefined, err: unknown) { - logger.info(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`); - Sentry.captureException(err, { - tags: { - repoId: job?.data.repo.id, - jobId: job?.id, - queue: REPO_GC_QUEUE, - } - }); - - if (job) { - this.promClient.activeRepoGarbageCollectionJobs.dec(); - this.promClient.repoGarbageCollectionFailTotal.inc(); - - await this.db.repo.update({ - where: { - id: job.data.repo.id - }, - data: { - repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED - } - }); - } - } - - /////////////////////////// - // Repo index validation - /////////////////////////// - - public async validateIndexedReposHaveShards() { - logger.info('Validating indexed repos have shards...'); - - const indexedRepos = await this.db.repo.findMany({ - where: { - repoIndexingStatus: RepoIndexingStatus.INDEXED - } - }); - logger.info(`Found ${indexedRepos.length} repos in the DB marked as INDEXED`); - - if (indexedRepos.length === 0) { - return; - } - - const files = readdirSync(INDEX_CACHE_DIR); - const reposToReindex: number[] = []; - for (const repo of indexedRepos) { - const shardPrefix = getShardPrefix(repo.orgId, repo.id); - - // TODO: this doesn't take into account if a repo has multiple shards and only some of them are missing. To support that, this logic - // would need to know how many total shards are expected for this repo - let hasShards = false; - try { - hasShards = files.some(file => file.startsWith(shardPrefix)); - } catch (error) { - logger.error(`Failed to read index directory ${INDEX_CACHE_DIR}: ${error}`); - continue; - } - - if (!hasShards) { - logger.info(`Repo ${repo.displayName} (id: ${repo.id}) is marked as INDEXED but has no shards on disk. 
Marking for reindexing.`); - reposToReindex.push(repo.id); - } - } - - if (reposToReindex.length > 0) { - await this.db.repo.updateMany({ - where: { - id: { in: reposToReindex } - }, - data: { - repoIndexingStatus: RepoIndexingStatus.NEW - } - }); - logger.info(`Marked ${reposToReindex.length} repos for reindexing due to missing shards`); - } - - logger.info('Done validating indexed repos have shards'); - } - - private async fetchAndScheduleRepoTimeouts() { - const repos = await this.db.repo.findMany({ - where: { - repoIndexingStatus: RepoIndexingStatus.INDEXING, - updatedAt: { - lt: new Date(Date.now() - this.settings.repoIndexTimeoutMs) - } - } - }); - - if (repos.length > 0) { - logger.info(`Scheduling ${repos.length} repo timeouts`); - await this.scheduleRepoTimeoutsBulk(repos); - } - } - - private async scheduleRepoTimeoutsBulk(repos: Repo[]) { - await this.db.$transaction(async (tx) => { - await tx.repo.updateMany({ - where: { id: { in: repos.map(repo => repo.id) } }, - data: { repoIndexingStatus: RepoIndexingStatus.FAILED } - }); - }); - } - - public async dispose() { - if (this.interval) { - clearInterval(this.interval); - } - await this.indexWorker.close(); - await this.indexQueue.close(); - await this.gcQueue.close(); - await this.gcWorker.close(); - } -} \ No newline at end of file diff --git a/packages/backend/src/zoekt.ts b/packages/backend/src/zoekt.ts index 8b02e8ba..ad75927a 100644 --- a/packages/backend/src/zoekt.ts +++ b/packages/backend/src/zoekt.ts @@ -10,7 +10,7 @@ import { getRepoPath, getShardPrefix } from "./utils.js"; const logger = createLogger('zoekt'); -export const indexGitRepository = async (repo: Repo, settings: Settings) => { +export const indexGitRepository = async (repo: Repo, settings: Settings, signal?: AbortSignal) => { let revisions = [ 'HEAD' ]; @@ -71,7 +71,7 @@ export const indexGitRepository = async (repo: Repo, settings: Settings) => { ].join(' '); return new Promise<{ stdout: string, stderr: string }>((resolve, reject) => { - exec(command, (error, stdout, stderr) => { + exec(command, { signal }, (error, stdout, stderr) => { if (error) { reject(error); return;
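
The cancellation pattern this patch threads through the backend (one
AbortController per job, with its signal handed to simple-git, fs operations,
and child_process.exec) reduces to a few lines of Node. A minimal,
self-contained sketch follows; the `sleep 60` command and log messages are
illustrative only, not part of the patch:

import { exec } from 'child_process';

// One controller per unit of work; its signal fans out to every abortable call.
const abortController = new AbortController();

const signalHandler = () => {
    console.log('Received shutdown signal, aborting...');
    abortController.abort();
};

process.on('SIGTERM', signalHandler);
process.on('SIGINT', signalHandler);

// child_process.exec has accepted an AbortSignal in its options since Node 15.4.
// When the signal fires, the child process is killed and the callback receives
// an AbortError instead of normal output.
exec('sleep 60', { signal: abortController.signal }, (error, stdout) => {
    process.off('SIGTERM', signalHandler);
    process.off('SIGINT', signalHandler);

    if (error) {
        console.error(`Command aborted or failed: ${error.message}`);
        return;
    }
    console.log(stdout);
});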