From 32ce1ca16dc99a9df045c6397a9e63ed352f8507 Mon Sep 17 00:00:00 2001 From: bkellam Date: Thu, 9 Oct 2025 20:10:29 -0700 Subject: [PATCH] wip --- packages/backend/package.json | 3 + packages/backend/src/index.ts | 6 +- packages/backend/src/indexSyncer.ts | 225 ++++++++++++++++++++++++++++ packages/db/prisma/schema.prisma | 24 ++- yarn.lock | 171 ++++++++++++++++----- 5 files changed, 390 insertions(+), 39 deletions(-) create mode 100644 packages/backend/src/indexSyncer.ts diff --git a/packages/backend/package.json b/packages/backend/package.json index dade7893..4bd3fe40 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -22,6 +22,8 @@ "vitest": "^2.1.9" }, "dependencies": { + "@bull-board/api": "^6.13.0", + "@bull-board/express": "^6.13.0", "@coderabbitai/bitbucket": "^1.1.3", "@gitbeaker/rest": "^40.5.1", "@octokit/rest": "^21.0.2", @@ -45,6 +47,7 @@ "git-url-parse": "^16.1.0", "gitea-js": "^1.22.0", "glob": "^11.0.0", + "groupmq": "^1.0.0", "ioredis": "^5.4.2", "lowdb": "^7.0.1", "micromatch": "^4.0.8", diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 93f95e0b..3850d07b 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -15,6 +15,7 @@ import { PromClient } from './promClient.js'; import { RepoManager } from './repoManager.js'; import { AppContext } from "./types.js"; import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js"; +import { IndexSyncer } from "./indexSyncer.js"; const logger = createLogger('backend-entrypoint'); @@ -71,11 +72,13 @@ const connectionManager = new ConnectionManager(prisma, settings, redis); const repoManager = new RepoManager(prisma, settings, redis, promClient, context); const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis); const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis); +const indexSyncer = new IndexSyncer(prisma, settings, redis); await repoManager.validateIndexedReposHaveShards(); connectionManager.startScheduler(); -repoManager.startScheduler(); +// repoManager.startScheduler(); +indexSyncer.startScheduler(); if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && !hasEntitlement('permission-syncing')) { logger.error('Permission syncing is not supported in current plan. Please contact team@sourcebot.dev for assistance.'); @@ -93,6 +96,7 @@ const cleanup = async (signal: string) => { repoManager.dispose(); repoPermissionSyncer.dispose(); userPermissionSyncer.dispose(); + indexSyncer.dispose(); await prisma.$disconnect(); await redis.quit(); diff --git a/packages/backend/src/indexSyncer.ts b/packages/backend/src/indexSyncer.ts new file mode 100644 index 00000000..11a88009 --- /dev/null +++ b/packages/backend/src/indexSyncer.ts @@ -0,0 +1,225 @@ +import { createBullBoard } from '@bull-board/api'; +import { ExpressAdapter } from '@bull-board/express'; +import * as Sentry from '@sentry/node'; +import { PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db"; +import { createLogger } from "@sourcebot/logger"; +import express from 'express'; +import { BullBoardGroupMQAdapter, Queue, Worker } from "groupmq"; +import { Redis } from 'ioredis'; +import { Settings } from "./types.js"; + +const logger = createLogger('index-syncer'); + +type IndexSyncJob = { + jobId: string; +} + +const JOB_TIMEOUT_MS = 1000 * 60; // 60 second timeout. + +export class IndexSyncer { + private interval?: NodeJS.Timeout; + private queue: Queue; + private worker: Worker; + + constructor( + private db: PrismaClient, + private settings: Settings, + redis: Redis, + ) { + this.queue = new Queue({ + redis, + namespace: 'index-sync-queue', + jobTimeoutMs: JOB_TIMEOUT_MS, + // logger: true, + }); + + this.worker = new Worker({ + queue: this.queue, + maxStalledCount: 1, + stalledInterval: 1000, + handler: async (job) => { + const id = job.data.jobId; + const { repo } = await this.db.repoIndexingJob.update({ + where: { + id, + }, + data: { + status: RepoIndexingJobStatus.IN_PROGRESS, + }, + select: { + repo: true, + } + }); + + logger.info(`Running index job ${id} for repo ${repo.name}`); + + await new Promise(resolve => setTimeout(resolve, 1000 * 10)); + + return true; + }, + concurrency: 4, + }); + + this.worker.on('completed', async (job) => { + const { repo } = await this.db.repoIndexingJob.update({ + where: { + id: job.data.jobId, + }, + data: { + status: RepoIndexingJobStatus.COMPLETED, + repo: { + update: { + indexedAt: new Date(), + } + }, + completedAt: new Date(), + }, + select: { + repo: true, + } + }); + + logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`); + }); + + this.worker.on('failed', async (job) => { + const { repo } = await this.db.repoIndexingJob.update({ + where: { + id: job.data.jobId, + }, + data: { + status: RepoIndexingJobStatus.FAILED, + completedAt: new Date(), + errorMessage: job.failedReason, + }, + select: { + repo: true, + } + }); + + logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`); + }); + + this.worker.on('stalled', async (jobId) => { + const { repo } = await this.db.repoIndexingJob.update({ + where: { id: jobId }, + data: { + status: RepoIndexingJobStatus.FAILED, + completedAt: new Date(), + errorMessage: 'Job stalled', + }, + select: { repo: true } + }); + + logger.warn(`Job ${jobId} stalled for repo ${repo.name}`); + }); + + this.worker.on('error', async (error) => { + Sentry.captureException(error); + logger.error(`Index syncer worker error.`, error); + }); + + const app = express(); + const serverAdapter = new ExpressAdapter(); + + createBullBoard({ + queues: [new BullBoardGroupMQAdapter(this.queue, { displayName: 'Index Sync' })], + serverAdapter, + }); + + app.use('/', serverAdapter.getRouter()); + app.listen(3070); + } + + public async startScheduler() { + this.interval = setInterval(async () => { + const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs); + + const repos = await this.db.repo.findMany({ + where: { + AND: [ + { + OR: [ + { indexedAt: null }, + { indexedAt: { lt: thresholdDate } }, + ] + }, + { + NOT: { + indexingJobs: { + some: { + OR: [ + // Don't schedule if there are active jobs that were created within the threshold date. + // This handles the case where a job is stuck in a pending state and will never be scheduled. + { + AND: [ + { + status: { + in: [ + RepoIndexingJobStatus.PENDING, + RepoIndexingJobStatus.IN_PROGRESS, + ] + }, + }, + { + createdAt: { + gt: thresholdDate, + } + } + ] + }, + // Don't schedule if there are recent failed jobs (within the threshold date). + { + AND: [ + { status: RepoIndexingJobStatus.FAILED }, + { completedAt: { gt: thresholdDate } }, + ] + } + ] + } + } + } + } + ], + } + }); + + if (repos.length === 0) { + return; + } + + await this.scheduleIndexSync(repos); + }, 1000 * 5); + + this.worker.run(); + } + + private async scheduleIndexSync(repos: Repo[]) { + // @note: we don't perform this in a transaction because + // we want to avoid the situation where a job is created and run + // prior to the transaction being committed. + const jobs = await this.db.repoIndexingJob.createManyAndReturn({ + data: repos.map(repo => ({ + repoId: repo.id, + })) + }); + + for (const job of jobs) { + await this.queue.add({ + groupId: `repo:${job.repoId}`, + data: { + jobId: job.id, + }, + jobId: job.id, + }); + } + } + + public dispose() { + if (this.interval) { + clearInterval(this.interval); + } + this.worker.close(); + this.queue.close(); + } +} \ No newline at end of file diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index bdebbc69..75e30ec9 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -46,7 +46,6 @@ model Repo { displayName String? /// Display name of the repo for UI (ex. sourcebot-dev/sourcebot) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - indexedAt DateTime? /// When the repo was last indexed successfully. isFork Boolean isArchived Boolean isPublic Boolean @default(false) @@ -61,6 +60,9 @@ model Repo { permissionSyncJobs RepoPermissionSyncJob[] permissionSyncedAt DateTime? /// When the permissions were last synced successfully. + indexingJobs RepoIndexingJob[] + indexedAt DateTime? /// When the repo was last indexed successfully. + external_id String /// The id of the repo in the external service external_codeHostType String /// The type of the external service (e.g., github, gitlab, etc.) external_codeHostUrl String /// The base url of the external service (e.g., https://github.com) @@ -74,6 +76,26 @@ model Repo { @@index([orgId]) } +enum RepoIndexingJobStatus { + PENDING + IN_PROGRESS + COMPLETED + FAILED +} + +model RepoIndexingJob { + id String @id @default(cuid()) + status RepoIndexingJobStatus @default(PENDING) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? + + errorMessage String? + + repo Repo @relation(fields: [repoId], references: [id], onDelete: Cascade) + repoId Int +} + enum RepoPermissionSyncJobStatus { PENDING IN_PROGRESS diff --git a/yarn.lock b/yarn.lock index 7fae191b..6fe84282 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1075,6 +1075,38 @@ __metadata: languageName: node linkType: hard +"@bull-board/api@npm:6.13.0, @bull-board/api@npm:^6.13.0": + version: 6.13.0 + resolution: "@bull-board/api@npm:6.13.0" + dependencies: + redis-info: "npm:^3.1.0" + peerDependencies: + "@bull-board/ui": 6.13.0 + checksum: 10c0/52d02ec30f7858bcbac0dee16d66d0605323e06706af64281adae78b28602aba798f2119be3c7e90d544f3e31dae9c01977fa26655340bb4dae9473b70587d46 + languageName: node + linkType: hard + +"@bull-board/express@npm:^6.13.0": + version: 6.13.0 + resolution: "@bull-board/express@npm:6.13.0" + dependencies: + "@bull-board/api": "npm:6.13.0" + "@bull-board/ui": "npm:6.13.0" + ejs: "npm:^3.1.10" + express: "npm:^4.21.1 || ^5.0.0" + checksum: 10c0/766125212b63d9ac802af863e4710875c876c84c03e29efa45b4fc7835bf4122db89b9b6e7cbc89db0087d5afe61fd476d1701933f75b9ab92537e08a1bb75e2 + languageName: node + linkType: hard + +"@bull-board/ui@npm:6.13.0": + version: 6.13.0 + resolution: "@bull-board/ui@npm:6.13.0" + dependencies: + "@bull-board/api": "npm:6.13.0" + checksum: 10c0/aa755a14113d19d960eaa61d568a56c843e43cb741b3d65868c43d1444783bafdb2b1ebfd9c2f7031bd792f797696fbaabb8b40ab7766e9e0d86b79b1e1a2202 + languageName: node + linkType: hard + "@codemirror/autocomplete@npm:^6.0.0, @codemirror/autocomplete@npm:^6.16.2, @codemirror/autocomplete@npm:^6.3.2, @codemirror/autocomplete@npm:^6.7.1": version: 6.18.6 resolution: "@codemirror/autocomplete@npm:6.18.6" @@ -7544,6 +7576,8 @@ __metadata: version: 0.0.0-use.local resolution: "@sourcebot/backend@workspace:packages/backend" dependencies: + "@bull-board/api": "npm:^6.13.0" + "@bull-board/express": "npm:^6.13.0" "@coderabbitai/bitbucket": "npm:^1.1.3" "@gitbeaker/rest": "npm:^40.5.1" "@octokit/rest": "npm:^21.0.2" @@ -7571,6 +7605,7 @@ __metadata: git-url-parse: "npm:^16.1.0" gitea-js: "npm:^1.22.0" glob: "npm:^11.0.0" + groupmq: "npm:^1.0.0" ioredis: "npm:^5.4.2" json-schema-to-typescript: "npm:^15.0.4" lowdb: "npm:^7.0.1" @@ -9497,7 +9532,7 @@ __metadata: languageName: node linkType: hard -"async@npm:^3.2.3": +"async@npm:^3.2.3, async@npm:^3.2.6": version: 3.2.6 resolution: "async@npm:3.2.6" checksum: 10c0/36484bb15ceddf07078688d95e27076379cc2f87b10c03b6dd8a83e89475a3c8df5848859dd06a4c95af1e4c16fc973de0171a77f18ea00be899aca2a4f85e70 @@ -11144,6 +11179,17 @@ __metadata: languageName: node linkType: hard +"ejs@npm:^3.1.10": + version: 3.1.10 + resolution: "ejs@npm:3.1.10" + dependencies: + jake: "npm:^10.8.5" + bin: + ejs: bin/cli.js + checksum: 10c0/52eade9e68416ed04f7f92c492183340582a36482836b11eab97b159fcdcfdedc62233a1bf0bf5e5e1851c501f2dca0e2e9afd111db2599e4e7f53ee29429ae1 + languageName: node + linkType: hard + "electron-to-chromium@npm:^1.5.73": version: 1.5.123 resolution: "electron-to-chromium@npm:1.5.123" @@ -12199,6 +12245,41 @@ __metadata: languageName: node linkType: hard +"express@npm:^4.21.1 || ^5.0.0, express@npm:^5.0.1, express@npm:^5.1.0": + version: 5.1.0 + resolution: "express@npm:5.1.0" + dependencies: + accepts: "npm:^2.0.0" + body-parser: "npm:^2.2.0" + content-disposition: "npm:^1.0.0" + content-type: "npm:^1.0.5" + cookie: "npm:^0.7.1" + cookie-signature: "npm:^1.2.1" + debug: "npm:^4.4.0" + encodeurl: "npm:^2.0.0" + escape-html: "npm:^1.0.3" + etag: "npm:^1.8.1" + finalhandler: "npm:^2.1.0" + fresh: "npm:^2.0.0" + http-errors: "npm:^2.0.0" + merge-descriptors: "npm:^2.0.0" + mime-types: "npm:^3.0.0" + on-finished: "npm:^2.4.1" + once: "npm:^1.4.0" + parseurl: "npm:^1.3.3" + proxy-addr: "npm:^2.0.7" + qs: "npm:^6.14.0" + range-parser: "npm:^1.2.1" + router: "npm:^2.2.0" + send: "npm:^1.1.0" + serve-static: "npm:^2.2.0" + statuses: "npm:^2.0.1" + type-is: "npm:^2.0.1" + vary: "npm:^1.1.2" + checksum: 10c0/80ce7c53c5f56887d759b94c3f2283e2e51066c98d4b72a4cc1338e832b77f1e54f30d0239cc10815a0f849bdb753e6a284d2fa48d4ab56faf9c501f55d751d6 + languageName: node + linkType: hard + "express@npm:^4.21.2": version: 4.21.2 resolution: "express@npm:4.21.2" @@ -12238,41 +12319,6 @@ __metadata: languageName: node linkType: hard -"express@npm:^5.0.1, express@npm:^5.1.0": - version: 5.1.0 - resolution: "express@npm:5.1.0" - dependencies: - accepts: "npm:^2.0.0" - body-parser: "npm:^2.2.0" - content-disposition: "npm:^1.0.0" - content-type: "npm:^1.0.5" - cookie: "npm:^0.7.1" - cookie-signature: "npm:^1.2.1" - debug: "npm:^4.4.0" - encodeurl: "npm:^2.0.0" - escape-html: "npm:^1.0.3" - etag: "npm:^1.8.1" - finalhandler: "npm:^2.1.0" - fresh: "npm:^2.0.0" - http-errors: "npm:^2.0.0" - merge-descriptors: "npm:^2.0.0" - mime-types: "npm:^3.0.0" - on-finished: "npm:^2.4.1" - once: "npm:^1.4.0" - parseurl: "npm:^1.3.3" - proxy-addr: "npm:^2.0.7" - qs: "npm:^6.14.0" - range-parser: "npm:^1.2.1" - router: "npm:^2.2.0" - send: "npm:^1.1.0" - serve-static: "npm:^2.2.0" - statuses: "npm:^2.0.1" - type-is: "npm:^2.0.1" - vary: "npm:^1.1.2" - checksum: 10c0/80ce7c53c5f56887d759b94c3f2283e2e51066c98d4b72a4cc1338e832b77f1e54f30d0239cc10815a0f849bdb753e6a284d2fa48d4ab56faf9c501f55d751d6 - languageName: node - linkType: hard - "extend@npm:^3.0.0, extend@npm:^3.0.2": version: 3.0.2 resolution: "extend@npm:3.0.2" @@ -12427,6 +12473,15 @@ __metadata: languageName: node linkType: hard +"filelist@npm:^1.0.4": + version: 1.0.4 + resolution: "filelist@npm:1.0.4" + dependencies: + minimatch: "npm:^5.0.1" + checksum: 10c0/426b1de3944a3d153b053f1c0ebfd02dccd0308a4f9e832ad220707a6d1f1b3c9784d6cadf6b2f68f09a57565f63ebc7bcdc913ccf8012d834f472c46e596f41 + languageName: node + linkType: hard + "fill-range@npm:^7.1.1": version: 7.1.1 resolution: "fill-range@npm:7.1.1" @@ -13024,6 +13079,17 @@ __metadata: languageName: node linkType: hard +"groupmq@npm:^1.0.0": + version: 1.0.0 + resolution: "groupmq@npm:1.0.0" + dependencies: + cron-parser: "npm:^4.9.0" + peerDependencies: + ioredis: ">=5" + checksum: 10c0/1795e483fc712ff91a2f7abb32f29f14f94b9d5e32033df064021f284a12773822e241f0d5e061978632dcf20df5db3f3d7701f04e53979d5b594f618fc53a27 + languageName: node + linkType: hard + "gtoken@npm:^7.0.0": version: 7.1.0 resolution: "gtoken@npm:7.1.0" @@ -13995,6 +14061,19 @@ __metadata: languageName: node linkType: hard +"jake@npm:^10.8.5": + version: 10.9.4 + resolution: "jake@npm:10.9.4" + dependencies: + async: "npm:^3.2.6" + filelist: "npm:^1.0.4" + picocolors: "npm:^1.1.1" + bin: + jake: bin/cli.js + checksum: 10c0/bb52f000340d4a32f1a3893b9abe56ef2b77c25da4dbf2c0c874a8159d082dddda50a5ad10e26060198bd645b928ba8dba3b362710f46a247e335321188c5a9c + languageName: node + linkType: hard + "jiti@npm:^1.21.6": version: 1.21.7 resolution: "jiti@npm:1.21.7" @@ -14425,7 +14504,7 @@ __metadata: languageName: node linkType: hard -"lodash@npm:^4.17.21": +"lodash@npm:^4.17.11, lodash@npm:^4.17.21": version: 4.17.21 resolution: "lodash@npm:4.17.21" checksum: 10c0/d8cbea072bb08655bb4c989da418994b073a608dffa608b09ac04b43a791b12aeae7cd7ad919aa4c925f33b48490b5cfe6c1f71d827956071dae2e7bb3a6b74c @@ -15317,6 +15396,15 @@ __metadata: languageName: node linkType: hard +"minimatch@npm:^5.0.1": + version: 5.1.6 + resolution: "minimatch@npm:5.1.6" + dependencies: + brace-expansion: "npm:^2.0.1" + checksum: 10c0/3defdfd230914f22a8da203747c42ee3c405c39d4d37ffda284dac5e45b7e1f6c49aa8be606509002898e73091ff2a3bbfc59c2c6c71d4660609f63aa92f98e3 + languageName: node + linkType: hard + "minimatch@npm:^8.0.2": version: 8.0.4 resolution: "minimatch@npm:8.0.4" @@ -17377,6 +17465,15 @@ __metadata: languageName: node linkType: hard +"redis-info@npm:^3.1.0": + version: 3.1.0 + resolution: "redis-info@npm:3.1.0" + dependencies: + lodash: "npm:^4.17.11" + checksum: 10c0/ec0f31d97893c5828cec7166486d74198c92160c60073b6f2fe805cdf575a10ddcccc7641737d44b8f451355f0ab5b6c7b0d79e8fc24742b75dd625f91ffee38 + languageName: node + linkType: hard + "redis-parser@npm:^3.0.0": version: 3.0.0 resolution: "redis-parser@npm:3.0.0"