// sourcebot/packages/backend/src/main.ts

import { readFile, rm } from 'fs/promises';
import { existsSync, watch } from 'fs';
import { SourcebotConfigurationSchema } from "./schemas/v2.js";
import { getGitHubReposFromConfig } from "./github.js";
import { getGitLabReposFromConfig } from "./gitlab.js";
import { getGiteaReposFromConfig } from "./gitea.js";
import { getGerritReposFromConfig } from "./gerrit.js";
import { AppContext, LocalRepository, GitRepository, Repository, Settings } from "./types.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { createLogger } from "./logger.js";
import { createRepository, Database, loadDB, updateRepository, updateSettings } from './db.js';
import { arraysEqualShallow, isRemotePath, measure } from "./utils.js";
import { DEFAULT_SETTINGS, REINDEX_INTERVAL_MS, RESYNC_CONFIG_INTERVAL_MS } from "./constants.js";
import stripJsonComments from 'strip-json-comments';
import { indexGitRepository, indexLocalRepository } from "./zoekt.js";
import { getLocalRepoFromConfig, initLocalRepoFileWatchers } from "./local.js";
import { captureEvent } from "./posthog.js";
import { glob } from 'glob';
import path from 'path';
// Logger used by every function in this file; created under the name 'main'.
const logger = createLogger('main');
/**
 * Brings a git repository up to date on disk and then indexes it.
 *
 * When `repo.path` does not exist yet the repository is cloned; otherwise it
 * is fetched. Progress events reported by the git helper are logged as they
 * arrive.
 *
 * @param repo The git repository to sync.
 * @param settings Settings forwarded to the indexer.
 * @param ctx Application context forwarded to the indexer.
 * @returns The time spent (in seconds) indexing, plus either the fetch or the
 * clone duration. The duration of the step that did not run is `undefined`.
 */
