This commit is contained in:
bkellam 2025-10-14 22:52:15 -07:00
parent 32c68e7b72
commit d315292fe7
9 changed files with 96 additions and 94 deletions

View file

@ -1,4 +1,6 @@
import { env } from "./env.js";
import { Settings } from "./types.js";
import path from "path";
/**
* Default settings.
@ -22,4 +24,7 @@ export const DEFAULT_SETTINGS: Settings = {
export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES = [
'github',
];
];
export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');

View file

@ -44,6 +44,7 @@ export const env = createEnv({
LOGTAIL_TOKEN: z.string().optional(),
LOGTAIL_HOST: z.string().url().optional(),
SOURCEBOT_LOG_LEVEL: z.enum(["info", "debug", "warn", "error"]).default("info"),
DEBUG_ENABLE_GROUPMQ_LOGGING: booleanSchema.default('false'),
DATABASE_URL: z.string().url().default("postgresql://postgres:postgres@localhost:5432/postgres"),
CONFIG_PATH: z.string().optional(),

View file

@ -1,9 +1,47 @@
import { CheckRepoActions, GitConfigScope, simpleGit, SimpleGitProgressEvent } from 'simple-git';
import { mkdir } from 'node:fs/promises';
import { env } from './env.js';
import { dirname, resolve } from 'node:path';
import { existsSync } from 'node:fs';
type onProgressFn = (event: SimpleGitProgressEvent) => void;
/**
* Creates a simple-git client that has it's working directory
* set to the given path.
*/
const createGitClientForPath = (path: string, onProgress?: onProgressFn) => {
if (!existsSync(path)) {
throw new Error(`Path ${path} does not exist`);
}
const parentPath = resolve(dirname(path));
const git = simpleGit({
progress: onProgress,
})
.env({
...process.env,
/**
* @note on some inside-baseball on why this is necessary: The specific
* issue we saw was that a `git clone` would fail without throwing, and
* then a subsequent `git config` command would run, but since the clone
* failed, it wouldn't be running in a git directory. Git would then walk
* up the directory tree until it either found a git directory (in the case
* of the development env) or it would hit a GIT_DISCOVERY_ACROSS_FILESYSTEM
* error when trying to cross a filesystem boundary (in the prod case).
* GIT_CEILING_DIRECTORIES ensures that this walk will be limited to the
* parent directory.
*/
GIT_CEILING_DIRECTORIES: parentPath,
})
.cwd({
path,
});
return git;
}
export const cloneRepository = async (
{
cloneUrl,
@ -20,11 +58,7 @@ export const cloneRepository = async (
try {
await mkdir(path, { recursive: true });
const git = simpleGit({
progress: onProgress,
}).cwd({
path,
})
const git = createGitClientForPath(path, onProgress);
const cloneArgs = [
"--bare",
@ -62,11 +96,7 @@ export const fetchRepository = async (
}
) => {
try {
const git = simpleGit({
progress: onProgress,
}).cwd({
path: path,
})
const git = createGitClientForPath(path, onProgress);
if (authHeader) {
await git.addConfig("http.extraHeader", authHeader);
@ -108,9 +138,7 @@ export const fetchRepository = async (
* present in gitConfig.
*/
export const upsertGitConfig = async (path: string, gitConfig: Record<string, string>, onProgress?: onProgressFn) => {
const git = simpleGit({
progress: onProgress,
}).cwd(path);
const git = createGitClientForPath(path, onProgress);
try {
for (const [key, value] of Object.entries(gitConfig)) {
@ -130,9 +158,7 @@ export const upsertGitConfig = async (path: string, gitConfig: Record<string, st
* If a key is not set, this is a no-op.
*/
export const unsetGitConfig = async (path: string, keys: string[], onProgress?: onProgressFn) => {
const git = simpleGit({
progress: onProgress,
}).cwd(path);
const git = createGitClientForPath(path, onProgress);
try {
const configList = await git.listConfig();
@ -156,9 +182,7 @@ export const unsetGitConfig = async (path: string, keys: string[], onProgress?:
* Returns true if `path` is the _root_ of a git repository.
*/
export const isPathAValidGitRepoRoot = async (path: string, onProgress?: onProgressFn) => {
const git = simpleGit({
progress: onProgress,
}).cwd(path);
const git = createGitClientForPath(path, onProgress);
try {
return git.checkIsRepo(CheckRepoActions.IS_REPO_ROOT);
@ -184,7 +208,7 @@ export const isUrlAValidGitRepo = async (url: string) => {
}
export const getOriginUrl = async (path: string) => {
const git = simpleGit().cwd(path);
const git = createGitClientForPath(path);
try {
const remotes = await git.getConfig('remote.origin.url', GitConfigScope.local);
@ -199,18 +223,13 @@ export const getOriginUrl = async (path: string) => {
}
export const getBranches = async (path: string) => {
const git = simpleGit();
const branches = await git.cwd({
path,
}).branch();
const git = createGitClientForPath(path);
const branches = await git.branch();
return branches.all;
}
export const getTags = async (path: string) => {
const git = simpleGit();
const tags = await git.cwd({
path,
}).tags();
const git = createGitClientForPath(path);
const tags = await git.tags();
return tags.all;
}

View file

@ -6,16 +6,14 @@ import { hasEntitlement, loadConfig } from '@sourcebot/shared';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Redis } from 'ioredis';
import path from 'path';
import { ConnectionManager } from './connectionManager.js';
import { DEFAULT_SETTINGS } from './constants.js';
import { env } from "./env.js";
import { DEFAULT_SETTINGS, INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
import { env } from "./env.js";
import { IndexSyncer } from "./indexSyncer.js";
import { PromClient } from './promClient.js';
import { RepoManager } from './repoManager.js';
import { AppContext } from "./types.js";
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
import { IndexSyncer } from "./indexSyncer.js";
const logger = createLogger('backend-entrypoint');
@ -34,9 +32,8 @@ const getSettings = async (configPath?: string) => {
}
const cacheDir = env.DATA_CACHE_DIR;
const reposPath = path.join(cacheDir, 'repos');
const indexPath = path.join(cacheDir, 'index');
const reposPath = REPOS_CACHE_DIR;
const indexPath = INDEX_CACHE_DIR;
if (!existsSync(reposPath)) {
await mkdir(reposPath, { recursive: true });
@ -45,12 +42,6 @@ if (!existsSync(indexPath)) {
await mkdir(indexPath, { recursive: true });
}
const context: AppContext = {
indexPath,
reposPath,
cachePath: cacheDir,
}
const prisma = new PrismaClient();
const redis = new Redis(env.REDIS_URL, {
@ -69,10 +60,10 @@ const promClient = new PromClient();
const settings = await getSettings(env.CONFIG_PATH);
const connectionManager = new ConnectionManager(prisma, settings, redis);
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
const repoManager = new RepoManager(prisma, settings, redis, promClient);
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
const indexSyncer = new IndexSyncer(prisma, settings, redis, context);
const indexSyncer = new IndexSyncer(prisma, settings, redis);
// await repoManager.validateIndexedReposHaveShards();

View file

@ -5,9 +5,10 @@ import { existsSync } from 'fs';
import { readdir, rm } from 'fs/promises';
import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis';
import { INDEX_CACHE_DIR } from './constants.js';
import { env } from './env.js';
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
import { indexGitRepository } from './zoekt.js';
@ -33,16 +34,13 @@ export class IndexSyncer {
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private ctx: AppContext,
) {
this.queue = new Queue<JobPayload>({
redis,
namespace: 'index-sync-queue',
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? {
logger,
}: {}),
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
});
this.worker = new Worker<JobPayload>({
@ -50,8 +48,8 @@ export class IndexSyncer {
maxStalledCount: 1,
handler: this.runJob.bind(this),
concurrency: this.settings.maxRepoIndexingJobConcurrency,
...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? {
logger,
...(env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true' ? {
logger: true,
}: {}),
});
@ -62,6 +60,7 @@ export class IndexSyncer {
}
public async startScheduler() {
logger.debug('Starting scheduler');
this.interval = setInterval(async () => {
await this.scheduleIndexJobs();
await this.scheduleCleanupJobs();
@ -240,7 +239,7 @@ export class IndexSyncer {
}
private async indexRepository(repo: RepoWithConnections, logger: Logger) {
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
const { path: repoPath, isReadOnly } = getRepoPath(repo);
const metadata = repoMetadataSchema.parse(repo.metadata);
@ -304,22 +303,22 @@ export class IndexSyncer {
}
logger.info(`Indexing ${repo.name} (id: ${repo.id})...`);
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings));
const indexDuration_s = durationMs / 1000;
logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`);
}
private async cleanupRepository(repo: Repo, logger: Logger) {
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
const { path: repoPath, isReadOnly } = getRepoPath(repo);
if (existsSync(repoPath) && !isReadOnly) {
logger.info(`Deleting repo directory ${repoPath}`);
await rm(repoPath, { recursive: true, force: true });
}
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
const files = (await readdir(this.ctx.indexPath)).filter(file => file.startsWith(shardPrefix));
const files = (await readdir(INDEX_CACHE_DIR)).filter(file => file.startsWith(shardPrefix));
for (const file of files) {
const filePath = `${this.ctx.indexPath}/${file}`;
const filePath = `${INDEX_CACHE_DIR}/${file}`;
logger.info(`Deleting shard file ${filePath}`);
await rm(filePath, { force: true });
}

View file

@ -4,10 +4,11 @@ import { createLogger } from "@sourcebot/logger";
import { Job, Queue, Worker } from 'bullmq';
import { existsSync, promises, readdirSync } from 'fs';
import { Redis } from 'ioredis';
import { INDEX_CACHE_DIR } from "./constants.js";
import { env } from './env.js';
import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from "./git.js";
import { PromClient } from './promClient.js';
import { AppContext, RepoWithConnections, Settings, repoMetadataSchema } from "./types.js";
import { RepoWithConnections, Settings, repoMetadataSchema } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
@ -36,7 +37,6 @@ export class RepoManager {
private settings: Settings,
redis: Redis,
private promClient: PromClient,
private ctx: AppContext,
) {
// Repo indexing
this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
@ -162,7 +162,7 @@ export class RepoManager {
}
private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
const { path: repoPath, isReadOnly } = getRepoPath(repo);
const metadata = repoMetadataSchema.parse(repo.metadata);
@ -225,7 +225,7 @@ export class RepoManager {
}
logger.info(`Indexing ${repo.displayName}...`);
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings));
const indexDuration_s = durationMs / 1000;
logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
}
@ -422,7 +422,7 @@ export class RepoManager {
});
// delete cloned repo
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
const { path: repoPath, isReadOnly } = getRepoPath(repo);
if (existsSync(repoPath) && !isReadOnly) {
logger.info(`Deleting repo directory ${repoPath}`);
await promises.rm(repoPath, { recursive: true, force: true });
@ -430,9 +430,9 @@ export class RepoManager {
// delete shards
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
const files = readdirSync(INDEX_CACHE_DIR).filter(file => file.startsWith(shardPrefix));
for (const file of files) {
const filePath = `${this.ctx.indexPath}/${file}`;
const filePath = `${INDEX_CACHE_DIR}/${file}`;
logger.info(`Deleting shard file ${filePath}`);
await promises.rm(filePath, { force: true });
}
@ -493,7 +493,7 @@ export class RepoManager {
return;
}
const files = readdirSync(this.ctx.indexPath);
const files = readdirSync(INDEX_CACHE_DIR);
const reposToReindex: number[] = [];
for (const repo of indexedRepos) {
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
@ -504,7 +504,7 @@ export class RepoManager {
try {
hasShards = files.some(file => file.startsWith(shardPrefix));
} catch (error) {
logger.error(`Failed to read index directory ${this.ctx.indexPath}: ${error}`);
logger.error(`Failed to read index directory ${INDEX_CACHE_DIR}: ${error}`);
continue;
}

View file

@ -2,20 +2,6 @@ import { Connection, Repo, RepoToConnection } from "@sourcebot/db";
import { Settings as SettingsSchema } from "@sourcebot/schemas/v3/index.type";
import { z } from "zod";
export type AppContext = {
/**
* Path to the repos cache directory.
*/
reposPath: string;
/**
* Path to the index cache directory;
*/
indexPath: string;
cachePath: string;
}
export type Settings = Required<SettingsSchema>;
// Structure of the `metadata` field in the `Repo` table.

View file

@ -1,11 +1,12 @@
import { Logger } from "winston";
import { AppContext, RepoAuthCredentials, RepoWithConnections } from "./types.js";
import { RepoAuthCredentials, RepoWithConnections } from "./types.js";
import path from 'path';
import { PrismaClient, Repo } from "@sourcebot/db";
import { getTokenFromConfig as getTokenFromConfigBase } from "@sourcebot/crypto";
import { BackendException, BackendError } from "@sourcebot/error";
import * as Sentry from "@sentry/node";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig, BitbucketConnectionConfig, AzureDevOpsConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { REPOS_CACHE_DIR } from "./constants.js";
export const measure = async <T>(cb: () => Promise<T>) => {
const start = Date.now();
@ -69,7 +70,7 @@ export const arraysEqualShallow = <T>(a?: readonly T[], b?: readonly T[]) => {
// @note: this function is duplicated in `packages/web/src/features/fileTree/actions.ts`.
// @todo: we should move this to a shared package.
export const getRepoPath = (repo: Repo, ctx: AppContext): { path: string, isReadOnly: boolean } => {
export const getRepoPath = (repo: Repo): { path: string, isReadOnly: boolean } => {
// If we are dealing with a local repository, then use that as the path.
// Mark as read-only since we aren't guaranteed to have write access to the local filesystem.
const cloneUrl = new URL(repo.cloneUrl);
@ -81,7 +82,7 @@ export const getRepoPath = (repo: Repo, ctx: AppContext): { path: string, isRead
}
return {
path: path.join(ctx.reposPath, repo.id.toString()),
path: path.join(REPOS_CACHE_DIR, repo.id.toString()),
isReadOnly: false,
}
}

View file

@ -1,21 +1,21 @@
import { exec } from "child_process";
import { AppContext, repoMetadataSchema, Settings } from "./types.js";
import { Repo } from "@sourcebot/db";
import { getRepoPath } from "./utils.js";
import { getShardPrefix } from "./utils.js";
import { getBranches, getTags } from "./git.js";
import micromatch from "micromatch";
import { createLogger } from "@sourcebot/logger";
import { exec } from "child_process";
import micromatch from "micromatch";
import { INDEX_CACHE_DIR } from "./constants.js";
import { getBranches, getTags } from "./git.js";
import { captureEvent } from "./posthog.js";
import { repoMetadataSchema, Settings } from "./types.js";
import { getRepoPath, getShardPrefix } from "./utils.js";
const logger = createLogger('zoekt');
export const indexGitRepository = async (repo: Repo, settings: Settings, ctx: AppContext) => {
export const indexGitRepository = async (repo: Repo, settings: Settings) => {
let revisions = [
'HEAD'
];
const { path: repoPath } = getRepoPath(repo, ctx);
const { path: repoPath } = getRepoPath(repo);
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
const metadata = repoMetadataSchema.parse(repo.metadata);
@ -60,7 +60,7 @@ export const indexGitRepository = async (repo: Repo, settings: Settings, ctx: Ap
const command = [
'zoekt-git-index',
'-allow_missing_branches',
`-index ${ctx.indexPath}`,
`-index ${INDEX_CACHE_DIR}`,
`-max_trigram_count ${settings.maxTrigramCount}`,
`-file_limit ${settings.maxFileSize}`,
`-branches "${revisions.join(',')}"`,