This commit is contained in:
bkellam 2025-10-26 02:01:38 -04:00
parent 3ff88da33b
commit c6a9569309
2 changed files with 237 additions and 194 deletions

View file

@ -1,33 +1,36 @@
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
import { Job, Queue, Worker } from 'bullmq';
import { Connection, PrismaClient, ConnectionSyncJobStatus } from "@sourcebot/db";
import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Settings } from "./types.js";
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
import { createLogger } from "@sourcebot/logger";
import { Redis } from 'ioredis';
import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig, compileBitbucketConfig, compileAzureDevOpsConfig, compileGenericGitHostConfig } from "./repoCompileUtils.js";
import { BackendError, BackendException } from "@sourcebot/error";
import { captureEvent } from "./posthog.js";
import { env } from "./env.js";
import * as Sentry from "@sentry/node";
import { loadConfig, syncSearchContexts } from "@sourcebot/shared";
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
const QUEUE_NAME = 'connectionSyncQueue';
const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
/** Payload carried by each groupmq connection-sync job. */
type JobPayload = {
    /** ConnectionSyncJob record id; also used as the queue job id. */
    jobId: string,
    connectionId: number,
    /** Connection name, included so log lines don't need a DB round-trip. */
    connectionName: string,
    orgId: number,
    // NOTE(review): `config` looks stale — createJobs() does not supply it,
    // and runJob() loads the config from the DB instead. Verify whether this
    // field can be removed from the payload.
    config: ConnectionConfig,
};
/** Result returned by the job handler: number of repos compiled/upserted. */
type JobResult = {
    repoCount: number,
}

