sourcebot/packages/backend/src/indexSyncer.ts

423 lines
16 KiB
TypeScript
Raw Normal View History

2025-10-10 03:10:29 +00:00
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import * as Sentry from '@sentry/node';
2025-10-15 03:16:43 +00:00
import { PrismaClient, Repo, RepoJobStatus, RepoJobType } from "@sourcebot/db";
import { createLogger, Logger } from "@sourcebot/logger";
2025-10-10 03:10:29 +00:00
import express from 'express';
2025-10-10 19:32:34 +00:00
import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq";
2025-10-10 03:10:29 +00:00
import { Redis } from 'ioredis';
2025-10-10 19:32:34 +00:00
import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
2025-10-15 03:16:43 +00:00
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from './utils.js';
2025-10-10 19:32:34 +00:00
import { existsSync } from 'fs';
2025-10-15 03:16:43 +00:00
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
2025-10-10 19:32:34 +00:00
import { indexGitRepository } from './zoekt.js';
2025-10-15 03:16:43 +00:00
import { rm, readdir } from 'fs/promises';
2025-10-10 03:10:29 +00:00
2025-10-15 03:16:43 +00:00
// Tag shared by the module-level logger and used as the prefix of every per-job logger tag.
const LOG_TAG = 'index-syncer';

// Module-level logger; also handed to the queue and worker as their logger.
const logger = createLogger(LOG_TAG);

/**
 * Builds a logger dedicated to a single job.
 *
 * @param jobId - Identifier of the job; embedded in the logger tag as `index-syncer:job:<jobId>`.
 * @returns Whatever `createLogger` returns for that tag.
 */
const createJobLogger = (jobId: string) => {
    return createLogger(`${LOG_TAG}:job:${jobId}`);
};
2025-10-10 03:10:29 +00:00
2025-10-15 03:16:43 +00:00
/**
 * Data attached to every job placed on the index-sync queue.
 */
type JobPayload = {
    /** Id of the `repoJob` database row that tracks this job (also used as the queue job id). */
    jobId: string;
    /** Kind of work to perform for the repository. */
    type: 'INDEX' | 'CLEANUP';
    /** Id of the repository the job operates on. */
    repoId: number;
    /** Name of the repository; used in log messages. */
    repoName: string;
};
2025-10-10 03:10:29 +00:00
2025-10-10 19:32:34 +00:00
// Upper bound on how long a single queue job may run: 6 hours, expressed in milliseconds.
const JOB_TIMEOUT_MS = 6 * 60 * 60 * 1000;
2025-10-10 03:10:29 +00:00
2025-10-15 03:16:43 +00:00
/**
 * Runs a worker lifecycle callback and makes sure a failure inside it never propagates
 * to the caller: any thrown error is reported to Sentry and logged instead.
 *
 * @param name - Name of the lifecycle function being wrapped; only used in the log message.
 * @param fn - The callback to execute.
 * @returns A promise that always resolves (never rejects), whether or not `fn` threw.
 */
async function groupmqLifecycleExceptionWrapper(name: string, fn: () => Promise<void>): Promise<void> {
    try {
        await fn();
    } catch (err) {
        // Report first, then log - same order as side effects are expected by callers/observers.
        Sentry.captureException(err);
        logger.error(`Exception thrown while executing lifecycle function \`${name}\`.`, err);
    }
}
2025-10-10 03:10:29 +00:00
/**
 * Periodically schedules INDEX and CLEANUP jobs for repositories and executes them
 * through a groupmq queue/worker pair.
 *
 * Lifecycle:
 *  - `startScheduler()` starts a 5 second polling loop that creates jobs and starts the worker.
 *  - `dispose()` stops the polling loop and closes the worker and the queue.
 *
 * Job state is mirrored into the `repoJob` table: `runJob` marks a job IN_PROGRESS, and the
 * worker event handlers mark it COMPLETED or FAILED.
 */
export class IndexSyncer {
    private interval?: NodeJS.Timeout;
    private queue: Queue<JobPayload>;
    private worker: Worker<JobPayload>;

