This commit is contained in:
bkellam 2025-11-08 16:36:57 -08:00
parent 1be6e8842e
commit b324f88144
5 changed files with 30 additions and 8 deletions

View file

@ -7,7 +7,7 @@ import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis';
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
import { Settings } from "./types.js";
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
@ -66,7 +66,7 @@ export class ConnectionManager {
public startScheduler() {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
this.interval = setIntervalAsync(async () => {
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS);

View file

@ -7,6 +7,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { createOctokitFromToken, getReposForAuthenticatedUser } from "../github.js";
import { createGitLabFromOAuthToken, getProjectsForAuthenticatedUser } from "../gitlab.js";
import { Settings } from "../types.js";
import { setIntervalAsync } from "../utils.js";
const LOG_TAG = 'user-permission-syncer';
const logger = createLogger(LOG_TAG);
@ -46,7 +47,7 @@ export class AccountPermissionSyncer {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
this.interval = setIntervalAsync(async () => {
const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs);
const accounts = await this.db.account.findMany({

View file

@ -8,7 +8,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { createOctokitFromToken, getRepoCollaborators, GITHUB_CLOUD_HOSTNAME } from "../github.js";
import { createGitLabFromPersonalAccessToken, getProjectMembers } from "../gitlab.js";
import { Settings } from "../types.js";
import { getAuthCredentialsForRepo } from "../utils.js";
import { getAuthCredentialsForRepo, setIntervalAsync } from "../utils.js";
type RepoPermissionSyncJob = {
jobId: string;
@ -48,7 +48,7 @@ export class RepoPermissionSyncer {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
this.interval = setIntervalAsync(async () => {
// @todo: make this configurable
const thresholdDate = new Date(Date.now() - this.settings.experiment_repoDrivenPermissionSyncIntervalMs);

View file

@ -12,7 +12,7 @@ import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName,
import { captureEvent } from './posthog.js';
import { PromClient } from './promClient.js';
import { RepoWithConnections, Settings } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure, setIntervalAsync } from './utils.js';
import { indexGitRepository } from './zoekt.js';
const LOG_TAG = 'repo-index-manager';
@ -72,9 +72,9 @@ export class RepoIndexManager {
this.worker.on('error', this.onWorkerError.bind(this));
}
public async startScheduler() {
public startScheduler() {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
this.interval = setIntervalAsync(async () => {
await this.scheduleIndexJobs();
await this.scheduleCleanupJobs();
}, this.settings.reindexRepoPollingIntervalMs);

View file

@ -268,3 +268,24 @@ export const groupmqLifecycleExceptionWrapper = async (name: string, logger: Log
}
}
// setInterval wrapper that ensures async callbacks are not executed concurrently.
// @see: https://mottaquikarim.github.io/dev/posts/setinterval-that-blocks-on-await/
export const setIntervalAsync = (target: () => Promise<void>, pollingIntervalMs: number): NodeJS.Timeout => {
const setIntervalWithPromise = <T extends (...args: any[]) => Promise<any>>(
target: T
): (...args: Parameters<T>) => Promise<void> => {
return async function (...args: Parameters<T>): Promise<void> {
if ((target as any).isRunning) return;
(target as any).isRunning = true;
await target(...args);
(target as any).isRunning = false;
};
}
return setInterval(
setIntervalWithPromise(target),
pollingIntervalMs
);
}