// Hard ceiling on a single sync job's runtime; also used by the scheduler to
// detect jobs stuck in PENDING/IN_PROGRESS.
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
export class ConnectionManager {
private worker: Worker;
private queue: Queue<JobPayload>;
private logger = createLogger('connection-manager');
private interval?: NodeJS.Timeout;
constructor(
@ -35,109 +38,133 @@ export class ConnectionManager {
private settings: Settings,
redis: Redis,
) {
this.queue = new Queue<JobPayload>(QUEUE_NAME, {
connection: redis,
this.queue = new Queue<JobPayload>({
redis,
namespace: 'connection-sync-queue',
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
});
this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
connection: redis,
this.worker = new Worker<JobPayload>({
queue: this.queue,
maxStalledCount: 1,
handler: this.runJob.bind(this),
concurrency: this.settings.maxConnectionSyncJobConcurrency,
});
this.worker.on('completed', this.onSyncJobCompleted.bind(this));
this.worker.on('failed', this.onSyncJobFailed.bind(this));
}
public async scheduleConnectionSync(connection: Connection) {
await this.db.$transaction(async (tx) => {
await tx.connection.update({
where: { id: connection.id },
data: { syncStatus: ConnectionSyncStatus.IN_SYNC_QUEUE },
...(env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true' ? {
logger: true,
} : {}),
});
const connectionConfig = connection.config as unknown as ConnectionConfig;
await this.queue.add('connectionSyncJob', {
connectionId: connection.id,
connectionName: connection.name,
orgId: connection.orgId,
config: connectionConfig,
}, {
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
});
this.logger.info(`Added job to queue for connection ${connection.name} (id: ${connection.id})`);
}).catch((err: unknown) => {
this.logger.error(`Failed to add job to queue for connection ${connection.name} (id: ${connection.id}): ${err}`);
});
this.worker.on('completed', this.onJobCompleted.bind(this));
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
}
public startScheduler() {
this.logger.debug('Starting scheduler');
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS);
const connections = await this.db.connection.findMany({
where: {
OR: [
// When the connection needs to be synced, we want to sync it immediately.
{
syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
},
// When the connection has already been synced, we only want to re-sync if the re-sync interval has elapsed
// (or if the date isn't set for some reason).
{
AND: [
{
OR: [
{ syncStatus: ConnectionSyncStatus.SYNCED },
{ syncStatus: ConnectionSyncStatus.SYNCED_WITH_WARNINGS },
]
},
{
OR: [
{ syncedAt: null },
{ syncedAt: { lt: thresholdDate } },
]
},
{
NOT: {
syncJobs: {
some: {
OR: [
// Don't schedule if there are active jobs that were created within the threshold date.
// This handles the case where a job is stuck in a pending state and will never be scheduled.
{
AND: [
{ status: { in: [ConnectionSyncJobStatus.PENDING, ConnectionSyncJobStatus.IN_PROGRESS] } },
{ createdAt: { gt: timeoutDate } },
]
},
// Don't schedule if there are recent failed jobs (within the threshold date).
{
AND: [
{ status: ConnectionSyncJobStatus.FAILED },
{ completedAt: { gt: thresholdDate } },
]
}
]
}
}
}
}
]
}
});
for (const connection of connections) {
await this.scheduleConnectionSync(connection);
if (connections.length > 0) {
await this.createJobs(connections);
}
}, this.settings.resyncConnectionPollingIntervalMs);
this.worker.run();
}
private async runSyncJob(job: Job<JobPayload>): Promise<JobResult> {
const { config, orgId, connectionName } = job.data;
/**
 * Creates a ConnectionSyncJob DB record for each connection, then enqueues a
 * matching groupmq job. Jobs are grouped per connection (`connection:<id>`)
 * so syncs for the same connection never run concurrently; the DB row id
 * doubles as the queue job id so the two can be correlated.
 */
public async createJobs(connections: Connection[]) {
    const createdJobs = await this.db.connectionSyncJob.createManyAndReturn({
        data: connections.map(({ id }) => ({
            connectionId: id,
        })),
        // Pull the parent connection in the same call so the payload can be
        // built without extra queries.
        include: {
            connection: true,
        },
    });

    for (const syncJob of createdJobs) {
        const payload = {
            jobId: syncJob.id,
            connectionId: syncJob.connectionId,
            connectionName: syncJob.connection.name,
            orgId: syncJob.connection.orgId,
        };
        await this.queue.add({
            groupId: `connection:${syncJob.connectionId}`,
            data: payload,
            jobId: syncJob.id,
        });
    }
}
private async runJob(job: ReservedJob<JobPayload>): Promise<JobResult> {
const { jobId, connectionName } = job.data;
const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
// @note: We aren't actually doing anything with this atm.
const abortController = new AbortController();
const connection = await this.db.connection.findUnique({
const { connection: { config: rawConnectionConfig, orgId } } = await this.db.connectionSyncJob.update({
where: {
id: job.data.connectionId,
},
});
if (!connection) {
const e = new BackendException(BackendError.CONNECTION_SYNC_CONNECTION_NOT_FOUND, {
message: `Connection ${job.data.connectionId} not found`,
});
Sentry.captureException(e);
throw e;
}
// Reset the syncStatusMetadata to an empty object at the start of the sync job
await this.db.connection.update({
where: {
id: job.data.connectionId,
id: jobId,
},
data: {
syncStatus: ConnectionSyncStatus.SYNCING,
syncStatusMetadata: {}
status: ConnectionSyncJobStatus.IN_PROGRESS,
},
select: {
connection: {
select: {
config: true,
orgId: true,
}
})
}
},
});
const config = rawConnectionConfig as unknown as ConnectionConfig;
let result: {
repoData: RepoData[],
@ -182,7 +209,7 @@ export class ConnectionManager {
}
})();
} catch (err) {
this.logger.error(`Failed to compile repo data for connection ${job.data.connectionId} (${connectionName}): ${err}`);
logger.error(`Failed to compile repo data for connection ${job.data.connectionId} (${connectionName}): ${err}`);
Sentry.captureException(err);
if (err instanceof BackendException) {
@ -194,18 +221,7 @@ export class ConnectionManager {
}
}
let { repoData, notFound } = result;
// Push the information regarding not found users, orgs, and repos to the connection's syncStatusMetadata. Note that
// this won't be overwritten even if the connection job fails
await this.db.connection.update({
where: {
id: job.data.connectionId,
},
data: {
syncStatusMetadata: { notFound }
}
});
let { repoData } = result;
// Filter out any duplicates by external_id and external_codeHostUrl.
repoData = repoData.filter((repo, index, self) => {
@ -233,7 +249,7 @@ export class ConnectionManager {
}
});
const deleteDuration = performance.now() - deleteStart;
this.logger.info(`Deleted all RepoToConnection records for connection ${connectionName} (id: ${job.data.connectionId}) in ${deleteDuration}ms`);
logger.info(`Deleted all RepoToConnection records for connection ${connectionName} (id: ${job.data.connectionId}) in ${deleteDuration}ms`);
const totalUpsertStart = performance.now();
for (const repo of repoData) {
@ -250,10 +266,10 @@ export class ConnectionManager {
create: repo,
})
const upsertDuration = performance.now() - upsertStart;
this.logger.debug(`Upserted repo ${repo.displayName} (id: ${repo.external_id}) in ${upsertDuration}ms`);
logger.debug(`Upserted repo ${repo.displayName} (id: ${repo.external_id}) in ${upsertDuration}ms`);
}
const totalUpsertDuration = performance.now() - totalUpsertStart;
this.logger.info(`Upserted ${repoData.length} repos for connection ${connectionName} (id: ${job.data.connectionId}) in ${totalUpsertDuration}ms`);
logger.info(`Upserted ${repoData.length} repos for connection ${connectionName} (id: ${job.data.connectionId}) in ${totalUpsertDuration}ms`);
}, { timeout: env.CONNECTION_MANAGER_UPSERT_TIMEOUT_MS });
return {
@ -262,32 +278,23 @@ export class ConnectionManager {
}
private async onSyncJobCompleted(job: Job<JobPayload>, result: JobResult) {
this.logger.info(`Connection sync job for connection ${job.data.connectionName} (id: ${job.data.connectionId}, jobId: ${job.id}) completed`);
private onJobCompleted = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
const logger = createJobLogger(job.id);
const { connectionId, orgId } = job.data;
let syncStatusMetadata: Record<string, unknown> = (await this.db.connection.findUnique({
where: { id: connectionId },
select: { syncStatusMetadata: true }
}))?.syncStatusMetadata as Record<string, unknown> ?? {};
const { notFound } = syncStatusMetadata as {
notFound: {
users: string[],
orgs: string[],
repos: string[],
}
};
await this.db.connection.update({
await this.db.connectionSyncJob.update({
where: {
id: connectionId,
id: job.id,
},
data: {
syncStatus:
notFound.users.length > 0 ||
notFound.orgs.length > 0 ||
notFound.repos.length > 0 ? ConnectionSyncStatus.SYNCED_WITH_WARNINGS : ConnectionSyncStatus.SYNCED,
syncedAt: new Date()
status: ConnectionSyncJobStatus.COMPLETED,
completedAt: new Date(),
connection: {
update: {
syncedAt: new Date(),
}
}
}
});
@ -303,65 +310,75 @@ export class ConnectionManager {
contexts: config.contexts,
});
} catch (err) {
this.logger.error(`Failed to sync search contexts for connection ${connectionId}: ${err}`);
logger.error(`Failed to sync search contexts for connection ${connectionId}: ${err}`);
Sentry.captureException(err);
}
}
logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
captureEvent('backend_connection_sync_job_completed', {
connectionId: connectionId,
repoCount: result.repoCount,
});
}
private async onSyncJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
this.logger.info(`Connection sync job for connection ${job?.data.connectionName} (id: ${job?.data.connectionId}, jobId: ${job?.id}) failed with error: ${err}`);
Sentry.captureException(err, {
tags: {
connectionid: job?.data.connectionId,
jobId: job?.id,
queue: QUEUE_NAME,
}
// captureEvent('backend_connection_sync_job_completed', {
// connectionId: connectionId,
// repoCount: result.repoCount,
// });
});
if (job) {
const { connectionId } = job.data;
private onJobFailed = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobFailed', logger, async () => {
const logger = createJobLogger(job.id);
captureEvent('backend_connection_sync_job_failed', {
connectionId: connectionId,
error: err instanceof BackendException ? err.code : 'UNKNOWN',
});
const attempt = job.attemptsMade + 1;
const wasLastAttempt = attempt >= job.opts.attempts;
// We may have pushed some metadata during the execution of the job, so we make sure to not overwrite the metadata here
let syncStatusMetadata: Record<string, unknown> = (await this.db.connection.findUnique({
where: { id: connectionId },
select: { syncStatusMetadata: true }
}))?.syncStatusMetadata as Record<string, unknown> ?? {};
if (err instanceof BackendException) {
syncStatusMetadata = {
...syncStatusMetadata,
error: err.code,
...err.metadata,
}
} else {
syncStatusMetadata = {
...syncStatusMetadata,
error: 'UNKNOWN',
}
}
await this.db.connection.update({
where: {
id: connectionId,
},
if (wasLastAttempt) {
const { connection } = await this.db.connectionSyncJob.update({
where: { id: job.id },
data: {
syncStatus: ConnectionSyncStatus.FAILED,
syncStatusMetadata: syncStatusMetadata as Prisma.InputJsonValue,
status: ConnectionSyncJobStatus.FAILED,
completedAt: new Date(),
errorMessage: job.failedReason,
},
select: {
connection: true,
}
});
logger.error(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
} else {
const connection = await this.db.connection.findUniqueOrThrow({
where: { id: job.data.connectionId },
});
logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
}
// captureEvent('backend_connection_sync_job_failed', {
// connectionId: connectionId,
// error: err instanceof BackendException ? err.code : 'UNKNOWN',
// });
});
// Lifecycle hook: fired when a job stops reporting progress (stalled).
// Marks the backing ConnectionSyncJob row as terminally FAILED so the
// scheduler can eventually create a fresh job for the connection.
private onJobStalled = async (jobId: string) =>
    groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
        const jobLogger = createJobLogger(jobId);

        const updated = await this.db.connectionSyncJob.update({
            where: { id: jobId },
            data: {
                status: ConnectionSyncJobStatus.FAILED,
                completedAt: new Date(),
                errorMessage: 'Job stalled',
            },
            // Fetch the parent connection for the log line below.
            select: {
                connection: true,
            },
        });

        const { name, id } = updated.connection;
        jobLogger.error(`Job ${jobId} stalled for connection ${name} (id: ${id})`);
    });
// Lifecycle hook: worker-level failures (errors raised outside an individual
// job handler). Reported to Sentry and the service log; the worker itself
// keeps running.
private async onWorkerError(err: Error) {
    Sentry.captureException(err);
    logger.error(`Connection syncer worker error.`, err);
}
public async dispose() {

View file

@ -132,15 +132,20 @@ model Connection {
isDeclarative Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
/// When the connection was last synced successfully.
syncedAt DateTime?
repos RepoToConnection[]
/// @deprecated
syncStatus ConnectionSyncStatus @default(SYNC_NEEDED)
/// @deprecated
syncStatusMetadata Json?
// The type of connection (e.g., github, gitlab, etc.)
connectionType String
syncJobs ConnectionSyncJob[]
/// When the connection was last synced successfully.
syncedAt DateTime?
// The organization that owns this connection
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
orgId Int
@ -148,6 +153,27 @@ model Connection {
@@unique([name, orgId])
}
/// Lifecycle states of a ConnectionSyncJob.
enum ConnectionSyncJobStatus {
  /// Row created; not yet picked up by a worker.
  PENDING
  /// A worker has reserved the job and is syncing.
  IN_PROGRESS
  /// Sync finished successfully.
  COMPLETED
  /// Terminal failure: all attempts exhausted, or the job stalled.
  FAILED
}
/// One sync attempt for a Connection, executed via the groupmq
/// connection-sync queue. The row id doubles as the queue job id.
model ConnectionSyncJob {
  id     String                  @id @default(cuid())
  status ConnectionSyncJobStatus @default(PENDING)

  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  /// Set when the job reaches a terminal state (COMPLETED or FAILED).
  completedAt DateTime?

  /// Non-fatal issues collected during the sync.
  warningMessages String[]
  /// Populated when status is FAILED.
  errorMessage    String?

  connection   Connection @relation(fields: [connectionId], references: [id], onDelete: Cascade)
  connectionId Int

  // Prisma does not index relation scalar fields automatically. The
  // scheduler filters jobs by connection (syncJobs: { some: ... }) and the
  // onDelete: Cascade also resolves rows via this column, so index it.
  @@index([connectionId])
}
model RepoToConnection {
addedAt DateTime @default(now())