    /**
     * @param db - Prisma client used for all repo / repoJob reads and writes.
     * @param settings - Runtime settings (re-index interval, GC grace period, concurrency, ...).
     * @param redis - Redis connection handed to the queue.
     * @param ctx - Application context (used to resolve repo and index paths).
     */
    constructor(
        private db: PrismaClient,
        private settings: Settings,
        redis: Redis,
        private ctx: AppContext,
    ) {
        this.queue = new Queue<JobPayload>({
            redis,
            namespace: 'index-sync-queue',
            jobTimeoutMs: JOB_TIMEOUT_MS,
            logger,
            maxAttempts: 1,
        });

        this.worker = new Worker<JobPayload>({
            queue: this.queue,
            maxStalledCount: 1,
            handler: this.runJob.bind(this),
            concurrency: this.settings.maxRepoIndexingJobConcurrency,
            logger,
        });

        this.worker.on('completed', this.onJobCompleted.bind(this));
        this.worker.on('failed', this.onJobFailed.bind(this));
        this.worker.on('stalled', this.onJobStalled.bind(this));
        this.worker.on('error', this.onWorkerError.bind(this));

        // @nocheckin
        // NOTE(review): this starts a bull-board HTTP server on the hard-coded port 3070 for
        // every IndexSyncer instance, and the server is never closed in `dispose()`. The
        // marker above suggests it is debug-only scaffolding - confirm before shipping.
        const app = express();
        const serverAdapter = new ExpressAdapter();
        createBullBoard({
            queues: [new BullBoardGroupMQAdapter(this.queue, { displayName: 'Index Sync' })],
            serverAdapter,
        });
        app.use('/', serverAdapter.getRouter());
        app.listen(3070);
    }

    /**
     * Starts the polling loop (every 5 seconds) that schedules index and cleanup jobs,
     * and starts the worker that processes them.
     */
    public async startScheduler() {
        this.interval = setInterval(async () => {
            // The callback's promise is not observed by `setInterval`, so a rejected DB or
            // queue call would otherwise surface as an unhandled promise rejection. Report
            // it and let the next tick retry.
            try {
                await this.scheduleIndexJobs();
                await this.scheduleCleanupJobs();
            } catch (error) {
                Sentry.captureException(error);
                logger.error(`Exception thrown while scheduling index sync jobs.`, error);
            }
        }, 1000 * 5);

        this.worker.run();
    }

    /**
     * Creates INDEX jobs for every repo that has never been indexed, or was last indexed
     * before `now - settings.reindexIntervalMs`, unless the repo already has a recent
     * active or recently failed INDEX job (see the inline comments on the query).
     */
    private async scheduleIndexJobs() {
        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);

        const reposToIndex = await this.db.repo.findMany({
            where: {
                AND: [
                    {
                        OR: [
                            { indexedAt: null },
                            { indexedAt: { lt: thresholdDate } },
                        ]
                    },
                    {
                        NOT: {
                            jobs: {
                                some: {
                                    AND: [
                                        {
                                            type: RepoJobType.INDEX,
                                        },
                                        {
                                            OR: [
                                                // Don't schedule if there are active jobs that were created within the threshold date.
                                                // This handles the case where a job is stuck in a pending state and will never be scheduled.
                                                {
                                                    AND: [
                                                        {
                                                            status: {
                                                                in: [
                                                                    RepoJobStatus.PENDING,
                                                                    RepoJobStatus.IN_PROGRESS,
                                                                ]
                                                            },
                                                        },
                                                        {
                                                            createdAt: {
                                                                gt: thresholdDate,
                                                            }
                                                        }
                                                    ]
                                                },
                                                // Don't schedule if there are recent failed jobs (within the threshold date).
                                                {
                                                    AND: [
                                                        { status: RepoJobStatus.FAILED },
                                                        { completedAt: { gt: thresholdDate } },
                                                    ]
                                                }
                                            ]
                                        }
                                    ]
                                }
                            }
                        }
                    }
                ],
            }
        });

        if (reposToIndex.length > 0) {
            await this.createJobs(reposToIndex, RepoJobType.INDEX);
        }
    }

    /**
     * Creates CLEANUP jobs for every repo that has no connections and was never indexed or
     * was last indexed before `now - settings.repoGarbageCollectionGracePeriodMs`, unless
     * the repo already has a recent active CLEANUP job.
     */
    private async scheduleCleanupJobs() {
        const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);

        const reposToCleanup = await this.db.repo.findMany({
            where: {
                connections: {
                    none: {}
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } },
                ],
                // Don't schedule if there are active jobs that were created within the threshold date.
                NOT: {
                    jobs: {
                        some: {
                            AND: [
                                {
                                    type: RepoJobType.CLEANUP,
                                },
                                {
                                    status: {
                                        in: [
                                            RepoJobStatus.PENDING,
                                            RepoJobStatus.IN_PROGRESS,
                                        ]
                                    },
                                },
                                {
                                    createdAt: {
                                        gt: thresholdDate,
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        });

        if (reposToCleanup.length > 0) {
            await this.createJobs(reposToCleanup, RepoJobType.CLEANUP);
        }
    }

    /**
     * Persists one `repoJob` row per repo and then enqueues a matching queue job for each row.
     *
     * @param repos - Repositories to create jobs for.
     * @param type - Job type shared by all created jobs.
     */
    private async createJobs(repos: Repo[], type: RepoJobType) {
        // @note: we don't perform this in a transaction because
        // we want to avoid the situation where a job is created and run
        // prior to the transaction being committed.
        const jobs = await this.db.repoJob.createManyAndReturn({
            data: repos.map(repo => ({
                type,
                repoId: repo.id,
            })),
            include: {
                repo: true,
            }
        });

        for (const job of jobs) {
            await this.queue.add({
                // One group per repository.
                // NOTE(review): presumably groupmq serializes jobs within a group - confirm.
                groupId: `repo:${job.repoId}`,
                data: {
                    jobId: job.id,
                    type,
                    repoName: job.repo.name,
                    repoId: job.repo.id,
                },
                // Reuse the DB row id as the queue job id so the two can be correlated.
                jobId: job.id,
            });
        }
    }

    /**
     * Worker handler: marks the job IN_PROGRESS in the database and dispatches to the
     * index or cleanup routine based on the job type stored in the database.
     *
     * @param job - The reserved queue job; `job.data.jobId` identifies the `repoJob` row.
     */
    private async runJob(job: ReservedJob<JobPayload>) {
        const id = job.data.jobId;
        // Per-job logger; intentionally shadows the module-level `logger` inside this method.
        const logger = createJobLogger(id);
        logger.info(`Running job ${id} for repo ${job.data.repoName}`);

        const { repo, type: jobType } = await this.db.repoJob.update({
            where: {
                id,
            },
            data: {
                status: RepoJobStatus.IN_PROGRESS,
            },
            select: {
                type: true,
                repo: {
                    include: {
                        connections: {
                            include: {
                                connection: true,
                            }
                        }
                    }
                }
            }
        });

        if (jobType === RepoJobType.INDEX) {
            await this.indexRepository(repo, logger);
        } else if (jobType === RepoJobType.CLEANUP) {
            await this.cleanupRepository(repo, logger);
        }
    }

    /**
     * Brings the on-disk clone of `repo` up to date (fetch if it exists, clone otherwise),
     * applies the git config stored in the repo metadata, and then runs the indexer.
     * All on-disk mutations are skipped when the repo path is read-only.
     *
     * @param repo - Repository (with its connections) to index.
     * @param logger - Per-job logger.
     */
    private async indexRepository(repo: RepoWithConnections, logger: Logger) {
        const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);

        const metadata = repoMetadataSchema.parse(repo.metadata);

        const credentials = await getAuthCredentialsForRepo(repo, this.db);
        const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
        const authHeader = credentials?.authHeader ?? undefined;

        // If the repo path exists but it is not a valid git repository root, this indicates
        // that the repository is in a bad state. To fix, we remove the directory and perform
        // a fresh clone.
        if (existsSync(repoPath) && !(await isPathAValidGitRepoRoot(repoPath)) && !isReadOnly) {
            logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
            await rm(repoPath, { recursive: true, force: true });
        }

        if (existsSync(repoPath) && !isReadOnly) {
            // @NOTE: in #483, we changed the cloning method s.t., we _no longer_
            // write the clone URL (which could contain a auth token) to the
            // `remote.origin.url` entry. For the upgrade scenario, we want
            // to unset this key since it is no longer needed, hence this line.
            // This will no-op if the key is already unset.
            // @see: https://github.com/sourcebot-dev/sourcebot/pull/483
            await unsetGitConfig(repoPath, ["remote.origin.url"]);

            logger.info(`Fetching ${repo.displayName}...`);
            const { durationMs } = await measure(() => fetchRepository({
                cloneUrl: cloneUrlMaybeWithToken,
                authHeader,
                path: repoPath,
                onProgress: ({ method, stage, progress }) => {
                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
                }
            }));
            const fetchDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`);
        } else if (!isReadOnly) {
            logger.info(`Cloning ${repo.displayName}...`);

            const { durationMs } = await measure(() => cloneRepository({
                cloneUrl: cloneUrlMaybeWithToken,
                authHeader,
                path: repoPath,
                onProgress: ({ method, stage, progress }) => {
                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
                }
            }));
            const cloneDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`);
        }

        // Regardless of clone or fetch, always upsert the git config for the repo.
        // This ensures that the git config is always up to date for whatever we
        // have in the DB.
        if (metadata.gitConfig && !isReadOnly) {
            await upsertGitConfig(repoPath, metadata.gitConfig);
        }

        logger.info(`Indexing ${repo.displayName}...`);
        const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
        const indexDuration_s = durationMs / 1000;
        logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
    }

    /**
     * Removes the on-disk clone of `repo` (unless read-only) and every file in the index
     * directory whose name starts with the repo's shard prefix.
     *
     * @param repo - Repository to clean up.
     * @param logger - Per-job logger.
     */
    private async cleanupRepository(repo: Repo, logger: Logger) {
        const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
        if (existsSync(repoPath) && !isReadOnly) {
            logger.info(`Deleting repo directory ${repoPath}`);
            await rm(repoPath, { recursive: true, force: true });
        }

        const shardPrefix = getShardPrefix(repo.orgId, repo.id);
        const files = (await readdir(this.ctx.indexPath)).filter(file => file.startsWith(shardPrefix));
        for (const file of files) {
            const filePath = `${this.ctx.indexPath}/${file}`;
            logger.info(`Deleting shard file ${filePath}`);
            await rm(filePath, { force: true });
        }
    }

    /**
     * 'completed' handler: marks the job COMPLETED, then either stamps `indexedAt` on the
     * repo (INDEX jobs) or deletes the repo row (CLEANUP jobs). Exceptions are reported
     * and swallowed by `groupmqLifecycleExceptionWrapper`.
     */
    private onJobCompleted = async (job: Job<JobPayload>) =>
        groupmqLifecycleExceptionWrapper('onJobCompleted', async () => {
            const logger = createJobLogger(job.data.jobId);
            const jobData = await this.db.repoJob.update({
                where: { id: job.data.jobId },
                data: {
                    status: RepoJobStatus.COMPLETED,
                    completedAt: new Date(),
                }
            });

            if (jobData.type === RepoJobType.INDEX) {
                const repo = await this.db.repo.update({
                    where: { id: jobData.repoId },
                    data: {
                        indexedAt: new Date(),
                    }
                });

                logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
            }
            else if (jobData.type === RepoJobType.CLEANUP) {
                const repo = await this.db.repo.delete({
                    where: { id: jobData.repoId },
                });

                logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name}`);
            }
        });

    /**
     * 'failed' handler: marks the job FAILED and records the completion time and the
     * failure reason. Exceptions are reported and swallowed by
     * `groupmqLifecycleExceptionWrapper`.
     */
    private onJobFailed = async (job: Job<JobPayload>) =>
        groupmqLifecycleExceptionWrapper('onJobFailed', async () => {
            const logger = createJobLogger(job.data.jobId);

            const { repo } = await this.db.repoJob.update({
                where: { id: job.data.jobId },
                data: {
                    // Without this the row keeps the IN_PROGRESS status set by `runJob`,
                    // which disagrees with `onJobStalled` and with the FAILED-based
                    // back-off filter in `scheduleIndexJobs`.
                    status: RepoJobStatus.FAILED,
                    completedAt: new Date(),
                    errorMessage: job.failedReason,
                },
                select: { repo: true }
            });

            logger.error(`Failed job ${job.data.jobId} for repo ${repo.name}`);
        });

    /**
     * 'stalled' handler: marks the job FAILED with the error message 'Job stalled'.
     * Exceptions are reported and swallowed by `groupmqLifecycleExceptionWrapper`.
     */
    private onJobStalled = async (jobId: string) =>
        groupmqLifecycleExceptionWrapper('onJobStalled', async () => {
            const logger = createJobLogger(jobId);
            const { repo } = await this.db.repoJob.update({
                where: { id: jobId },
                data: {
                    status: RepoJobStatus.FAILED,
                    completedAt: new Date(),
                    errorMessage: 'Job stalled',
                },
                select: { repo: true }
            });

            logger.error(`Job ${jobId} stalled for repo ${repo.name}`);
        });

    /**
     * 'error' handler for the worker itself: reports the error to Sentry and logs it.
     */
    private async onWorkerError(error: Error) {
        Sentry.captureException(error);
        logger.error(`Index syncer worker error.`, error);
    }

    /**
     * Stops the scheduling loop (if running) and closes the worker, then the queue.
     */
    public async dispose() {
        if (this.interval) {
            clearInterval(this.interval);
        }

        await this.worker.close();
        await this.queue.close();
    }
}