diff --git a/packages/backend/src/indexSyncer.ts b/packages/backend/src/indexSyncer.ts index 19b20a77..6bf2ac0a 100644 --- a/packages/backend/src/indexSyncer.ts +++ b/packages/backend/src/indexSyncer.ts @@ -1,17 +1,15 @@ -import { createBullBoard } from '@bull-board/api'; -import { ExpressAdapter } from '@bull-board/express'; import * as Sentry from '@sentry/node'; import { PrismaClient, Repo, RepoJobStatus, RepoJobType } from "@sourcebot/db"; import { createLogger, Logger } from "@sourcebot/logger"; -import express from 'express'; -import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq"; -import { Redis } from 'ioredis'; -import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js"; -import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from './utils.js'; import { existsSync } from 'fs'; +import { readdir, rm } from 'fs/promises'; +import { Job, Queue, ReservedJob, Worker } from "groupmq"; +import { Redis } from 'ioredis'; +import { env } from './env.js'; import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js'; +import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js"; +import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js'; import { indexGitRepository } from './zoekt.js'; -import { rm, readdir } from 'fs/promises'; const LOG_TAG = 'index-syncer'; const logger = createLogger(LOG_TAG); @@ -26,17 +24,6 @@ type JobPayload = { const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout - -const groupmqLifecycleExceptionWrapper = async (name: string, fn: () => Promise) => { - try { - await fn(); - } catch (error) { - Sentry.captureException(error); - logger.error(`Exception thrown while executing lifecycle function \`${name}\`.`, error); - } -} - - export class IndexSyncer { private interval?: NodeJS.Timeout; private 
queue: Queue; @@ -52,8 +39,10 @@ export class IndexSyncer { redis, namespace: 'index-sync-queue', jobTimeoutMs: JOB_TIMEOUT_MS, - logger, - maxAttempts: 1, + maxAttempts: 3, + ...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? { + logger, + }: {}), }); this.worker = new Worker({ @@ -61,25 +50,15 @@ export class IndexSyncer { maxStalledCount: 1, handler: this.runJob.bind(this), concurrency: this.settings.maxRepoIndexingJobConcurrency, - logger, + ...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? { + logger, + }: {}), }); this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); this.worker.on('stalled', this.onJobStalled.bind(this)); this.worker.on('error', this.onWorkerError.bind(this)); - - // @nocheckin - const app = express(); - const serverAdapter = new ExpressAdapter(); - - createBullBoard({ - queues: [new BullBoardGroupMQAdapter(this.queue, { displayName: 'Index Sync' })], - serverAdapter, - }); - - app.use('/', serverAdapter.getRouter()); - app.listen(3070); } public async startScheduler() { @@ -215,7 +194,7 @@ export class IndexSyncer { for (const job of jobs) { await this.queue.add({ - groupId: `repo:${job.repoId}`, + groupId: `repo:${job.repoId}_${job.repo.name}`, data: { jobId: job.id, type, @@ -230,7 +209,7 @@ export class IndexSyncer { private async runJob(job: ReservedJob) { const id = job.data.jobId; const logger = createJobLogger(id); - logger.info(`Running job ${id} for repo ${job.data.repoName}`); + logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`); const { repo, type: jobType } = await this.db.repoJob.update({ where: { @@ -286,35 +265,35 @@ export class IndexSyncer { // @see: https://github.com/sourcebot-dev/sourcebot/pull/483 await unsetGitConfig(repoPath, ["remote.origin.url"]); - logger.info(`Fetching ${repo.displayName}...`); + logger.info(`Fetching ${repo.name} (id: ${repo.id})...`); const { 
durationMs } = await measure(() => fetchRepository({ cloneUrl: cloneUrlMaybeWithToken, authHeader, path: repoPath, onProgress: ({ method, stage, progress }) => { - logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`) + logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`) } })); const fetchDuration_s = durationMs / 1000; process.stdout.write('\n'); - logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`); + logger.info(`Fetched ${repo.name} (id: ${repo.id}) in ${fetchDuration_s}s`); } else if (!isReadOnly) { - logger.info(`Cloning ${repo.displayName}...`); + logger.info(`Cloning ${repo.name} (id: ${repo.id})...`); const { durationMs } = await measure(() => cloneRepository({ cloneUrl: cloneUrlMaybeWithToken, authHeader, path: repoPath, onProgress: ({ method, stage, progress }) => { - logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`) + logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`) } })); const cloneDuration_s = durationMs / 1000; process.stdout.write('\n'); - logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`); + logger.info(`Cloned ${repo.name} (id: ${repo.id}) in ${cloneDuration_s}s`); } // Regardless of clone or fetch, always upsert the git config for the repo. 
@@ -324,10 +303,10 @@ export class IndexSyncer { await upsertGitConfig(repoPath, metadata.gitConfig); } - logger.info(`Indexing ${repo.displayName}...`); + logger.info(`Indexing ${repo.name} (id: ${repo.id})...`); const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx)); const indexDuration_s = durationMs / 1000; - logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`); + logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`); } private async cleanupRepository(repo: Repo, logger: Logger) { @@ -347,7 +326,7 @@ export class IndexSyncer { } private onJobCompleted = async (job: Job) => - groupmqLifecycleExceptionWrapper('onJobCompleted', async () => { + groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => { const logger = createJobLogger(job.data.jobId); const jobData = await this.db.repoJob.update({ where: { id: job.data.jobId }, @@ -365,35 +344,47 @@ export class IndexSyncer { } }); - logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`); + logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`); } else if (jobData.type === RepoJobType.CLEANUP) { const repo = await this.db.repo.delete({ where: { id: jobData.repoId }, }); - logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name}`); + logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`); } }); private onJobFailed = async (job: Job) => - groupmqLifecycleExceptionWrapper('onJobFailed', async () => { + groupmqLifecycleExceptionWrapper('onJobFailed', logger, async () => { const logger = createJobLogger(job.data.jobId); - const { repo } = await this.db.repoJob.update({ - where: { id: job.data.jobId }, - data: { - completedAt: new Date(), - errorMessage: job.failedReason, - }, - select: { repo: true } - }); + const attempt = job.attemptsMade + 1; + const wasLastAttempt = attempt >= job.opts.attempts; - logger.error(`Failed 
job ${job.data.jobId} for repo ${repo.name}`); + if (wasLastAttempt) { + const { repo } = await this.db.repoJob.update({ + where: { id: job.data.jobId }, + data: { + status: RepoJobStatus.FAILED, + completedAt: new Date(), + errorMessage: job.failedReason, + }, + select: { repo: true } + }); + + logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`); + } else { + const repo = await this.db.repo.findUniqueOrThrow({ + where: { id: job.data.repoId }, + }); + + logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`); + } }); private onJobStalled = async (jobId: string) => - groupmqLifecycleExceptionWrapper('onJobStalled', async () => { + groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => { const logger = createJobLogger(jobId); const { repo } = await this.db.repoJob.update({ where: { id: jobId }, @@ -405,7 +396,7 @@ export class IndexSyncer { select: { repo: true } }); - logger.error(`Job ${jobId} stalled for repo ${repo.name}`); + logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`); }); private async onWorkerError(error: Error) { diff --git a/packages/backend/src/utils.ts b/packages/backend/src/utils.ts index e6ac5f93..f081ce1b 100644 --- a/packages/backend/src/utils.ts +++ b/packages/backend/src/utils.ts @@ -241,3 +241,20 @@ const createGitCloneUrlWithToken = (cloneUrl: string, credentials: { username?: } return url.toString(); } + + +/** + * Wraps groupmq worker lifecycle callbacks with exception handling. This prevents + * uncaught exceptions (e.g., like a RepoJob not existing in the DB) from crashing + * the app. 
+ * @see: https://openpanel-dev.github.io/groupmq/api-worker/#events
+ */
+export const groupmqLifecycleExceptionWrapper = async (name: string, logger: Logger, fn: () => Promise<void>): Promise<void> => {
+    try {
+        await fn(); // run the wrapped lifecycle callback; `name` is only used to label the log line below
+    } catch (error) {
+        Sentry.captureException(error); // reported, then deliberately not rethrown so the returned promise never rejects
+        logger.error(`Exception thrown while executing lifecycle function \`${name}\`.`, error);
+    }
+}
+