renames + add abortSignal

This commit is contained in:
bkellam 2025-10-15 11:23:34 -07:00
parent d315292fe7
commit 5fe554e7da
6 changed files with 118 additions and 606 deletions

View file

@ -10,7 +10,7 @@ type onProgressFn = (event: SimpleGitProgressEvent) => void;
* Creates a simple-git client that has it's working directory
* set to the given path.
*/
const createGitClientForPath = (path: string, onProgress?: onProgressFn) => {
const createGitClientForPath = (path: string, onProgress?: onProgressFn, signal?: AbortSignal) => {
if (!existsSync(path)) {
throw new Error(`Path ${path} does not exist`);
}
@ -19,6 +19,7 @@ const createGitClientForPath = (path: string, onProgress?: onProgressFn) => {
const git = simpleGit({
progress: onProgress,
abort: signal,
})
.env({
...process.env,
@ -48,17 +49,19 @@ export const cloneRepository = async (
authHeader,
path,
onProgress,
signal,
}: {
cloneUrl: string,
authHeader?: string,
path: string,
onProgress?: onProgressFn
signal?: AbortSignal
}
) => {
try {
await mkdir(path, { recursive: true });
const git = createGitClientForPath(path, onProgress);
const git = createGitClientForPath(path, onProgress, signal);
const cloneArgs = [
"--bare",
@ -67,7 +70,11 @@ export const cloneRepository = async (
await git.clone(cloneUrl, path, cloneArgs);
await unsetGitConfig(path, ["remote.origin.url"]);
await unsetGitConfig({
path,
keys: ["remote.origin.url"],
signal,
});
} catch (error: unknown) {
const baseLog = `Failed to clone repository: ${path}`;
@ -88,15 +95,17 @@ export const fetchRepository = async (
authHeader,
path,
onProgress,
signal,
}: {
cloneUrl: string,
authHeader?: string,
path: string,
onProgress?: onProgressFn
onProgress?: onProgressFn,
signal?: AbortSignal
}
) => {
try {
const git = createGitClientForPath(path, onProgress);
const git = createGitClientForPath(path, onProgress, signal);
if (authHeader) {
await git.addConfig("http.extraHeader", authHeader);
@ -137,8 +146,19 @@ export const fetchRepository = async (
* that do not exist yet. It will _not_ remove any existing keys that are not
* present in gitConfig.
*/
export const upsertGitConfig = async (path: string, gitConfig: Record<string, string>, onProgress?: onProgressFn) => {
const git = createGitClientForPath(path, onProgress);
export const upsertGitConfig = async (
{
path,
gitConfig,
onProgress,
signal,
}: {
path: string,
gitConfig: Record<string, string>,
onProgress?: onProgressFn,
signal?: AbortSignal
}) => {
const git = createGitClientForPath(path, onProgress, signal);
try {
for (const [key, value] of Object.entries(gitConfig)) {
@ -157,8 +177,19 @@ export const upsertGitConfig = async (path: string, gitConfig: Record<string, st
* Unsets the specified keys in the git config for the repo at the given path.
* If a key is not set, this is a no-op.
*/
export const unsetGitConfig = async (path: string, keys: string[], onProgress?: onProgressFn) => {
const git = createGitClientForPath(path, onProgress);
export const unsetGitConfig = async (
{
path,
keys,
onProgress,
signal,
}: {
path: string,
keys: string[],
onProgress?: onProgressFn,
signal?: AbortSignal
}) => {
const git = createGitClientForPath(path, onProgress, signal);
try {
const configList = await git.listConfig();
@ -181,8 +212,16 @@ export const unsetGitConfig = async (path: string, keys: string[], onProgress?:
/**
* Returns true if `path` is the _root_ of a git repository.
*/
export const isPathAValidGitRepoRoot = async (path: string, onProgress?: onProgressFn) => {
const git = createGitClientForPath(path, onProgress);
export const isPathAValidGitRepoRoot = async ({
path,
onProgress,
signal,
}: {
path: string,
onProgress?: onProgressFn,
signal?: AbortSignal
}) => {
const git = createGitClientForPath(path, onProgress, signal);
try {
return git.checkIsRepo(CheckRepoActions.IS_REPO_ROOT);

View file

@ -11,9 +11,8 @@ import { DEFAULT_SETTINGS, INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
import { env } from "./env.js";
import { IndexSyncer } from "./indexSyncer.js";
import { RepoIndexManager } from "./repoIndexManager.js";
import { PromClient } from './promClient.js';
import { RepoManager } from './repoManager.js';
const logger = createLogger('backend-entrypoint');
@ -60,16 +59,12 @@ const promClient = new PromClient();
const settings = await getSettings(env.CONFIG_PATH);
const connectionManager = new ConnectionManager(prisma, settings, redis);
const repoManager = new RepoManager(prisma, settings, redis, promClient);
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
const indexSyncer = new IndexSyncer(prisma, settings, redis);
// await repoManager.validateIndexedReposHaveShards();
const repoIndexManager = new RepoIndexManager(prisma, settings, redis);
connectionManager.startScheduler();
// repoManager.startScheduler();
indexSyncer.startScheduler();
repoIndexManager.startScheduler();
if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && !hasEntitlement('permission-syncing')) {
logger.error('Permission syncing is not supported in current plan. Please contact team@sourcebot.dev for assistance.');
@ -88,8 +83,7 @@ const cleanup = async (signal: string) => {
try {
await Promise.race([
Promise.all([
indexSyncer.dispose(),
repoManager.dispose(),
repoIndexManager.dispose(),
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
userPermissionSyncer.dispose(),

View file

@ -497,7 +497,9 @@ export const compileGenericGitHostConfig_file = async (
};
await Promise.all(repoPaths.map(async (repoPath) => {
const isGitRepo = await isPathAValidGitRepoRoot(repoPath);
const isGitRepo = await isPathAValidGitRepoRoot({
path: repoPath,
});
if (!isGitRepo) {
logger.warn(`Skipping ${repoPath} - not a git repository.`);
notFound.repos.push(repoPath);

View file

@ -12,7 +12,7 @@ import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
import { indexGitRepository } from './zoekt.js';
const LOG_TAG = 'index-syncer';
const LOG_TAG = 'repo-index-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
@ -25,7 +25,18 @@ type JobPayload = {
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout
export class IndexSyncer {
/**
* Manages the lifecycle of repository data on disk, including git working copies
* and search index shards. Handles both indexing operations (cloning/fetching repos
* and building search indexes) and cleanup operations (removing orphaned repos and
* their associated data).
*
* Uses a job queue system to process indexing and cleanup tasks asynchronously,
* with configurable concurrency limits and retry logic. Automatically schedules
* re-indexing of repos based on configured intervals and manages garbage collection
* of repos that are no longer connected to any source.
*/
export class RepoIndexManager {
private interval?: NodeJS.Timeout;
private queue: Queue<JobPayload>;
private worker: Worker<JobPayload>;
@ -37,7 +48,7 @@ export class IndexSyncer {
) {
this.queue = new Queue<JobPayload>({
redis,
namespace: 'index-sync-queue',
namespace: 'repo-index-queue',
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@ -210,6 +221,7 @@ export class IndexSyncer {
const logger = createJobLogger(id);
logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
const { repo, type: jobType } = await this.db.repoJob.update({
where: {
id,
@ -231,14 +243,28 @@ export class IndexSyncer {
}
});
if (jobType === RepoJobType.INDEX) {
await this.indexRepository(repo, logger);
} else if (jobType === RepoJobType.CLEANUP) {
await this.cleanupRepository(repo, logger);
const abortController = new AbortController();
const signalHandler = () => {
logger.info(`Received shutdown signal, aborting...`);
abortController.abort(); // This cancels all operations
};
process.on('SIGTERM', signalHandler);
process.on('SIGINT', signalHandler);
try {
if (jobType === RepoJobType.INDEX) {
await this.indexRepository(repo, logger, abortController.signal);
} else if (jobType === RepoJobType.CLEANUP) {
await this.cleanupRepository(repo, logger);
}
} finally {
process.off('SIGTERM', signalHandler);
process.off('SIGINT', signalHandler);
}
}
private async indexRepository(repo: RepoWithConnections, logger: Logger) {
private async indexRepository(repo: RepoWithConnections, logger: Logger, signal: AbortSignal) {
const { path: repoPath, isReadOnly } = getRepoPath(repo);
const metadata = repoMetadataSchema.parse(repo.metadata);
@ -250,9 +276,16 @@ export class IndexSyncer {
// If the repo path exists but it is not a valid git repository root, this indicates
// that the repository is in a bad state. To fix, we remove the directory and perform
// a fresh clone.
if (existsSync(repoPath) && !(await isPathAValidGitRepoRoot(repoPath)) && !isReadOnly) {
logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
await rm(repoPath, { recursive: true, force: true });
if (existsSync(repoPath) && !(await isPathAValidGitRepoRoot( { path: repoPath } ))) {
const isValidGitRepo = await isPathAValidGitRepoRoot({
path: repoPath,
signal,
});
if (!isValidGitRepo && !isReadOnly) {
logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
await rm(repoPath, { recursive: true, force: true });
}
}
if (existsSync(repoPath) && !isReadOnly) {
@ -262,7 +295,11 @@ export class IndexSyncer {
// to unset this key since it is no longer needed, hence this line.
// This will no-op if the key is already unset.
// @see: https://github.com/sourcebot-dev/sourcebot/pull/483
await unsetGitConfig(repoPath, ["remote.origin.url"]);
await unsetGitConfig({
path: repoPath,
keys: ["remote.origin.url"],
signal,
});
logger.info(`Fetching ${repo.name} (id: ${repo.id})...`);
const { durationMs } = await measure(() => fetchRepository({
@ -271,7 +308,8 @@ export class IndexSyncer {
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`)
}
},
signal,
}));
const fetchDuration_s = durationMs / 1000;
@ -287,7 +325,8 @@ export class IndexSyncer {
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`)
}
},
signal
}));
const cloneDuration_s = durationMs / 1000;
@ -299,11 +338,15 @@ export class IndexSyncer {
// This ensures that the git config is always up to date for whatever we
// have in the DB.
if (metadata.gitConfig && !isReadOnly) {
await upsertGitConfig(repoPath, metadata.gitConfig);
await upsertGitConfig({
path: repoPath,
gitConfig: metadata.gitConfig,
signal,
});
}
logger.info(`Indexing ${repo.name} (id: ${repo.id})...`);
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings));
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, signal));
const indexDuration_s = durationMs / 1000;
logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`);
}

View file

@ -1,566 +0,0 @@
import * as Sentry from "@sentry/node";
import { PrismaClient, Repo, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
import { Job, Queue, Worker } from 'bullmq';
import { existsSync, promises, readdirSync } from 'fs';
import { Redis } from 'ioredis';
import { INDEX_CACHE_DIR } from "./constants.js";
import { env } from './env.js';
import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from "./git.js";
import { PromClient } from './promClient.js';
import { RepoWithConnections, Settings, repoMetadataSchema } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
const REPO_INDEXING_QUEUE = 'repoIndexingQueue';
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';
type RepoIndexingPayload = {
repo: RepoWithConnections,
}
type RepoGarbageCollectionPayload = {
repo: Repo,
}
const logger = createLogger('repo-manager');
export class RepoManager {
private indexWorker: Worker;
private indexQueue: Queue<RepoIndexingPayload>;
private gcWorker: Worker;
private gcQueue: Queue<RepoGarbageCollectionPayload>;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private promClient: PromClient,
) {
// Repo indexing
this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
connection: redis,
});
this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
connection: redis,
concurrency: this.settings.maxRepoIndexingJobConcurrency,
});
this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));
// Garbage collection
this.gcQueue = new Queue<RepoGarbageCollectionPayload>(REPO_GC_QUEUE, {
connection: redis,
});
this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
connection: redis,
concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency,
});
this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
}
public startScheduler() {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
await this.fetchAndScheduleRepoIndexing();
await this.fetchAndScheduleRepoGarbageCollection();
await this.fetchAndScheduleRepoTimeouts();
}, this.settings.reindexRepoPollingIntervalMs);
}
///////////////////////////
// Repo indexing
///////////////////////////
private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
await this.db.$transaction(async (tx) => {
await tx.repo.updateMany({
where: { id: { in: repos.map(repo => repo.id) } },
data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
});
const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
if (!acc[repo.orgId]) {
acc[repo.orgId] = [];
}
acc[repo.orgId].push(repo);
return acc;
}, {});
for (const orgId in reposByOrg) {
const orgRepos = reposByOrg[orgId];
// Set priority based on number of repos (more repos = lower priority)
// This helps prevent large orgs from overwhelming the indexQueue
const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
await this.indexQueue.addBulk(orgRepos.map(repo => ({
name: 'repoIndexJob',
data: { repo },
opts: {
priority: priority,
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
},
})));
// Increment pending jobs counter for each repo added
orgRepos.forEach(repo => {
this.promClient.pendingRepoIndexingJobs.inc({ repo: repo.id.toString() });
});
logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`);
}
}).catch((err: unknown) => {
logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
});
}
private async fetchAndScheduleRepoIndexing() {
const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
const repos = await this.db.repo.findMany({
where: {
OR: [
// "NEW" is really a misnomer here - it just means that the repo needs to be indexed
// immediately. In most cases, this will be because the repo was just created and
// is indeed "new". However, it could also be that a "retry" was requested on a failed
// index. So, we don't want to block on the indexedAt timestamp here.
{
repoIndexingStatus: RepoIndexingStatus.NEW,
},
// When the repo has already been indexed, we only want to reindex if the reindexing
// interval has elapsed (or if the date isn't set for some reason).
{
AND: [
{ repoIndexingStatus: RepoIndexingStatus.INDEXED },
{
OR: [
{ indexedAt: null },
{ indexedAt: { lt: thresholdDate } },
]
}
]
}
]
},
include: {
connections: {
include: {
connection: true
}
}
}
});
if (repos.length > 0) {
await this.scheduleRepoIndexingBulk(repos);
}
}
private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
const { path: repoPath, isReadOnly } = getRepoPath(repo);
const metadata = repoMetadataSchema.parse(repo.metadata);
// If the repo was already in the indexing state, this job was likely killed and picked up again. As a result,
// to ensure the repo state is valid, we delete the repo if it exists so we get a fresh clone
if (repoAlreadyInIndexingState && existsSync(repoPath) && !isReadOnly) {
logger.info(`Deleting repo directory ${repoPath} during sync because it was already in the indexing state`);
await promises.rm(repoPath, { recursive: true, force: true });
}
const credentials = await getAuthCredentialsForRepo(repo, this.db);
const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
const authHeader = credentials?.authHeader ?? undefined;
if (existsSync(repoPath) && !isReadOnly) {
// @NOTE: in #483, we changed the cloning method s.t., we _no longer_
// write the clone URL (which could contain a auth token) to the
// `remote.origin.url` entry. For the upgrade scenario, we want
// to unset this key since it is no longer needed, hence this line.
// This will no-op if the key is already unset.
// @see: https://github.com/sourcebot-dev/sourcebot/pull/483
await unsetGitConfig(repoPath, ["remote.origin.url"]);
logger.info(`Fetching ${repo.displayName}...`);
const { durationMs } = await measure(() => fetchRepository({
cloneUrl: cloneUrlMaybeWithToken,
authHeader,
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
}
}));
const fetchDuration_s = durationMs / 1000;
process.stdout.write('\n');
logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`);
} else if (!isReadOnly) {
logger.info(`Cloning ${repo.displayName}...`);
const { durationMs } = await measure(() => cloneRepository({
cloneUrl: cloneUrlMaybeWithToken,
authHeader,
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
}
}));
const cloneDuration_s = durationMs / 1000;
process.stdout.write('\n');
logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`);
}
// Regardless of clone or fetch, always upsert the git config for the repo.
// This ensures that the git config is always up to date for whatever we
// have in the DB.
if (metadata.gitConfig && !isReadOnly) {
await upsertGitConfig(repoPath, metadata.gitConfig);
}
logger.info(`Indexing ${repo.displayName}...`);
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings));
const indexDuration_s = durationMs / 1000;
logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
}
private async runIndexJob(job: Job<RepoIndexingPayload>) {
logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.displayName}`);
const repo = job.data.repo as RepoWithConnections;
// We have to use the existing repo object to get the repoIndexingStatus because the repo object
// inside the job is unchanged from when it was added to the queue.
const existingRepo = await this.db.repo.findUnique({
where: {
id: repo.id,
},
});
if (!existingRepo) {
logger.error(`Repo ${repo.id} not found`);
const e = new Error(`Repo ${repo.id} not found`);
Sentry.captureException(e);
throw e;
}
const repoAlreadyInIndexingState = existingRepo.repoIndexingStatus === RepoIndexingStatus.INDEXING;
await this.db.repo.update({
where: {
id: repo.id,
},
data: {
repoIndexingStatus: RepoIndexingStatus.INDEXING,
}
});
this.promClient.activeRepoIndexingJobs.inc();
this.promClient.pendingRepoIndexingJobs.dec({ repo: repo.id.toString() });
let attempts = 0;
const maxAttempts = 3;
while (attempts < maxAttempts) {
try {
await this.syncGitRepository(repo, repoAlreadyInIndexingState);
break;
} catch (error) {
Sentry.captureException(error);
attempts++;
this.promClient.repoIndexingReattemptsTotal.inc();
if (attempts === maxAttempts) {
logger.error(`Failed to sync repository ${repo.name} (id: ${repo.id}) after ${maxAttempts} attempts. Error: ${error}`);
throw error;
}
const sleepDuration = (env.REPO_SYNC_RETRY_BASE_SLEEP_SECONDS * 1000) * Math.pow(2, attempts - 1);
logger.error(`Failed to sync repository ${repo.name} (id: ${repo.id}), attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... Error: ${error}`);
await new Promise(resolve => setTimeout(resolve, sleepDuration));
}
}
}
private async onIndexJobCompleted(job: Job<RepoIndexingPayload>) {
logger.info(`Repo index job for repo ${job.data.repo.displayName} (id: ${job.data.repo.id}, jobId: ${job.id}) completed`);
this.promClient.activeRepoIndexingJobs.dec();
this.promClient.repoIndexingSuccessTotal.inc();
await this.db.repo.update({
where: {
id: job.data.repo.id,
},
data: {
indexedAt: new Date(),
repoIndexingStatus: RepoIndexingStatus.INDEXED,
}
});
}
private async onIndexJobFailed(job: Job<RepoIndexingPayload> | undefined, err: unknown) {
logger.info(`Repo index job for repo ${job?.data.repo.displayName} (id: ${job?.data.repo.id}, jobId: ${job?.id}) failed with error: ${err}`);
Sentry.captureException(err, {
tags: {
repoId: job?.data.repo.id,
jobId: job?.id,
queue: REPO_INDEXING_QUEUE,
}
});
if (job) {
this.promClient.activeRepoIndexingJobs.dec();
this.promClient.repoIndexingFailTotal.inc();
await this.db.repo.update({
where: {
id: job.data.repo.id,
},
data: {
repoIndexingStatus: RepoIndexingStatus.FAILED,
}
})
}
}
///////////////////////////
// Repo garbage collection
///////////////////////////
private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) {
await this.db.$transaction(async (tx) => {
await tx.repo.updateMany({
where: { id: { in: repos.map(repo => repo.id) } },
data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE }
});
await this.gcQueue.addBulk(repos.map(repo => ({
name: 'repoGarbageCollectionJob',
data: { repo },
opts: {
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
}
})));
logger.info(`Added ${repos.length} jobs to gcQueue`);
});
}
private async fetchAndScheduleRepoGarbageCollection() {
////////////////////////////////////
// Get repos with no connections
////////////////////////////////////
const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);
const reposWithNoConnections = await this.db.repo.findMany({
where: {
repoIndexingStatus: {
in: [
RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition)
RepoIndexingStatus.FAILED,
]
},
connections: {
none: {}
},
OR: [
{ indexedAt: null },
{ indexedAt: { lt: thresholdDate } }
]
},
});
if (reposWithNoConnections.length > 0) {
logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`);
}
////////////////////////////////////
// Get inactive org repos
////////////////////////////////////
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const inactiveOrgRepos = await this.db.repo.findMany({
where: {
org: {
stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
stripeLastUpdatedAt: {
lt: sevenDaysAgo
}
},
OR: [
{ indexedAt: null },
{ indexedAt: { lt: thresholdDate } }
]
}
});
if (inactiveOrgRepos.length > 0) {
logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`);
}
const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos];
if (reposToDelete.length > 0) {
await this.scheduleRepoGarbageCollectionBulk(reposToDelete);
}
}
private async runGarbageCollectionJob(job: Job<RepoGarbageCollectionPayload>) {
logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.displayName} (id: ${job.data.repo.id})`);
this.promClient.activeRepoGarbageCollectionJobs.inc();
const repo = job.data.repo as Repo;
await this.db.repo.update({
where: {
id: repo.id
},
data: {
repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING
}
});
// delete cloned repo
const { path: repoPath, isReadOnly } = getRepoPath(repo);
if (existsSync(repoPath) && !isReadOnly) {
logger.info(`Deleting repo directory ${repoPath}`);
await promises.rm(repoPath, { recursive: true, force: true });
}
// delete shards
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
const files = readdirSync(INDEX_CACHE_DIR).filter(file => file.startsWith(shardPrefix));
for (const file of files) {
const filePath = `${INDEX_CACHE_DIR}/${file}`;
logger.info(`Deleting shard file ${filePath}`);
await promises.rm(filePath, { force: true });
}
}
private async onGarbageCollectionJobCompleted(job: Job<RepoGarbageCollectionPayload>) {
logger.info(`Garbage collection job ${job.id} completed`);
this.promClient.activeRepoGarbageCollectionJobs.dec();
this.promClient.repoGarbageCollectionSuccessTotal.inc();
await this.db.repo.delete({
where: {
id: job.data.repo.id
}
});
}
private async onGarbageCollectionJobFailed(job: Job<RepoGarbageCollectionPayload> | undefined, err: unknown) {
logger.info(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
Sentry.captureException(err, {
tags: {
repoId: job?.data.repo.id,
jobId: job?.id,
queue: REPO_GC_QUEUE,
}
});
if (job) {
this.promClient.activeRepoGarbageCollectionJobs.dec();
this.promClient.repoGarbageCollectionFailTotal.inc();
await this.db.repo.update({
where: {
id: job.data.repo.id
},
data: {
repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED
}
});
}
}
///////////////////////////
// Repo index validation
///////////////////////////
public async validateIndexedReposHaveShards() {
logger.info('Validating indexed repos have shards...');
const indexedRepos = await this.db.repo.findMany({
where: {
repoIndexingStatus: RepoIndexingStatus.INDEXED
}
});
logger.info(`Found ${indexedRepos.length} repos in the DB marked as INDEXED`);
if (indexedRepos.length === 0) {
return;
}
const files = readdirSync(INDEX_CACHE_DIR);
const reposToReindex: number[] = [];
for (const repo of indexedRepos) {
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
// TODO: this doesn't take into account if a repo has multiple shards and only some of them are missing. To support that, this logic
// would need to know how many total shards are expected for this repo
let hasShards = false;
try {
hasShards = files.some(file => file.startsWith(shardPrefix));
} catch (error) {
logger.error(`Failed to read index directory ${INDEX_CACHE_DIR}: ${error}`);
continue;
}
if (!hasShards) {
logger.info(`Repo ${repo.displayName} (id: ${repo.id}) is marked as INDEXED but has no shards on disk. Marking for reindexing.`);
reposToReindex.push(repo.id);
}
}
if (reposToReindex.length > 0) {
await this.db.repo.updateMany({
where: {
id: { in: reposToReindex }
},
data: {
repoIndexingStatus: RepoIndexingStatus.NEW
}
});
logger.info(`Marked ${reposToReindex.length} repos for reindexing due to missing shards`);
}
logger.info('Done validating indexed repos have shards');
}
private async fetchAndScheduleRepoTimeouts() {
const repos = await this.db.repo.findMany({
where: {
repoIndexingStatus: RepoIndexingStatus.INDEXING,
updatedAt: {
lt: new Date(Date.now() - this.settings.repoIndexTimeoutMs)
}
}
});
if (repos.length > 0) {
logger.info(`Scheduling ${repos.length} repo timeouts`);
await this.scheduleRepoTimeoutsBulk(repos);
}
}
private async scheduleRepoTimeoutsBulk(repos: Repo[]) {
await this.db.$transaction(async (tx) => {
await tx.repo.updateMany({
where: { id: { in: repos.map(repo => repo.id) } },
data: { repoIndexingStatus: RepoIndexingStatus.FAILED }
});
});
}
public async dispose() {
if (this.interval) {
clearInterval(this.interval);
}
await this.indexWorker.close();
await this.indexQueue.close();
await this.gcQueue.close();
await this.gcWorker.close();
}
}

View file

@ -10,7 +10,7 @@ import { getRepoPath, getShardPrefix } from "./utils.js";
const logger = createLogger('zoekt');
export const indexGitRepository = async (repo: Repo, settings: Settings) => {
export const indexGitRepository = async (repo: Repo, settings: Settings, signal?: AbortSignal) => {
let revisions = [
'HEAD'
];
@ -71,7 +71,7 @@ export const indexGitRepository = async (repo: Repo, settings: Settings) => {
].join(' ');
return new Promise<{ stdout: string, stderr: string }>((resolve, reject) => {
exec(command, (error, stdout, stderr) => {
exec(command, { signal }, (error, stdout, stderr) => {
if (error) {
reject(error);
return;