diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts
index ce023fc5..4179e54a 100644
--- a/packages/backend/src/connectionManager.ts
+++ b/packages/backend/src/connectionManager.ts
@@ -1,33 +1,36 @@
-import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
-import { Job, Queue, Worker } from 'bullmq';
+import { Connection, PrismaClient, ConnectionSyncJobStatus } from "@sourcebot/db";
+import { Job, Queue, ReservedJob, Worker } from "groupmq";
 import { Settings } from "./types.js";
-import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "@sourcebot/logger";
 import { Redis } from 'ioredis';
 import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig, compileBitbucketConfig, compileAzureDevOpsConfig, compileGenericGitHostConfig } from "./repoCompileUtils.js";
 import { BackendError, BackendException } from "@sourcebot/error";
-import { captureEvent } from "./posthog.js";
 import { env } from "./env.js";
 import * as Sentry from "@sentry/node";
 import { loadConfig, syncSearchContexts } from "@sourcebot/shared";
+import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
+import { groupmqLifecycleExceptionWrapper } from "./utils.js";
 
-const QUEUE_NAME = 'connectionSyncQueue';
+const LOG_TAG = 'connection-manager';
+const logger = createLogger(LOG_TAG);
+const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
 
 type JobPayload = {
+    jobId: string,
     connectionId: number,
     connectionName: string,
     orgId: number,
-    config: ConnectionConfig,
 };
 
 type JobResult = {
     repoCount: number,
 }
 
+const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
+
 export class ConnectionManager {
     private worker: Worker;
     private queue: Queue;
-    private logger = createLogger('connection-manager');
     private interval?: NodeJS.Timeout;
 
     constructor(
@@ -35,109 +38,133 @@ export class ConnectionManager {
         private settings: Settings,
         redis: Redis,
     ) {
-        this.queue = new Queue(QUEUE_NAME, {
-            connection: redis,
+        this.queue = new Queue({
+            redis,
+            namespace: 'connection-sync-queue',
+            jobTimeoutMs: JOB_TIMEOUT_MS,
+            maxAttempts: 3,
+            logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
         });
-        this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
-            connection: redis,
+
+        this.worker = new Worker({
+            queue: this.queue,
+            maxStalledCount: 1,
+            handler: this.runJob.bind(this),
             concurrency: this.settings.maxConnectionSyncJobConcurrency,
+            ...(env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true' ? {
+                logger: true,
+            } : {}),
         });
-        this.worker.on('completed', this.onSyncJobCompleted.bind(this));
-        this.worker.on('failed', this.onSyncJobFailed.bind(this));
-    }
 
-    public async scheduleConnectionSync(connection: Connection) {
-        await this.db.$transaction(async (tx) => {
-            await tx.connection.update({
-                where: { id: connection.id },
-                data: { syncStatus: ConnectionSyncStatus.IN_SYNC_QUEUE },
-            });
-
-            const connectionConfig = connection.config as unknown as ConnectionConfig;
-
-            await this.queue.add('connectionSyncJob', {
-                connectionId: connection.id,
-                connectionName: connection.name,
-                orgId: connection.orgId,
-                config: connectionConfig,
-            }, {
-                removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
-                removeOnFail: env.REDIS_REMOVE_ON_FAIL,
-            });
-            this.logger.info(`Added job to queue for connection ${connection.name} (id: ${connection.id})`);
-        }).catch((err: unknown) => {
-            this.logger.error(`Failed to add job to queue for connection ${connection.name} (id: ${connection.id}): ${err}`);
-        });
+        this.worker.on('completed', this.onJobCompleted.bind(this));
+        this.worker.on('failed', this.onJobFailed.bind(this));
+        this.worker.on('stalled', this.onJobStalled.bind(this));
+        this.worker.on('error', this.onWorkerError.bind(this));
     }
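The constructor wires four worker lifecycle events, and the handlers defined further down are each wrapped in `groupmqLifecycleExceptionWrapper` from `./utils.js`, which this diff doesn't show. Its call shape `(name, logger, fn)` is visible from the usages below; a minimal sketch of what such a wrapper plausibly looks like — assuming it only guards lifecycle handlers against uncaught exceptions, the real implementation may differ:

```ts
import * as Sentry from "@sentry/node";

// Hypothetical sketch of ./utils.js's groupmqLifecycleExceptionWrapper.
// Lifecycle handlers run outside a job's retry machinery, so an uncaught
// throw here would otherwise surface as an unhandled rejection.
export const groupmqLifecycleExceptionWrapper = async (
    name: string,
    logger: { error: (message: string) => void },
    fn: () => Promise<void>,
): Promise<void> => {
    try {
        await fn();
    } catch (err) {
        Sentry.captureException(err);
        logger.error(`Uncaught exception in ${name}: ${err}`);
    }
};
```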
 
     public startScheduler() {
-        this.logger.debug('Starting scheduler');
+        logger.debug('Starting scheduler');
 
         this.interval = setInterval(async () => {
             const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
+            const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS);
+
             const connections = await this.db.connection.findMany({
                 where: {
-                    OR: [
-                        // When the connection needs to be synced, we want to sync it immediately.
+                    AND: [
                         {
-                            syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
-                        },
-                        // When the connection has already been synced, we only want to re-sync if the re-sync interval has elapsed
-                        // (or if the date isn't set for some reason).
-                        {
-                            AND: [
-                                {
-                                    OR: [
-                                        { syncStatus: ConnectionSyncStatus.SYNCED },
-                                        { syncStatus: ConnectionSyncStatus.SYNCED_WITH_WARNINGS },
-                                    ]
-                                },
-                                {
-                                    OR: [
-                                        { syncedAt: null },
-                                        { syncedAt: { lt: thresholdDate } },
-                                    ]
-                                }
+                            OR: [
+                                { syncedAt: null },
+                                { syncedAt: { lt: thresholdDate } },
                             ]
+                        },
+                        {
+                            NOT: {
+                                syncJobs: {
+                                    some: {
+                                        OR: [
+                                            // Don't schedule if there are active jobs that were created within the job timeout window.
+                                            // This handles the case where a job is stuck in a pending state and will never be scheduled.
+                                            {
+                                                AND: [
+                                                    { status: { in: [ConnectionSyncJobStatus.PENDING, ConnectionSyncJobStatus.IN_PROGRESS] } },
+                                                    { createdAt: { gt: timeoutDate } },
+                                                ]
+                                            },
+                                            // Don't schedule if there are recent failed jobs (within the threshold date).
+                                            {
+                                                AND: [
+                                                    { status: ConnectionSyncJobStatus.FAILED },
+                                                    { completedAt: { gt: thresholdDate } },
+                                                ]
+                                            }
+                                        ]
+                                    }
+                                }
+                            }
                         }
                     ]
                 }
             });
-            for (const connection of connections) {
-                await this.scheduleConnectionSync(connection);
+
+            if (connections.length > 0) {
+                await this.createJobs(connections);
             }
         }, this.settings.resyncConnectionPollingIntervalMs);
+
+        this.worker.run();
     }
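The scheduler predicate reads more easily outside Prisma's operator tree. An illustrative in-memory equivalent (not part of the diff; names are made up): a connection is due when its last successful sync is stale, and it is skipped while a plausibly-live job exists or a failure is still within the backoff window.

```ts
type SyncJobLite = {
    status: 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED';
    createdAt: Date;
    completedAt: Date | null;
};

// Illustrative translation of the findMany where-clause above.
const shouldSchedule = (
    syncedAt: Date | null,
    jobs: SyncJobLite[],
    now: number,
    resyncIntervalMs: number,
    jobTimeoutMs: number,
): boolean => {
    // Stale: never synced, or synced longer ago than the resync interval.
    const stale = syncedAt === null || syncedAt.getTime() < now - resyncIntervalMs;

    // Blocked: an active job created within the timeout window (older active
    // jobs are treated as stuck), or a failure completed within the interval.
    const blocked = jobs.some(job =>
        ((job.status === 'PENDING' || job.status === 'IN_PROGRESS')
            && job.createdAt.getTime() > now - jobTimeoutMs)
        || (job.status === 'FAILED'
            && (job.completedAt?.getTime() ?? 0) > now - resyncIntervalMs));

    return stale && !blocked;
};
```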
 
-    private async runSyncJob(job: Job): Promise<JobResult> {
-        const { config, orgId, connectionName } = job.data;
+
+    public async createJobs(connections: Connection[]) {
+        const jobs = await this.db.connectionSyncJob.createManyAndReturn({
+            data: connections.map(connection => ({
+                connectionId: connection.id,
+            })),
+            include: {
+                connection: true,
+            }
+        });
+
+        for (const job of jobs) {
+            await this.queue.add({
+                groupId: `connection:${job.connectionId}`,
+                data: {
+                    jobId: job.id,
+                    connectionId: job.connectionId,
+                    connectionName: job.connection.name,
+                    orgId: job.connection.orgId,
+                },
+                jobId: job.id,
+            });
+        }
+    }
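The `groupId` is the load-bearing part of `createJobs`. groupmq's ordering guarantee — as this migration appears to rely on it — is that jobs sharing a group run strictly one at a time in FIFO order, while distinct groups run concurrently up to the worker's concurrency limit. Keying the group by connection id means two sync jobs for the same connection can never overlap:

```ts
// Illustrative only — ids and payloads are made up.
await queue.add({ groupId: 'connection:1', data: payloadA, jobId: 'job-a' });
await queue.add({ groupId: 'connection:1', data: payloadB, jobId: 'job-b' }); // waits for job-a
await queue.add({ groupId: 'connection:2', data: payloadC, jobId: 'job-c' }); // can run alongside job-a
```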
+
+    private async runJob(job: ReservedJob<JobPayload>): Promise<JobResult> {
+        const { jobId, connectionName } = job.data;
+        const logger = createJobLogger(jobId);
+        logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
+        // @note: We aren't actually doing anything with this at the moment.
         const abortController = new AbortController();
 
-        const connection = await this.db.connection.findUnique({
+        const { connection: { config: rawConnectionConfig, orgId } } = await this.db.connectionSyncJob.update({
             where: {
-                id: job.data.connectionId,
+                id: jobId,
+            },
+            data: {
+                status: ConnectionSyncJobStatus.IN_PROGRESS,
+            },
+            select: {
+                connection: {
+                    select: {
+                        config: true,
+                        orgId: true,
+                    }
+                }
             },
         });
 
-        if (!connection) {
-            const e = new BackendException(BackendError.CONNECTION_SYNC_CONNECTION_NOT_FOUND, {
-                message: `Connection ${job.data.connectionId} not found`,
-            });
-            Sentry.captureException(e);
-            throw e;
-        }
-
-        // Reset the syncStatusMetadata to an empty object at the start of the sync job
-        await this.db.connection.update({
-            where: {
-                id: job.data.connectionId,
-            },
-            data: {
-                syncStatus: ConnectionSyncStatus.SYNCING,
-                syncStatusMetadata: {}
-            }
-        })
-
+        const config = rawConnectionConfig as unknown as ConnectionConfig;
 
         let result: {
             repoData: RepoData[],
@@ -182,7 +209,7 @@ export class ConnectionManager {
                 }
             })();
         } catch (err) {
-            this.logger.error(`Failed to compile repo data for connection ${job.data.connectionId} (${connectionName}): ${err}`);
+            logger.error(`Failed to compile repo data for connection ${job.data.connectionId} (${connectionName}): ${err}`);
             Sentry.captureException(err);
 
             if (err instanceof BackendException) {
@@ -194,18 +221,7 @@ export class ConnectionManager {
             }
         }
 
-        let { repoData, notFound } = result;
-
-        // Push the information regarding not found users, orgs, and repos to the connection's syncStatusMetadata. Note that
-        // this won't be overwritten even if the connection job fails
-        await this.db.connection.update({
-            where: {
-                id: job.data.connectionId,
-            },
-            data: {
-                syncStatusMetadata: { notFound }
-            }
-        });
+        let { repoData } = result;
 
         // Filter out any duplicates by external_id and external_codeHostUrl.
         repoData = repoData.filter((repo, index, self) => {
@@ -233,7 +249,7 @@ export class ConnectionManager {
                 }
             });
             const deleteDuration = performance.now() - deleteStart;
-            this.logger.info(`Deleted all RepoToConnection records for connection ${connectionName} (id: ${job.data.connectionId}) in ${deleteDuration}ms`);
+            logger.info(`Deleted all RepoToConnection records for connection ${connectionName} (id: ${job.data.connectionId}) in ${deleteDuration}ms`);
 
             const totalUpsertStart = performance.now();
             for (const repo of repoData) {
@@ -250,10 +266,10 @@ export class ConnectionManager {
                     create: repo,
                 })
                 const upsertDuration = performance.now() - upsertStart;
-                this.logger.debug(`Upserted repo ${repo.displayName} (id: ${repo.external_id}) in ${upsertDuration}ms`);
+                logger.debug(`Upserted repo ${repo.displayName} (id: ${repo.external_id}) in ${upsertDuration}ms`);
             }
             const totalUpsertDuration = performance.now() - totalUpsertStart;
-            this.logger.info(`Upserted ${repoData.length} repos for connection ${connectionName} (id: ${job.data.connectionId}) in ${totalUpsertDuration}ms`);
+            logger.info(`Upserted ${repoData.length} repos for connection ${connectionName} (id: ${job.data.connectionId}) in ${totalUpsertDuration}ms`);
         }, { timeout: env.CONNECTION_MANAGER_UPSERT_TIMEOUT_MS });
 
         return {
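The repo upserts above run inside a single Prisma interactive transaction, which is why the explicit `timeout` matters: Prisma aborts interactive transactions after 5 seconds by default, and a connection with thousands of repos will exceed that. A sketch of the shape, with placeholders standing in for the real model and for `CONNECTION_MANAGER_UPSERT_TIMEOUT_MS`:

```ts
// Illustrative only — `db`, `items`, and the timeout value are placeholders.
await db.$transaction(async (tx) => {
    for (const item of items) {
        await tx.item.upsert({ where: { id: item.id }, update: item, create: item });
    }
}, { timeout: 5 * 60 * 1000 }); // default is 5000ms
```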
@@ -262,106 +278,107 @@ export class ConnectionManager {
         }
     }
 
-    private async onSyncJobCompleted(job: Job, result: JobResult) {
-        this.logger.info(`Connection sync job for connection ${job.data.connectionName} (id: ${job.data.connectionId}, jobId: ${job.id}) completed`);
-        const { connectionId, orgId } = job.data;
+    private onJobCompleted = async (job: Job<JobPayload>) =>
+        groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
+            const logger = createJobLogger(job.id);
+            const { connectionId, orgId } = job.data;
 
-        let syncStatusMetadata: Record<string, unknown> = (await this.db.connection.findUnique({
-            where: { id: connectionId },
-            select: { syncStatusMetadata: true }
-        }))?.syncStatusMetadata as Record<string, unknown> ?? {};
-        const { notFound } = syncStatusMetadata as {
-            notFound: {
-                users: string[],
-                orgs: string[],
-                repos: string[],
-            }
-        };
-
-        await this.db.connection.update({
-            where: {
-                id: connectionId,
-            },
-            data: {
-                syncStatus:
-                    notFound.users.length > 0 ||
-                    notFound.orgs.length > 0 ||
-                    notFound.repos.length > 0 ? ConnectionSyncStatus.SYNCED_WITH_WARNINGS : ConnectionSyncStatus.SYNCED,
-                syncedAt: new Date()
-            }
-        });
-
-        // After a connection has synced, we need to re-sync the org's search contexts as
-        // there may be new repos that match the search context's include/exclude patterns.
-        if (env.CONFIG_PATH) {
-            try {
-                const config = await loadConfig(env.CONFIG_PATH);
-
-                await syncSearchContexts({
-                    db: this.db,
-                    orgId,
-                    contexts: config.contexts,
-                });
-            } catch (err) {
-                this.logger.error(`Failed to sync search contexts for connection ${connectionId}: ${err}`);
-                Sentry.captureException(err);
-            }
-        }
-
-
-        captureEvent('backend_connection_sync_job_completed', {
-            connectionId: connectionId,
-            repoCount: result.repoCount,
-        });
-    }
-
-    private async onSyncJobFailed(job: Job | undefined, err: unknown) {
-        this.logger.info(`Connection sync job for connection ${job?.data.connectionName} (id: ${job?.data.connectionId}, jobId: ${job?.id}) failed with error: ${err}`);
-        Sentry.captureException(err, {
-            tags: {
-                connectionid: job?.data.connectionId,
-                jobId: job?.id,
-                queue: QUEUE_NAME,
-            }
-        });
-
-        if (job) {
-            const { connectionId } = job.data;
-
-            captureEvent('backend_connection_sync_job_failed', {
-                connectionId: connectionId,
-                error: err instanceof BackendException ? err.code : 'UNKNOWN',
-            });
-
-            // We may have pushed some metadata during the execution of the job, so we make sure to not overwrite the metadata here
-            let syncStatusMetadata: Record<string, unknown> = (await this.db.connection.findUnique({
-                where: { id: connectionId },
-                select: { syncStatusMetadata: true }
-            }))?.syncStatusMetadata as Record<string, unknown> ?? {};
-
-            if (err instanceof BackendException) {
-                syncStatusMetadata = {
-                    ...syncStatusMetadata,
-                    error: err.code,
-                    ...err.metadata,
-                }
-            } else {
-                syncStatusMetadata = {
-                    ...syncStatusMetadata,
-                    error: 'UNKNOWN',
-                }
-            }
-
-            await this.db.connection.update({
+            await this.db.connectionSyncJob.update({
                 where: {
-                    id: connectionId,
+                    id: job.id,
                 },
                 data: {
-                    syncStatus: ConnectionSyncStatus.FAILED,
-                    syncStatusMetadata: syncStatusMetadata as Prisma.InputJsonValue,
+                    status: ConnectionSyncJobStatus.COMPLETED,
+                    completedAt: new Date(),
+                    connection: {
+                        update: {
+                            syncedAt: new Date(),
+                        }
+                    }
                 }
             });
-        }
+
+            // After a connection has synced, we need to re-sync the org's search contexts as
+            // there may be new repos that match the search context's include/exclude patterns.
+            if (env.CONFIG_PATH) {
+                try {
+                    const config = await loadConfig(env.CONFIG_PATH);
+
+                    await syncSearchContexts({
+                        db: this.db,
+                        orgId,
+                        contexts: config.contexts,
+                    });
+                } catch (err) {
+                    logger.error(`Failed to sync search contexts for connection ${connectionId}: ${err}`);
+                    Sentry.captureException(err);
+                }
+            }
+
+            logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
+
+            // captureEvent('backend_connection_sync_job_completed', {
+            //     connectionId: connectionId,
+            //     repoCount: result.repoCount,
+            // });
+        });
+
+    private onJobFailed = async (job: Job<JobPayload>) =>
+        groupmqLifecycleExceptionWrapper('onJobFailed', logger, async () => {
+            const logger = createJobLogger(job.id);
+
+            const attempt = job.attemptsMade + 1;
+            const wasLastAttempt = attempt >= job.opts.attempts;
+
+            if (wasLastAttempt) {
+                const { connection } = await this.db.connectionSyncJob.update({
+                    where: { id: job.id },
+                    data: {
+                        status: ConnectionSyncJobStatus.FAILED,
+                        completedAt: new Date(),
+                        errorMessage: job.failedReason,
+                    },
+                    select: {
+                        connection: true,
+                    }
+                });
+
+                logger.error(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
+            } else {
+                const connection = await this.db.connection.findUniqueOrThrow({
+                    where: { id: job.data.connectionId },
+                });
+
+                logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
+            }
+
+            // captureEvent('backend_connection_sync_job_failed', {
+            //     connectionId: connectionId,
+            //     error: err instanceof BackendException ? err.code : 'UNKNOWN',
+            // });
+        });
+
+    private onJobStalled = async (jobId: string) =>
+        groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
+            const logger = createJobLogger(jobId);
+            const { connection } = await this.db.connectionSyncJob.update({
+                where: { id: jobId },
+                data: {
+                    status: ConnectionSyncJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: 'Job stalled',
+                },
+                select: {
+                    connection: true,
+                }
+            });
+
+            logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`);
+        });
+
+    private async onWorkerError(error: Error) {
+        Sentry.captureException(error);
+        logger.error(`Connection syncer worker error.`, error);
     }
 
     public async dispose() {
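With `Connection.syncStatus` deprecated (see the schema change below), a connection's effective sync state is now derived from its job history instead of a mutable column. An illustrative read path against the new model (not part of this diff):

```ts
// Illustrative: derive a connection's current sync state from its latest job.
const latestJob = await db.connectionSyncJob.findFirst({
    where: { connectionId },
    orderBy: { createdAt: 'desc' },
});

// No job yet means the scheduler hasn't picked the connection up.
const syncState = latestJob?.status ?? 'NEVER_SYNCED';
```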
diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma
index f04d293c..5e9e5ca1 100644
--- a/packages/db/prisma/schema.prisma
+++ b/packages/db/prisma/schema.prisma
@@ -132,15 +132,20 @@ model Connection {
   isDeclarative Boolean  @default(false)
   createdAt     DateTime @default(now())
   updatedAt     DateTime @updatedAt
-  /// When the connection was last synced successfully.
-  syncedAt      DateTime?
   repos         RepoToConnection[]
+
+  /// @deprecated
   syncStatus    ConnectionSyncStatus @default(SYNC_NEEDED)
+  /// @deprecated
   syncStatusMetadata Json?
 
   // The type of connection (e.g., github, gitlab, etc.)
   connectionType String
 
+  syncJobs ConnectionSyncJob[]
+  /// When the connection was last synced successfully.
+  syncedAt DateTime?
+
   // The organization that owns this connection
   org   Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
   orgId Int
@@ -148,6 +153,27 @@ model Connection {
 
   @@unique([name, orgId])
 }
 
+enum ConnectionSyncJobStatus {
+  PENDING
+  IN_PROGRESS
+  COMPLETED
+  FAILED
+}
+
+model ConnectionSyncJob {
+  id          String                  @id @default(cuid())
+  status      ConnectionSyncJobStatus @default(PENDING)
+  createdAt   DateTime                @default(now())
+  updatedAt   DateTime                @updatedAt
+  completedAt DateTime?
+
+  warningMessages String[]
+  errorMessage    String?
+
+  connection   Connection @relation(fields: [connectionId], references: [id], onDelete: Cascade)
+  connectionId Int
+}
+
 model RepoToConnection {
   addedAt DateTime @default(now())
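The new model also carries `warningMessages` and `errorMessage`. `errorMessage` is written by the failure and stall handlers above; `warningMessages` has no writer in this diff yet, but it gives sync jobs a per-job home for the partial-failure data (not-found users, orgs, and repos) that previously lived in the deprecated `syncStatusMetadata` blob. A hypothetical write using Prisma's scalar-list `push`:

```ts
// Illustrative only — nothing in this diff populates warningMessages yet.
await db.connectionSyncJob.update({
    where: { id: jobId },
    data: { warningMessages: { push: `Repository not found: ${repoName}` } },
});
```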