fix(worker): Fix issues with gracefully shutting down (#612)

Brendan Kellam 2025-11-11 20:11:59 -08:00 committed by GitHub
parent 18fad64baa
commit 903d15a2c5
7 changed files with 227 additions and 58 deletions

View file

@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612)
### Added
- Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)

View file

@@ -93,8 +93,8 @@ export class ConfigManager {
});
if (connectionNeedsSyncing) {
const [jobId] = await this.connectionManager.createJobs([connection]);
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`);
await this.connectionManager.createJobs([connection]);
}
}
}

View file

@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";
const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
const QUEUE_NAME = 'connection-sync-queue';
type JobPayload = {
jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
export class ConnectionManager {
private worker: Worker;
private worker: Worker<JobPayload>;
private queue: Queue<JobPayload>;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue<JobPayload>({
redis,
namespace: 'connection-sync-queue',
namespace: QUEUE_NAME,
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@ export class ConnectionManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
// graceful-timeout is triggered when a job is still processing after
// worker.close() is called and the timeout period has elapsed. In this case,
// we fail the job with no retry.
this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}
public startScheduler() {
@@ -128,6 +134,7 @@ export class ConnectionManager {
});
for (const job of jobs) {
logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
await this.queue.add({
groupId: `connection:${job.connectionId}`,
data: {
@@ -150,6 +157,22 @@ export class ConnectionManager {
const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
where: {
id: jobId,
},
select: {
status: true,
}
});
// Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
// is in an invalid state and should be skipped.
if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
}
this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });
@@ -178,7 +201,7 @@ export class ConnectionManager {
const result = await (async () => {
switch (config.type) {
case 'github': {
return await compileGithubConfig(config, job.data.connectionId, abortController);
return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
}
case 'gitlab': {
return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@ export class ConnectionManager {
}
}
})();
let { repoData, warnings } = result;
await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@ export class ConnectionManager {
});
});
private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
const logger = createJobLogger(job.id);
const { connection } = await this.db.connectionSyncJob.update({
where: { id: job.id },
data: {
status: ConnectionSyncJobStatus.FAILED,
completedAt: new Date(),
errorMessage: 'Job timed out',
},
select: {
connection: true,
}
});
this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);
captureEvent('backend_connection_sync_job_failed', {
connectionId: connection.id,
error: 'Job timed out',
});
});
private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@ export class ConnectionManager {
if (this.interval) {
clearInterval(this.interval);
}
await this.worker.close();
await this.queue.close();
const inProgressJobs = this.worker.getCurrentJobs();
await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
// Manually release group locks for in progress jobs to prevent deadlocks.
// @see: https://github.com/Openpanel-dev/groupmq/issues/8
for (const { job } of inProgressJobs) {
const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
try {
await this.redis.del(lockKey);
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
}
}
// @note: As of groupmq v1.0.0, queue.close() will just close the underlying
// redis connection. Since we share the same redis client between managers, skip this
// step and close the redis client directly in index.ts.
// @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
// await this.queue.close();
}
}
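
The dispose sequence above (snapshot the in-progress jobs, close the worker with a bounded graceful timeout, then delete the stale groupmq group locks) is the core of this fix, and RepoIndexManager below repeats it. A minimal sketch of that sequence as a standalone helper, assuming only the groupmq Worker APIs already used in this diff (getCurrentJobs(), close(timeoutMs)) and the groupmq:<namespace>:lock:<groupId> key layout; the helper name and signature are illustrative, not part of the commit:

import { Worker } from "groupmq";
import { Redis } from "ioredis";

// Illustrative helper: gracefully stop a groupmq worker and release any group
// locks left behind by jobs that were still running when the timeout elapsed.
export async function closeWorkerAndReleaseLocks<T>(
    worker: Worker<T>,
    queueNamespace: string,
    redis: Redis,
    gracefulTimeoutMs: number,
): Promise<void> {
    // Snapshot jobs that are still running *before* closing the worker.
    const inProgressJobs = worker.getCurrentJobs();

    // Wait up to gracefulTimeoutMs for those jobs; once the timeout elapses the
    // worker emits 'graceful-timeout' for each job that is still processing.
    await worker.close(gracefulTimeoutMs);

    // Manually delete the group locks so the groups are not deadlocked on the next start.
    // @see: https://github.com/Openpanel-dev/groupmq/issues/8
    for (const { job } of inProgressJobs) {
        await redis.del(`groupmq:${queueNamespace}:lock:${job.groupId}`);
    }
}

ConnectionManager.dispose() above and RepoIndexManager.dispose() below inline these same three steps.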

View file

@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
];
export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
// Maximum time to wait for current job to finish
export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds
// List of shutdown signals
export const SHUTDOWN_SIGNALS: string[] = [
'SIGHUP',
'SIGINT',
'SIGQUIT',
'SIGILL',
'SIGTRAP',
'SIGABRT',
'SIGBUS',
'SIGFPE',
'SIGSEGV',
'SIGUSR2',
'SIGTERM',
// @note: SIGKILL and SIGSTOP cannot have listeners installed.
// @see: https://nodejs.org/api/process.html#signal-events
];
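
For reference, the way this list is consumed (wired up in index.ts below) is: run cleanup at most once, then re-raise the signal so the process is terminated by the signal itself, which shells surface as exit status 128 plus the signal number. A compact sketch of that pattern; registerShutdownHandlers and its cleanup parameter are hypothetical names rather than code from this commit:

import { SHUTDOWN_SIGNALS } from "./constants.js";

// Hypothetical wiring sketch (the real registration lives in index.ts): because
// the listener is registered with process.once(), re-sending the signal after
// cleanup reaches Node's default handler and the process terminates normally
// for that signal.
export function registerShutdownHandlers(cleanup: (signal: string) => Promise<void>) {
    for (const signal of SHUTDOWN_SIGNALS) {
        process.once(signal, () => {
            cleanup(signal).finally(() => {
                process.kill(process.pid, signal);
            });
        });
    }
}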

View file

@@ -1,5 +1,6 @@
import "./instrument.js";
import * as Sentry from "@sentry/node";
import { PrismaClient } from "@sourcebot/db";
import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
import 'express-async-errors';
@@ -9,7 +10,7 @@ import { Redis } from 'ioredis';
import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
@@ -17,6 +18,7 @@ import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";
const logger = createLogger('backend-entrypoint');
const reposPath = REPOS_CACHE_DIR;
@@ -40,13 +42,14 @@ const prisma = new PrismaClient({
const redis = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null
});
redis.ping().then(() => {
try {
await redis.ping();
logger.info('Connected to redis');
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
logger.error(err);
} catch (err: unknown) {
logger.error('Failed to connect to redis. Error:', err);
process.exit(1);
});
}
const promClient = new PromClient();
@@ -83,45 +86,65 @@ const api = new Api(
logger.info('Worker started.');
const cleanup = async (signal: string) => {
logger.info(`Received ${signal}, cleaning up...`);
const listenToShutdownSignals = () => {
const signals = SHUTDOWN_SIGNALS;
const shutdownTimeout = 30000; // 30 seconds
let receivedSignal = false;
try {
await Promise.race([
Promise.all([
repoIndexManager.dispose(),
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
accountPermissionSyncer.dispose(),
configManager.dispose(),
]),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
)
]);
logger.info('All workers shut down gracefully');
} catch (error) {
logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));
const cleanup = async (signal: string) => {
try {
if (receivedSignal) {
logger.debug(`Received repeat signal ${signal}, ignoring.`);
return;
}
receivedSignal = true;
logger.info(`Received ${signal}, cleaning up...`);
await repoIndexManager.dispose()
await connectionManager.dispose()
await repoPermissionSyncer.dispose()
await accountPermissionSyncer.dispose()
await configManager.dispose()
await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
logger.info('All workers shut down gracefully');
signals.forEach(sig => process.removeListener(sig, cleanup));
} catch (error) {
Sentry.captureException(error);
logger.error('Error shutting down worker:', error);
}
}
await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
signals.forEach(signal => {
process.on(signal, (err) => {
cleanup(err).finally(() => {
process.kill(process.pid, signal);
});
});
});
// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => {
process.exit(1);
});
});
process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => {
process.exit(1);
});
});
}
process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));
// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => process.exit(1));
});
process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => process.exit(1));
});
listenToShutdownSignals();

View file

@@ -39,8 +39,8 @@ type CompileResult = {
export const compileGithubConfig = async (
config: GithubConnectionConfig,
connectionId: number,
abortController: AbortController): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
signal: AbortSignal): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
const gitHubRepos = gitHubReposResult.repos;
const warnings = gitHubReposResult.warnings;
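
Passing an AbortSignal instead of the whole AbortController narrows the contract: compileGithubConfig can only observe cancellation, while the controller that triggers it stays with the caller (see the call-site change in the connection manager above). A small illustrative sketch of that shape; fetchWithAbort is a hypothetical helper, not part of this codebase:

// Hypothetical helper showing the signal-only contract: the function can observe
// cancellation (and forward the signal to I/O), but cannot trigger it.
async function fetchWithAbort(url: string, signal: AbortSignal): Promise<unknown> {
    signal.throwIfAborted(); // bail out immediately if the caller already aborted
    const response = await fetch(url, { signal }); // forward the signal to the request
    return response.json();
}

// The caller keeps the controller and alone decides when to cancel,
// e.g. when the worker is asked to shut down.
const controller = new AbortController();
const pending = fetchWithAbort("https://example.com/api", controller.signal);
// controller.abort();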

View file

@@ -7,7 +7,7 @@ import { readdir, rm } from 'fs/promises';
import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis';
import micromatch from 'micromatch';
import { INDEX_CACHE_DIR } from './constants.js';
import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, INDEX_CACHE_DIR } from './constants.js';
import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getTags, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
import { captureEvent } from './posthog.js';
import { PromClient } from './promClient.js';
@@ -45,7 +45,7 @@ export class RepoIndexManager {
constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue<JobPayload>({
@@ -70,6 +70,10 @@ export class RepoIndexManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
// graceful-timeout is triggered when a job is still processing after
// worker.close() is called and the timeout period has elapsed. In this case,
// we fail the job with no retry.
this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}
public startScheduler() {
@@ -230,6 +234,23 @@ export class RepoIndexManager {
const logger = createJobLogger(id);
logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({
where: {
id,
},
select: {
status: true,
}
});
// Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
// is in an invalid state and should be skipped.
if (
currentStatus.status !== RepoIndexingJobStatus.PENDING &&
currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS
) {
throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
}
const { repo, type: jobType } = await this.db.repoIndexingJob.update({
where: {
@@ -540,6 +561,28 @@ export class RepoIndexManager {
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
});
private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
const logger = createJobLogger(job.data.jobId);
const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
const { repo } = await this.db.repoIndexingJob.update({
where: { id: job.data.jobId },
data: {
status: RepoIndexingJobStatus.FAILED,
completedAt: new Date(),
errorMessage: 'Job timed out',
},
select: { repo: true }
});
this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
logger.error(`Job ${job.data.jobId} timed out for repo ${repo.name} (id: ${repo.id}). Failing job.`);
});
private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Index syncer worker error.`, error);
@@ -549,8 +592,20 @@ export class RepoIndexManager {
if (this.interval) {
clearInterval(this.interval);
}
await this.worker.close();
await this.queue.close();
const inProgressJobs = this.worker.getCurrentJobs();
await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
// Manually release group locks for in progress jobs to prevent deadlocks.
// @see: https://github.com/Openpanel-dev/groupmq/issues/8
for (const { job } of inProgressJobs) {
const lockKey = `groupmq:repo-index-queue:lock:${job.groupId}`;
logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
await this.redis.del(lockKey);
}
// @note: As of groupmq v1.0.0, queue.close() will just close the underlying
// redis connection. Since we share the same redis client between managers, skip this
// step and close the redis client directly in index.ts.
// await this.queue.close();
}
}