fix(worker): Run setInterval as blocking (#607)
Some checks are pending
Publish to ghcr / build (linux/amd64, blacksmith-4vcpu-ubuntu-2404) (push) Waiting to run
Publish to ghcr / build (linux/arm64, blacksmith-8vcpu-ubuntu-2204-arm) (push) Waiting to run
Publish to ghcr / merge (push) Blocked by required conditions
Update Roadmap Released / update (push) Waiting to run

This commit is contained in:
Brendan Kellam 2025-11-09 14:49:24 -08:00 committed by GitHub
parent 1be6e8842e
commit 6f64d5bb8d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 34 additions and 8 deletions

View file

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609) - Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
## [4.9.1] - 2025-11-07 ## [4.9.1] - 2025-11-07

View file

@ -7,7 +7,7 @@ import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis'; import { Redis } from 'ioredis';
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js"; import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
import { Settings } from "./types.js"; import { Settings } from "./types.js";
import { groupmqLifecycleExceptionWrapper } from "./utils.js"; import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js"; import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js"; import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js"; import { PromClient } from "./promClient.js";
@ -66,7 +66,7 @@ export class ConnectionManager {
public startScheduler() { public startScheduler() {
logger.debug('Starting scheduler'); logger.debug('Starting scheduler');
this.interval = setInterval(async () => { this.interval = setIntervalAsync(async () => {
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs); const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS); const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS);

View file

@ -7,6 +7,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { createOctokitFromToken, getReposForAuthenticatedUser } from "../github.js"; import { createOctokitFromToken, getReposForAuthenticatedUser } from "../github.js";
import { createGitLabFromOAuthToken, getProjectsForAuthenticatedUser } from "../gitlab.js"; import { createGitLabFromOAuthToken, getProjectsForAuthenticatedUser } from "../gitlab.js";
import { Settings } from "../types.js"; import { Settings } from "../types.js";
import { setIntervalAsync } from "../utils.js";
const LOG_TAG = 'user-permission-syncer'; const LOG_TAG = 'user-permission-syncer';
const logger = createLogger(LOG_TAG); const logger = createLogger(LOG_TAG);
@ -46,7 +47,7 @@ export class AccountPermissionSyncer {
logger.debug('Starting scheduler'); logger.debug('Starting scheduler');
this.interval = setInterval(async () => { this.interval = setIntervalAsync(async () => {
const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs); const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs);
const accounts = await this.db.account.findMany({ const accounts = await this.db.account.findMany({

View file

@ -8,7 +8,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { createOctokitFromToken, getRepoCollaborators, GITHUB_CLOUD_HOSTNAME } from "../github.js"; import { createOctokitFromToken, getRepoCollaborators, GITHUB_CLOUD_HOSTNAME } from "../github.js";
import { createGitLabFromPersonalAccessToken, getProjectMembers } from "../gitlab.js"; import { createGitLabFromPersonalAccessToken, getProjectMembers } from "../gitlab.js";
import { Settings } from "../types.js"; import { Settings } from "../types.js";
import { getAuthCredentialsForRepo } from "../utils.js"; import { getAuthCredentialsForRepo, setIntervalAsync } from "../utils.js";
type RepoPermissionSyncJob = { type RepoPermissionSyncJob = {
jobId: string; jobId: string;
@ -48,7 +48,7 @@ export class RepoPermissionSyncer {
logger.debug('Starting scheduler'); logger.debug('Starting scheduler');
this.interval = setInterval(async () => { this.interval = setIntervalAsync(async () => {
// @todo: make this configurable // @todo: make this configurable
const thresholdDate = new Date(Date.now() - this.settings.experiment_repoDrivenPermissionSyncIntervalMs); const thresholdDate = new Date(Date.now() - this.settings.experiment_repoDrivenPermissionSyncIntervalMs);

View file

@ -12,7 +12,7 @@ import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName,
import { captureEvent } from './posthog.js'; import { captureEvent } from './posthog.js';
import { PromClient } from './promClient.js'; import { PromClient } from './promClient.js';
import { RepoWithConnections, Settings } from "./types.js"; import { RepoWithConnections, Settings } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js'; import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure, setIntervalAsync } from './utils.js';
import { indexGitRepository } from './zoekt.js'; import { indexGitRepository } from './zoekt.js';
const LOG_TAG = 'repo-index-manager'; const LOG_TAG = 'repo-index-manager';
@ -72,9 +72,9 @@ export class RepoIndexManager {
this.worker.on('error', this.onWorkerError.bind(this)); this.worker.on('error', this.onWorkerError.bind(this));
} }
public async startScheduler() { public startScheduler() {
logger.debug('Starting scheduler'); logger.debug('Starting scheduler');
this.interval = setInterval(async () => { this.interval = setIntervalAsync(async () => {
await this.scheduleIndexJobs(); await this.scheduleIndexJobs();
await this.scheduleCleanupJobs(); await this.scheduleCleanupJobs();
}, this.settings.reindexRepoPollingIntervalMs); }, this.settings.reindexRepoPollingIntervalMs);

View file

@ -268,3 +268,27 @@ export const groupmqLifecycleExceptionWrapper = async (name: string, logger: Log
} }
} }
// setInterval wrapper that ensures async callbacks are not executed concurrently.
// @see: https://mottaquikarim.github.io/dev/posts/setinterval-that-blocks-on-await/
export const setIntervalAsync = (target: () => Promise<void>, pollingIntervalMs: number): NodeJS.Timeout => {
const setIntervalWithPromise = <T extends (...args: any[]) => Promise<any>>(
target: T
): (...args: Parameters<T>) => Promise<void> => {
return async function (...args: Parameters<T>): Promise<void> {
if ((target as any).isRunning) return;
(target as any).isRunning = true;
try {
await target(...args);
} finally {
(target as any).isRunning = false;
}
};
}
return setInterval(
setIntervalWithPromise(target),
pollingIntervalMs
);
}