This commit is contained in:
bkellam 2025-10-28 20:25:21 -07:00
parent 158bad9268
commit d6783e4371
6 changed files with 69 additions and 5 deletions

View file

@ -11,6 +11,7 @@ import { Settings } from "./types.js";
import { groupmqLifecycleExceptionWrapper } from "./utils.js"; import { groupmqLifecycleExceptionWrapper } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js"; import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js"; import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
const LOG_TAG = 'connection-manager'; const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG); const logger = createLogger(LOG_TAG);
@ -38,6 +39,7 @@ export class ConnectionManager {
private db: PrismaClient, private db: PrismaClient,
private settings: Settings, private settings: Settings,
redis: Redis, redis: Redis,
private promClient: PromClient,
) { ) {
this.queue = new Queue<JobPayload>({ this.queue = new Queue<JobPayload>({
redis, redis,
@ -137,6 +139,8 @@ export class ConnectionManager {
}, },
jobId: job.id, jobId: job.id,
}); });
this.promClient.pendingConnectionSyncJobs.inc({ connection: job.connection.name });
} }
return jobs.map(job => job.id); return jobs.map(job => job.id);
@ -147,6 +151,9 @@ export class ConnectionManager {
const logger = createJobLogger(jobId); const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`); logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });
// @note: We aren't actually doing anything with this atm. // @note: We aren't actually doing anything with this atm.
const abortController = new AbortController(); const abortController = new AbortController();
@ -265,7 +272,7 @@ export class ConnectionManager {
private onJobCompleted = async (job: Job<JobPayload>) => private onJobCompleted = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => { groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
const logger = createJobLogger(job.id); const logger = createJobLogger(job.id);
const { connectionId, orgId } = job.data; const { connectionId, connectionName, orgId } = job.data;
await this.db.connectionSyncJob.update({ await this.db.connectionSyncJob.update({
where: { where: {
@ -301,6 +308,9 @@ export class ConnectionManager {
logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`); logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
this.promClient.activeConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.connectionSyncJobSuccessTotal.inc({ connection: connectionName });
const result = job.returnvalue as JobResult; const result = job.returnvalue as JobResult;
captureEvent('backend_connection_sync_job_completed', { captureEvent('backend_connection_sync_job_completed', {
connectionId: connectionId, connectionId: connectionId,
@ -328,12 +338,17 @@ export class ConnectionManager {
} }
}); });
this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
logger.error(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`); logger.error(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
} else { } else {
const connection = await this.db.connection.findUniqueOrThrow({ const connection = await this.db.connection.findUniqueOrThrow({
where: { id: job.data.connectionId }, where: { id: job.data.connectionId },
}); });
this.promClient.connectionSyncJobReattemptsTotal.inc({ connection: connection.name });
logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`); logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
} }
@ -358,6 +373,9 @@ export class ConnectionManager {
} }
}); });
this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`); logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`);
captureEvent('backend_connection_sync_job_failed', { captureEvent('backend_connection_sync_job_failed', {

View file

@ -56,8 +56,6 @@ export const env = createEnv({
EXPERIMENT_EE_PERMISSION_SYNC_ENABLED: booleanSchema.default('false'), EXPERIMENT_EE_PERMISSION_SYNC_ENABLED: booleanSchema.default('false'),
AUTH_EE_GITHUB_BASE_URL: z.string().optional(), AUTH_EE_GITHUB_BASE_URL: z.string().optional(),
FORCE_ENABLE_ANONYMOUS_ACCESS: booleanSchema.default('false'),
}, },
runtimeEnv: process.env, runtimeEnv: process.env,
emptyStringAsUndefined: true, emptyStringAsUndefined: true,

View file

@ -50,7 +50,7 @@ if (hasEntitlement('github-app')) {
await GithubAppManager.getInstance().init(prisma); await GithubAppManager.getInstance().init(prisma);
} }
const connectionManager = new ConnectionManager(prisma, settings, redis); const connectionManager = new ConnectionManager(prisma, settings, redis, promClient);
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis); const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis); const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient); const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);

View file

@ -16,6 +16,12 @@ export class PromClient {
public repoIndexJobFailTotal: Counter<string>; public repoIndexJobFailTotal: Counter<string>;
public repoIndexJobSuccessTotal: Counter<string>; public repoIndexJobSuccessTotal: Counter<string>;
public activeConnectionSyncJobs: Gauge<string>;
public pendingConnectionSyncJobs: Gauge<string>;
public connectionSyncJobReattemptsTotal: Counter<string>;
public connectionSyncJobFailTotal: Counter<string>;
public connectionSyncJobSuccessTotal: Counter<string>;
public readonly PORT = 3060; public readonly PORT = 3060;
constructor() { constructor() {
@ -56,6 +62,41 @@ export class PromClient {
}); });
this.registry.registerMetric(this.repoIndexJobSuccessTotal); this.registry.registerMetric(this.repoIndexJobSuccessTotal);
this.activeConnectionSyncJobs = new Gauge({
name: 'active_connection_sync_jobs',
help: 'The number of connection sync jobs in progress',
labelNames: ['connection'],
});
this.registry.registerMetric(this.activeConnectionSyncJobs);
this.pendingConnectionSyncJobs = new Gauge({
name: 'pending_connection_sync_jobs',
help: 'The number of connection sync jobs waiting in queue',
labelNames: ['connection'],
});
this.registry.registerMetric(this.pendingConnectionSyncJobs);
this.connectionSyncJobReattemptsTotal = new Counter({
name: 'connection_sync_job_reattempts',
help: 'The number of connection sync job reattempts',
labelNames: ['connection'],
});
this.registry.registerMetric(this.connectionSyncJobReattemptsTotal);
this.connectionSyncJobFailTotal = new Counter({
name: 'connection_sync_job_fails',
help: 'The number of connection sync job fails',
labelNames: ['connection'],
});
this.registry.registerMetric(this.connectionSyncJobFailTotal);
this.connectionSyncJobSuccessTotal = new Counter({
name: 'connection_sync_job_successes',
help: 'The number of connection sync job successes',
labelNames: ['connection'],
});
this.registry.registerMetric(this.connectionSyncJobSuccessTotal);
client.collectDefaultMetrics({ client.collectDefaultMetrics({
register: this.registry, register: this.registry,
}); });

View file

@ -1,3 +1,7 @@
-- Installs the pgcrypto extension. Required for the gen_random_uuid() function.
-- @see: https://www.prisma.io/docs/orm/prisma-migrate/workflows/native-database-functions#how-to-install-a-postgresql-extension-as-part-of-a-migration
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Ensure single tenant organization exists -- Ensure single tenant organization exists
INSERT INTO "Org" (id, name, domain, "inviteLinkId", "createdAt", "updatedAt") INSERT INTO "Org" (id, name, domain, "inviteLinkId", "createdAt", "updatedAt")
VALUES (1, 'default', '~', gen_random_uuid(), NOW(), NOW()) VALUES (1, 'default', '~', gen_random_uuid(), NOW(), NOW())

View file

@ -50,8 +50,11 @@ export default async function ConnectionsPage() {
} }
const getConnectionsWithLatestJob = async () => sew(() => const getConnectionsWithLatestJob = async () => sew(() =>
withAuthV2(async ({ prisma }) => { withAuthV2(async ({ prisma, org }) => {
const connections = await prisma.connection.findMany({ const connections = await prisma.connection.findMany({
where: {
orgId: org.id,
},
include: { include: {
_count: { _count: {
select: { select: {