const syncGitRepository = async (repo: GitRepository, settings: Settings, ctx: AppContext) => {
    const hasCheckout = existsSync(repo.path);

    let fetchDuration_s: number | undefined;
    let cloneDuration_s: number | undefined;

    if (!hasCheckout) {
        // Nothing on disk yet: do a full clone.
        logger.info(`Cloning ${repo.id}...`);
        const cloneTiming = await measure(() => cloneRepository(repo, (event) => {
            logger.info(`git.${event.method} ${event.stage} stage ${event.progress}% complete for ${repo.id}`)
        }));
        cloneDuration_s = cloneTiming.durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
    } else {
        // A checkout already exists: only fetch.
        logger.info(`Fetching ${repo.id}...`);
        const fetchTiming = await measure(() => fetchRepository(repo, (event) => {
            logger.info(`git.${event.method} ${event.stage} stage ${event.progress}% complete for ${repo.id}`)
        }));
        fetchDuration_s = fetchTiming.durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Fetched ${repo.id} in ${fetchDuration_s}s`);
    }

    logger.info(`Indexing ${repo.id}...`);
    const indexTiming = await measure(() => indexGitRepository(repo, settings, ctx));
    const indexDuration_s = indexTiming.durationMs / 1000;
    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);

    return { fetchDuration_s, cloneDuration_s, indexDuration_s };
}
/**
 * Indexes a local (non-git) repository and reports how long it took.
 *
 * @param repo The local repository to index.
 * @param settings Settings forwarded to the indexer.
 * @param ctx Application context forwarded to the indexer.
 * @param signal Optional abort signal, passed through to the indexer as-is.
 * @returns The indexing duration in seconds.
 */
const syncLocalRepository = async (repo: LocalRepository, settings: Settings, ctx: AppContext, signal?: AbortSignal) => {
    logger.info(`Indexing ${repo.id}...`);

    const indexTiming = await measure(() => indexLocalRepository(repo, settings, ctx, signal));
    const indexDuration_s = indexTiming.durationMs / 1000;

    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);
    return { indexDuration_s };
}
/**
 * Tells whether `fileName` is a .zoekt index shard of the repository whose
 * shard names start with `shardPrefix`.
 *
 * A bare prefix match is not enough: the prefix of `org/repo` is also a
 * prefix of every shard of `org/repo-two`. The text after the prefix must
 * therefore be exactly an optional `_v<index version>` followed by
 * `.<shard number>.zoekt`.
 */
const isIndexShardOf = (fileName: string, shardPrefix: string): boolean => {
    // An empty prefix would otherwise be a prefix of every file.
    if (shardPrefix.length === 0 || !fileName.startsWith(shardPrefix)) {
        return false;
    }
    return /^(?:_v\d+)?\.\d+\.zoekt$/.test(fileName.slice(shardPrefix.length));
}

/**
 * Removes everything that belongs to a stale repository: its git checkout
 * (git repositories only), its .zoekt index shards, and its database entry.
 *
 * @param repo The repository to delete.
 * @param db Database the repository entry is removed from.
 * @param ctx Application context; `ctx.indexPath` is the directory that is
 * searched for index shards.
 * @throws Rejects if a file-system removal fails for a reason other than the
 * target already being absent, or if the database update fails.
 */
export const deleteStaleRepository = async (repo: Repository, db: Database, ctx: AppContext) => {
    logger.info(`Deleting stale repository ${repo.id}:`);

    // Delete the checked out git repository (if applicable)
    if (repo.vcs === "git") {
        logger.info(`\tDeleting git directory ${repo.path}...`);
        // `force` makes this a no-op when the directory does not exist, e.g.
        // because the repository went stale before it was ever cloned.
        await rm(repo.path, {
            recursive: true,
            force: true,
        });
    }

    // Delete all .zoekt index files
    {
        // .zoekt index files are named with the repository name,
        // index version, and shard number. Some examples:
        //
        // git repos:
        // github.com%2Fsourcebot-dev%2Fsourcebot_v16.00000.zoekt
        // gitlab.com%2Fmy-org%2Fmy-project.00000.zoekt
        //
        // local repos:
        // UnrealEngine_v16.00000.zoekt
        // UnrealEngine_v16.00001.zoekt
        // ...
        // UnrealEngine_v16.00016.zoekt
        //
        // Notice that local repos are named with the repository basename and
        // git repos are named with the query-encoded repository name.
        //
        // @see : https://github.com/sourcegraph/zoekt/blob/c03b77fbf18b76904c0e061f10f46597eedd7b14/build/builder.go#L348
        const shardPrefix = repo.vcs === 'git'
            ? encodeURIComponent(repo.id)
            : path.basename(repo.path);

        // The prefix is deliberately NOT interpolated into the glob pattern:
        // it may contain glob metacharacters, and a `<prefix>*` pattern would
        // also match other repositories whose name merely starts with this
        // one. List every shard instead and select exact matches by name.
        // `dot: true` keeps shards of repositories whose name starts with '.'.
        const candidateFiles = await glob('*.zoekt', {
            cwd: ctx.indexPath,
            absolute: true,
            dot: true,
        });
        const indexFiles = candidateFiles.filter((file) => isIndexShardOf(path.basename(file), shardPrefix));

        await Promise.all(indexFiles.map((file) => {
            logger.info(`\tDeleting index file ${file}...`);
            // `force`: a shard that vanished between listing and removal is fine.
            return rm(file, { force: true });
        }));
    }

    // Delete db entry
    logger.info(`\tDeleting db entry...`);
    await db.update(({ repos }) => {
        delete repos[repo.id];
    });

    logger.info(`Deleted stale repository ${repo.id}`);
}
/**
 * Decides whether a configuration change between two versions of the same
 * repository (e.g., a branch is added) invalidates its existing index.
 *
 * - git repositories: a change to `branches` or `tags` requires reindexing.
 * - local repositories: a change to `excludedPaths` requires reindexing.
 * - if the two versions do not share the same `vcs`, no reindexing is reported.
 *
 * @param previous The repository as currently stored.
 * @param current The repository as described by the new configuration.
 * @returns `true` when the repository has to be reindexed.
 */
export const isRepoReindexingRequired = (previous: Repository, current: Repository) => {
    if (previous.vcs === 'git' && current.vcs === 'git') {
        // Any change to the indexed revisions counts.
        if (!arraysEqualShallow(previous.branches, current.branches)) {
            return true;
        }
        return !arraysEqualShallow(previous.tags, current.tags);
    }

    if (previous.vcs === 'local' && current.vcs === 'local') {
        // For local repositories only the `exclude.paths` property matters.
        return !arraysEqualShallow(previous.excludedPaths, current.excludedPaths);
    }

    return false;
}
/**
 * Decides whether a settings change invalidates the index of _every_
 * repository. Only `maxFileSize` is compared; other settings do not affect
 * the result.
 *
 * @param previous The settings currently stored.
 * @param current The settings read from the new configuration.
 * @returns `true` when all repositories have to be reindexed.
 */
export const isAllRepoReindexingRequired = (previous: Settings, current: Settings) => {
    const previousMaxFileSize = previous?.maxFileSize;
    const currentMaxFileSize = current?.maxFileSize;
    return !(previousMaxFileSize === currentMaxFileSize);
}
/**
 * Reads the configuration file and reconciles the database with it:
 *
 * 1. loads the config (over HTTP for remote paths, from disk otherwise),
 * 2. stores the resulting settings,
 * 3. resolves every configured repository,
 * 4. creates / updates the matching database entries, clearing
 *    `lastIndexedDate` where a reindex is required,
 * 5. marks repositories that are in the database but no longer in the
 *    config as stale.
 *
 * @param configPath Local path or remote URL of the configuration file.
 * @param db Database to reconcile.
 * @param signal Aborts the config download/read and the GitHub lookup.
 * @param ctx Application context forwarded to the per-host resolvers.
 * @throws If the config cannot be fetched/read or is not valid JSON, plus
 * whatever the resolvers and database helpers reject with.
 */
const syncConfig = async (configPath: string, db: Database, signal: AbortSignal, ctx: AppContext) => {
    const configContent = await (async () => {
        if (isRemotePath(configPath)) {
            const response = await fetch(configPath, {
                signal,
            });
            if (!response.ok) {
                throw new Error(`Failed to fetch config file ${configPath}: ${response.statusText}`);
            }
            return response.text();
        }
        return readFile(configPath, {
            encoding: 'utf-8',
            signal,
        });
    })();

    // @todo: we should validate the configuration file's structure here.
    const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema;

    // Update the settings. The comparison against the stored settings has to
    // happen before they are overwritten.
    const updatedSettings: Settings = {
        maxFileSize: config.settings?.maxFileSize ?? DEFAULT_SETTINGS.maxFileSize,
        autoDeleteStaleRepos: config.settings?.autoDeleteStaleRepos ?? DEFAULT_SETTINGS.autoDeleteStaleRepos,
    }
    const _isAllRepoReindexingRequired = isAllRepoReindexingRequired(db.data.settings, updatedSettings);
    await updateSettings(updatedSettings, db);

    // Fetch all repositories from the config file
    let configRepos: Repository[] = [];
    for (const repoConfig of config.repos ?? []) {
        switch (repoConfig.type) {
            case 'github': {
                const gitHubRepos = await getGitHubReposFromConfig(repoConfig, signal, ctx);
                configRepos.push(...gitHubRepos);
                break;
            }
            case 'gitlab': {
                const gitLabRepos = await getGitLabReposFromConfig(repoConfig, ctx);
                configRepos.push(...gitLabRepos);
                break;
            }
            case 'gitea': {
                const giteaRepos = await getGiteaReposFromConfig(repoConfig, ctx);
                configRepos.push(...giteaRepos);
                break;
            }
            case 'gerrit': {
                const gerritRepos = await getGerritReposFromConfig(repoConfig, ctx);
                configRepos.push(...gerritRepos);
                break;
            }
            case 'local': {
                const repo = getLocalRepoFromConfig(repoConfig, ctx);
                configRepos.push(repo);
                break;
            }
        }
    }

    // De-duplicate on id. The list is kept sorted by id; duplicates are
    // detected with a set of already-seen ids rather than by comparing
    // neighbours, so the result does not depend on equal ids ending up
    // adjacent after a locale-aware sort.
    configRepos.sort((a, b) => {
        return a.id.localeCompare(b.id);
    });
    const seenRepoIds = new Set<string>();
    configRepos = configRepos.filter((item) => {
        if (seenRepoIds.has(item.id)) {
            logger.debug(`Duplicate repository ${item.id} found in config file.`);
            return false;
        }
        seenRepoIds.add(item.id);
        return true;
    });
    logger.info(`Discovered ${configRepos.length} unique repositories from config.`);

    // Merge the repositories into the database
    for (const newRepo of configRepos) {
        if (newRepo.id in db.data.repos) {
            const existingRepo = db.data.repos[newRepo.id];
            const isReindexingRequired = _isAllRepoReindexingRequired || isRepoReindexingRequired(existingRepo, newRepo);
            if (isReindexingRequired) {
                logger.info(`Marking ${newRepo.id} for reindexing due to configuration change.`);
            }
            // Clearing `lastIndexedDate` is what marks the repository as due
            // for reindexing.
            await updateRepository(existingRepo.id, {
                ...newRepo,
                ...(isReindexingRequired ? {
                    lastIndexedDate: undefined,
                }: {})
            }, db);
        } else {
            await createRepository(newRepo, db);
            captureEvent("repo_created", {
                vcs: newRepo.vcs,
                codeHost: newRepo.codeHost,
            });
        }
    }

    // Find repositories that are in the database, but not in the configuration file
    {
        // Set lookup keeps this linear in the number of repositories.
        const configRepoIds = new Set(configRepos.map((repo) => repo.id));
        const missingIds = Object.keys(db.data.repos).filter((id) => !configRepoIds.has(id));
        for (const id of missingIds) {
            await db.update(({ repos }) => {
                const repo = repos[id];
                if (repo.isStale) {
                    return;
                }
                logger.warn(`Repository ${id} is no longer listed in the configuration file or was not found. Marking as stale.`);
                repo.isStale = true;
            });
        }
    }
}
/**
 * Backend entry point. Loads the database, keeps it in sync with the
 * configuration file (on startup, on a timer, and - for local config files -
 * on file changes), and then loops forever (re)indexing repositories.
 *
 * This function does not return normally: it ends in a `while (true)` loop.
 *
 * @param context Application context. `context.configPath` is read here; the
 * whole object is forwarded to the database loader and the sync/index helpers.
 */
export const main = async (context: AppContext) => {
const db = await loadDB(context);
// `abortController` always belongs to the most recently started config sync;
// `isSyncing` records whether such a sync is considered to be in flight.
let abortController = new AbortController();
let isSyncing = false;
// Runs one config sync, superseding any sync that is still in flight.
const _syncConfig = async () => {
// A sync is already running: abort it and give the new run a fresh controller.
if (isSyncing) {
abortController.abort();
abortController = new AbortController();
}
logger.info(`Syncing configuration file ${context.configPath} ...`);
isSyncing = true;
try {
const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context))
logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
isSyncing = false;
} catch (err: any) {
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
console.log(err);
}
}
// This runs after every sync attempt, including failed and aborted ones,
// because the catch block above never rethrows.
const localRepos = Object.values(db.data.repos).filter(repo => repo.vcs === 'local');
initLocalRepoFileWatchers(localRepos, async (repo, signal) => {
logger.info(`Change detected to local repository ${repo.id}. Re-syncing...`);
await syncLocalRepository(repo, db.data.settings, context, signal);
await db.update(({ repos }) => repos[repo.id].lastIndexedDate = new Date().toUTCString());
});
}
// Re-sync on file changes if the config file is local
// NOTE(review): the promise returned by _syncConfig() is neither awaited nor
// caught here or in the interval below - confirm a rejection from the code
// after its try/catch cannot surface as an unhandled rejection.
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}
// Re-sync periodically; the period is RESYNC_CONFIG_INTERVAL_MS.
setInterval(() => {
logger.info(`Re-syncing configuration file ${context.configPath}`);
_syncConfig();
}, RESYNC_CONFIG_INTERVAL_MS);
// Sync immediately on startup
await _syncConfig();
// Indexing loop: one pass over every repository in the database, then a
// one second pause before the next pass.
while (true) {
const repos = db.data.repos;
for (const [_, repo] of Object.entries(repos)) {
// A repository without a `lastIndexedDate` is treated as last indexed at the
// epoch.
const lastIndexed = repo.lastIndexedDate ? new Date(repo.lastIndexedDate) : new Date(0);
// Stale repositories are never (re)indexed. They are deleted only when the
// `autoDeleteStaleRepos` setting is enabled.
// NOTE(review): deleteStaleRepository is awaited outside the try/catch below,
// so a rejection from it propagates out of main - confirm this is intended.
if (repo.isStale) {
if (db.data.settings.autoDeleteStaleRepos) {
await deleteStaleRepository(repo, db, context);
} else {
// skip deletion...
}
continue;
}
// Indexed less than REINDEX_INTERVAL_MS ago: nothing to do yet.
if (lastIndexed.getTime() > Date.now() - REINDEX_INTERVAL_MS) {
continue;
}
try {
let indexDuration_s: number | undefined;
let fetchDuration_s: number | undefined;
let cloneDuration_s: number | undefined;
if (repo.vcs === 'git') {
const stats = await syncGitRepository(repo, db.data.settings, context);
indexDuration_s = stats.indexDuration_s;
fetchDuration_s = stats.fetchDuration_s;
cloneDuration_s = stats.cloneDuration_s;
} else if (repo.vcs === 'local') {
const stats = await syncLocalRepository(repo, db.data.settings, context);
indexDuration_s = stats.indexDuration_s;
}
captureEvent('repo_synced', {
vcs: repo.vcs,
codeHost: repo.codeHost,
indexDuration_s,
fetchDuration_s,
cloneDuration_s,
});
} catch (err: any) {
// @todo : better error handling here..
// The `continue` skips the `lastIndexedDate` update below, so the repository
// stays due and is attempted again on the next pass.
logger.error(err);
continue;
}
// Only reached after a successful sync.
await db.update(({ repos }) => repos[repo.id].lastIndexedDate = new Date().toUTCString());
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
}