import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
import { existsSync, watch } from 'fs';
import { syncConfig } from "./config.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { createLogger } from "./logger.js";
import { captureEvent } from "./posthog.js";
import { AppContext } from "./types.js";
import { getRepoPath, isRemotePath, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
import { DEFAULT_SETTINGS } from './constants.js';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';

const logger = createLogger('main');

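// Syncs a single git repository: fetches it if a local checkout already
// exists at the repo path, clones it otherwise, then (re-)indexes it with
// zoekt. Returns the duration of each step in seconds.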
const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
    let fetchDuration_s: number | undefined = undefined;
    let cloneDuration_s: number | undefined = undefined;

    const repoPath = getRepoPath(repo, ctx);
    const metadata = repo.metadata as Record<string, string>;

    if (existsSync(repoPath)) {
        logger.info(`Fetching ${repo.id}...`);

        const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`);
        }));
        fetchDuration_s = durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
    } else {
        logger.info(`Cloning ${repo.id}...`);

        const { durationMs } = await measure(() => cloneRepository(repo.cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`);
        }));
        cloneDuration_s = durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
    }

    logger.info(`Indexing ${repo.id}...`);
    const { durationMs } = await measure(() => indexGitRepository(repo, ctx));
    const indexDuration_s = durationMs / 1000;
    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);

    return {
        fetchDuration_s,
        cloneDuration_s,
        indexDuration_s,
    };
};

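// Marks each repo as IN_INDEX_QUEUE and enqueues an index job for it. The
// status update and the enqueue run in a single transaction, so the status
// change is rolled back if the job cannot be added to the queue.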
async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
    for (const repo of repos) {
        await db.$transaction(async (tx) => {
            await tx.repo.update({
                where: { id: repo.id },
                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
            });

            // Add the job to the queue
            await queue.add('indexJob', repo);
            logger.info(`Added job to queue for repo ${repo.id}`);
        }).catch((err) => {
            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
        });
    }
}

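// Entry point for the indexing backend: keeps the configuration file in sync
// with the database, connects to redis, and runs a BullMQ worker pool that
// fetches/clones and indexes repositories on a fixed schedule.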
export const main = async (db: PrismaClient, context: AppContext) => {
    let abortController = new AbortController();
    let isSyncing = false;

    const _syncConfig = async () => {
        // If a sync is already in progress, abort it and start a fresh one.
        if (isSyncing) {
            abortController.abort();
            abortController = new AbortController();
        }

        logger.info(`Syncing configuration file ${context.configPath} ...`);
        isSyncing = true;

        try {
            const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context));
            logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
            isSyncing = false;
        } catch (err: any) {
            if (err.name === "AbortError") {
                // @note: If we're aborting, we don't want to set isSyncing to false
                // since it implies another sync is in progress.
            } else {
                isSyncing = false;
                logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
                console.error(err);
            }
        }
    };

    // Re-sync on file changes if the config file is local
    if (!isRemotePath(context.configPath)) {
        watch(context.configPath, () => {
            logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
            _syncConfig();
        });
    }

    // Re-sync at a fixed interval
    setInterval(() => {
        _syncConfig();
    }, DEFAULT_SETTINGS.resyncIntervalMs);

    // Sync immediately on startup
    await _syncConfig();

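    // Connect to redis and fail fast if it's unreachable, since both the
    // queue and the worker depend on it. BullMQ requires maxRetriesPerRequest
    // to be null on connections it reuses for blocking commands.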
    const redis = new Redis({
        host: 'localhost',
        port: 6379,
        maxRetriesPerRequest: null,
    });
    redis.ping().then(() => {
        logger.info('Connected to redis');
    }).catch((err) => {
        logger.error('Failed to connect to redis');
        console.error(err);
        process.exit(1);
    });

    // Reuse the verified redis connection for the queue rather than relying
    // on BullMQ's implicit localhost default.
    const indexQueue = new Queue('indexQueue', { connection: redis });

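    // Scale worker concurrency with the number of available cores. Each job
    // syncs and indexes a single repository.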
    const numCores = os.cpus().length;
    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
    logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);

    const worker = new Worker('indexQueue', async (job) => {
        const repo = job.data as Repo;

        const { fetchDuration_s, cloneDuration_s, indexDuration_s } = await syncGitRepository(repo, context);

        captureEvent('repo_synced', {
            vcs: 'git',
            codeHost: repo.external_codeHostType,
            indexDuration_s,
            fetchDuration_s,
            cloneDuration_s,
        });

        await db.repo.update({
            where: {
                id: repo.id,
            },
            data: {
                indexedAt: new Date(),
                repoIndexingStatus: RepoIndexingStatus.INDEXED,
            },
        });
    }, { connection: redis, concurrency: numWorkers });

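    // Persist job outcomes: a failed job marks the repo as FAILED so the
    // scheduler below does not immediately re-queue it.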
    worker.on('completed', (job) => {
        logger.info(`Job ${job.id} completed`);
    });
    worker.on('failed', async (job: Job | undefined, err) => {
        logger.error(`Job failed with error: ${err}`);
        if (job) {
            await db.repo.update({
                where: {
                    id: job.data.id,
                },
                data: {
                    repoIndexingStatus: RepoIndexingStatus.FAILED,
                },
            });
        }
    });

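    // Scheduler loop: every second, queue any repo that has never been
    // indexed or whose last index is older than the reindex interval.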
    while (true) {
        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
        const repos = await db.repo.findMany({
            where: {
                repoIndexingStatus: {
                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED],
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } },
                    { repoIndexingStatus: RepoIndexingStatus.NEW },
                ],
            },
        });

        if (repos.length > 0) {
            logger.info(`Found ${repos.length} repos to index...`);
            // Await here so a slow enqueue can't race the next iteration into
            // re-queuing the same repos before their status is updated.
            await addReposToQueue(db, indexQueue, repos);
        }

        await new Promise(resolve => setTimeout(resolve, 1000));
    }
};