mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 12:25:22 +00:00
add indexing
This commit is contained in:
parent
32ce1ca16d
commit
963f6fd69e
2 changed files with 149 additions and 80 deletions
|
|
@ -72,9 +72,9 @@ const connectionManager = new ConnectionManager(prisma, settings, redis);
|
||||||
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
|
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
|
||||||
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
||||||
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
||||||
const indexSyncer = new IndexSyncer(prisma, settings, redis);
|
const indexSyncer = new IndexSyncer(prisma, settings, redis, context);
|
||||||
|
|
||||||
await repoManager.validateIndexedReposHaveShards();
|
// await repoManager.validateIndexedReposHaveShards();
|
||||||
|
|
||||||
connectionManager.startScheduler();
|
connectionManager.startScheduler();
|
||||||
// repoManager.startScheduler();
|
// repoManager.startScheduler();
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,13 @@ import * as Sentry from '@sentry/node';
|
||||||
import { PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db";
|
import { PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db";
|
||||||
import { createLogger } from "@sourcebot/logger";
|
import { createLogger } from "@sourcebot/logger";
|
||||||
import express from 'express';
|
import express from 'express';
|
||||||
import { BullBoardGroupMQAdapter, Queue, Worker } from "groupmq";
|
import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq";
|
||||||
import { Redis } from 'ioredis';
|
import { Redis } from 'ioredis';
|
||||||
import { Settings } from "./types.js";
|
import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
|
||||||
|
import { getAuthCredentialsForRepo, getRepoPath, measure } from './utils.js';
|
||||||
|
import { existsSync } from 'fs';
|
||||||
|
import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from './git.js';
|
||||||
|
import { indexGitRepository } from './zoekt.js';
|
||||||
|
|
||||||
const logger = createLogger('index-syncer');
|
const logger = createLogger('index-syncer');
|
||||||
|
|
||||||
|
|
@ -14,7 +18,7 @@ type IndexSyncJob = {
|
||||||
jobId: string;
|
jobId: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const JOB_TIMEOUT_MS = 1000 * 60; // 60 second timeout.
|
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout
|
||||||
|
|
||||||
export class IndexSyncer {
|
export class IndexSyncer {
|
||||||
private interval?: NodeJS.Timeout;
|
private interval?: NodeJS.Timeout;
|
||||||
|
|
@ -25,6 +29,7 @@ export class IndexSyncer {
|
||||||
private db: PrismaClient,
|
private db: PrismaClient,
|
||||||
private settings: Settings,
|
private settings: Settings,
|
||||||
redis: Redis,
|
redis: Redis,
|
||||||
|
private ctx: AppContext,
|
||||||
) {
|
) {
|
||||||
this.queue = new Queue<IndexSyncJob>({
|
this.queue = new Queue<IndexSyncJob>({
|
||||||
redis,
|
redis,
|
||||||
|
|
@ -37,88 +42,19 @@ export class IndexSyncer {
|
||||||
queue: this.queue,
|
queue: this.queue,
|
||||||
maxStalledCount: 1,
|
maxStalledCount: 1,
|
||||||
stalledInterval: 1000,
|
stalledInterval: 1000,
|
||||||
handler: async (job) => {
|
handler: this.runJob.bind(this),
|
||||||
const id = job.data.jobId;
|
concurrency: this.settings.maxRepoIndexingJobConcurrency,
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
|
||||||
where: {
|
|
||||||
id,
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
status: RepoIndexingJobStatus.IN_PROGRESS,
|
|
||||||
},
|
|
||||||
select: {
|
|
||||||
repo: true,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info(`Running index job ${id} for repo ${repo.name}`);
|
|
||||||
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 1000 * 10));
|
|
||||||
|
|
||||||
return true;
|
|
||||||
},
|
|
||||||
concurrency: 4,
|
|
||||||
});
|
|
||||||
|
|
||||||
this.worker.on('completed', async (job) => {
|
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
|
||||||
where: {
|
|
||||||
id: job.data.jobId,
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
status: RepoIndexingJobStatus.COMPLETED,
|
|
||||||
repo: {
|
|
||||||
update: {
|
|
||||||
indexedAt: new Date(),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
completedAt: new Date(),
|
|
||||||
},
|
|
||||||
select: {
|
|
||||||
repo: true,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
|
|
||||||
});
|
|
||||||
|
|
||||||
this.worker.on('failed', async (job) => {
|
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
|
||||||
where: {
|
|
||||||
id: job.data.jobId,
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
status: RepoIndexingJobStatus.FAILED,
|
|
||||||
completedAt: new Date(),
|
|
||||||
errorMessage: job.failedReason,
|
|
||||||
},
|
|
||||||
select: {
|
|
||||||
repo: true,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`);
|
|
||||||
});
|
|
||||||
|
|
||||||
this.worker.on('stalled', async (jobId) => {
|
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
|
||||||
where: { id: jobId },
|
|
||||||
data: {
|
|
||||||
status: RepoIndexingJobStatus.FAILED,
|
|
||||||
completedAt: new Date(),
|
|
||||||
errorMessage: 'Job stalled',
|
|
||||||
},
|
|
||||||
select: { repo: true }
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.warn(`Job ${jobId} stalled for repo ${repo.name}`);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.worker.on('completed', this.onJobCompleted.bind(this));
|
||||||
|
this.worker.on('failed', this.onJobFailed.bind(this));
|
||||||
|
this.worker.on('stalled', this.onJobStalled.bind(this));
|
||||||
this.worker.on('error', async (error) => {
|
this.worker.on('error', async (error) => {
|
||||||
Sentry.captureException(error);
|
Sentry.captureException(error);
|
||||||
logger.error(`Index syncer worker error.`, error);
|
logger.error(`Index syncer worker error.`, error);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// @nocheckin
|
||||||
const app = express();
|
const app = express();
|
||||||
const serverAdapter = new ExpressAdapter();
|
const serverAdapter = new ExpressAdapter();
|
||||||
|
|
||||||
|
|
@ -215,6 +151,139 @@ export class IndexSyncer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async runJob(job: ReservedJob<IndexSyncJob>) {
|
||||||
|
const id = job.data.jobId;
|
||||||
|
const { repo } = await this.db.repoIndexingJob.update({
|
||||||
|
where: {
|
||||||
|
id,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: RepoIndexingJobStatus.IN_PROGRESS,
|
||||||
|
},
|
||||||
|
select: {
|
||||||
|
repo: {
|
||||||
|
include: {
|
||||||
|
connections: {
|
||||||
|
include: {
|
||||||
|
connection: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.syncGitRepository(repo);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async syncGitRepository(repo: RepoWithConnections) {
|
||||||
|
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
|
||||||
|
|
||||||
|
const metadata = repoMetadataSchema.parse(repo.metadata);
|
||||||
|
|
||||||
|
const credentials = await getAuthCredentialsForRepo(repo, this.db);
|
||||||
|
const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
|
||||||
|
const authHeader = credentials?.authHeader ?? undefined;
|
||||||
|
|
||||||
|
if (existsSync(repoPath) && !isReadOnly) {
|
||||||
|
// @NOTE: in #483, we changed the cloning method s.t., we _no longer_
|
||||||
|
// write the clone URL (which could contain a auth token) to the
|
||||||
|
// `remote.origin.url` entry. For the upgrade scenario, we want
|
||||||
|
// to unset this key since it is no longer needed, hence this line.
|
||||||
|
// This will no-op if the key is already unset.
|
||||||
|
// @see: https://github.com/sourcebot-dev/sourcebot/pull/483
|
||||||
|
await unsetGitConfig(repoPath, ["remote.origin.url"]);
|
||||||
|
|
||||||
|
logger.info(`Fetching ${repo.displayName}...`);
|
||||||
|
const { durationMs } = await measure(() => fetchRepository({
|
||||||
|
cloneUrl: cloneUrlMaybeWithToken,
|
||||||
|
authHeader,
|
||||||
|
path: repoPath,
|
||||||
|
onProgress: ({ method, stage, progress }) => {
|
||||||
|
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
const fetchDuration_s = durationMs / 1000;
|
||||||
|
|
||||||
|
process.stdout.write('\n');
|
||||||
|
logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`);
|
||||||
|
|
||||||
|
} else if (!isReadOnly) {
|
||||||
|
logger.info(`Cloning ${repo.displayName}...`);
|
||||||
|
|
||||||
|
const { durationMs } = await measure(() => cloneRepository({
|
||||||
|
cloneUrl: cloneUrlMaybeWithToken,
|
||||||
|
authHeader,
|
||||||
|
path: repoPath,
|
||||||
|
onProgress: ({ method, stage, progress }) => {
|
||||||
|
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
const cloneDuration_s = durationMs / 1000;
|
||||||
|
|
||||||
|
process.stdout.write('\n');
|
||||||
|
logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regardless of clone or fetch, always upsert the git config for the repo.
|
||||||
|
// This ensures that the git config is always up to date for whatever we
|
||||||
|
// have in the DB.
|
||||||
|
if (metadata.gitConfig && !isReadOnly) {
|
||||||
|
await upsertGitConfig(repoPath, metadata.gitConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Indexing ${repo.displayName}...`);
|
||||||
|
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
|
||||||
|
const indexDuration_s = durationMs / 1000;
|
||||||
|
logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async onJobCompleted(job: Job<IndexSyncJob>) {
|
||||||
|
const { repo } = await this.db.repoIndexingJob.update({
|
||||||
|
where: { id: job.data.jobId },
|
||||||
|
data: {
|
||||||
|
status: RepoIndexingJobStatus.COMPLETED,
|
||||||
|
repo: {
|
||||||
|
update: {
|
||||||
|
indexedAt: new Date(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
completedAt: new Date(),
|
||||||
|
},
|
||||||
|
select: { repo: true }
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async onJobFailed(job: Job<IndexSyncJob>) {
|
||||||
|
const { repo } = await this.db.repoIndexingJob.update({
|
||||||
|
where: { id: job.data.jobId },
|
||||||
|
data: {
|
||||||
|
status: RepoIndexingJobStatus.FAILED,
|
||||||
|
completedAt: new Date(),
|
||||||
|
errorMessage: job.failedReason,
|
||||||
|
},
|
||||||
|
select: { repo: true}
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async onJobStalled(jobId: string) {
|
||||||
|
const { repo } = await this.db.repoIndexingJob.update({
|
||||||
|
where: { id: jobId },
|
||||||
|
data: {
|
||||||
|
status: RepoIndexingJobStatus.FAILED,
|
||||||
|
completedAt: new Date(),
|
||||||
|
errorMessage: 'Job stalled',
|
||||||
|
},
|
||||||
|
select: { repo: true }
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.error(`Job ${jobId} stalled for repo ${repo.name}`);
|
||||||
|
}
|
||||||
|
|
||||||
public dispose() {
|
public dispose() {
|
||||||
if (this.interval) {
|
if (this.interval) {
|
||||||
clearInterval(this.interval);
|
clearInterval(this.interval);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue