Mirror of https://github.com/sourcebot-dev/sourcebot.git (synced 2025-12-12 20:35:24 +00:00)
adds garbage collection for repos (#182)
* refactor repo indexing logic into RepoManager
* wip cleanup stale repos
* add rest of gc logic
* set status to indexing properly

parent 75d4189f25
commit dffbbae41d

7 changed files with 310 additions and 207 deletions
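At a high level, the commit moves the inline repo-indexing loop out of main.ts into a new RepoManager, which owns a BullMQ queue/worker pair and a blocking poll loop that both schedules repos for (re-)indexing and garbage-collects repos with no remaining connections. A minimal sketch of the resulting wiring, condensed from the diff below (the Redis error listener and other details are omitted here):

import { PrismaClient } from '@sourcebot/db';
import { Redis } from 'ioredis';
import { DEFAULT_SETTINGS } from './constants.js';
import { ConnectionManager } from './connectionManager.js';
import { RepoManager } from './repoManager.js';
import { AppContext } from './types.js';

export const main = async (db: PrismaClient, context: AppContext) => {
    const redis = new Redis({ host: 'localhost', port: 6379 });

    // Polls the db for connections with syncStatus === SYNC_NEEDED and
    // schedules a sync job for each.
    const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis);
    connectionManager.registerPollingCallback();

    // Each poll tick: (1) schedule repos that need (re-)indexing onto the
    // queue, and (2) garbage-collect repos with no connections left by
    // deleting the clone directory, the zoekt shards, and the db rows.
    const repoManager = new RepoManager(db, DEFAULT_SETTINGS, redis, context);
    repoManager.blockingPollLoop();
}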
@@ -1,6 +1,6 @@
 import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
-import { AppContext, Settings, WithRequired } from "./types.js";
+import { Settings, WithRequired } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "./logger.js";
 import os from 'os';
@@ -10,6 +10,7 @@ import { getGitHubReposFromConfig } from "./github.js";

 interface IConnectionManager {
     scheduleConnectionSync: (connection: Connection) => Promise<void>;
+    registerPollingCallback: () => void;
     dispose: () => void;
 }

@@ -28,14 +29,13 @@ export class ConnectionManager implements IConnectionManager {

     constructor(
         private db: PrismaClient,
-        settings: Settings,
+        private settings: Settings,
         redis: Redis,
-        private context: AppContext,
     ) {
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
-            concurrency: numCores * settings.configSyncConcurrencyMultiple,
+            concurrency: numCores * this.settings.configSyncConcurrencyMultiple,
         });
         this.worker.on('completed', this.onSyncJobCompleted.bind(this));
         this.worker.on('failed', this.onSyncJobFailed.bind(this));
@@ -61,6 +61,19 @@ export class ConnectionManager implements IConnectionManager {
         });
     }

+    public async registerPollingCallback() {
+        setInterval(async () => {
+            const connections = await this.db.connection.findMany({
+                where: {
+                    syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
+                }
+            });
+            for (const connection of connections) {
+                await this.scheduleConnectionSync(connection);
+            }
+        }, this.settings.resyncConnectionPollingIntervalMs);
+    }
+
     private async runSyncJob(job: Job<JobPayload>) {
         const { config, orgId } = job.data;
         // @note: We aren't actually doing anything with this atm.
@@ -8,6 +8,7 @@ export const DEFAULT_SETTINGS: Settings = {
     autoDeleteStaleRepos: true,
     reindexIntervalMs: 1000 * 60,
     resyncConnectionPollingIntervalMs: 1000,
+    reindexRepoPollingInternvalMs: 1000,
     indexConcurrencyMultiple: 3,
     configSyncConcurrencyMultiple: 3,
 }
@@ -1,120 +1,14 @@
-import { ConnectionSyncStatus, PrismaClient, Repo, RepoIndexingStatus, RepoToConnection, Connection } from '@sourcebot/db';
-import { existsSync } from 'fs';
-import { cloneRepository, fetchRepository } from "./git.js";
+import { PrismaClient } from '@sourcebot/db';
 import { createLogger } from "./logger.js";
-import { captureEvent } from "./posthog.js";
 import { AppContext } from "./types.js";
-import { getRepoPath, getTokenFromConfig, measure } from "./utils.js";
-import { indexGitRepository } from "./zoekt.js";
 import { DEFAULT_SETTINGS } from './constants.js';
-import { Queue, Worker, Job } from 'bullmq';
 import { Redis } from 'ioredis';
-import * as os from 'os';
 import { ConnectionManager } from './connectionManager.js';
-import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
+import { RepoManager } from './repoManager.js';

 const logger = createLogger('main');

-type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] };
-
-// TODO: do this better? ex: try using the tokens from all the connections
-// We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
-// fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
-// may have their own token. This method will just pick the first connection that has a token (if one exists) and uses that. This
-// may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referrencing.
-const getTokenForRepo = async (repo: RepoWithConnections, db: PrismaClient) => {
-    const repoConnections = repo.connections;
-    if (repoConnections.length === 0) {
-        logger.error(`Repo ${repo.id} has no connections`);
-        return;
-    }
-
-    let token: string | undefined;
-    for (const repoConnection of repoConnections) {
-        const connection = repoConnection.connection;
-        const config = connection.config as unknown as ConnectionConfig;
-        if (config.token) {
-            token = await getTokenFromConfig(config.token, connection.orgId, db);
-            if (token) {
-                break;
-            }
-        }
-    }
-
-    return token;
-}
-
-const syncGitRepository = async (repo: RepoWithConnections, ctx: AppContext, db: PrismaClient) => {
-    let fetchDuration_s: number | undefined = undefined;
-    let cloneDuration_s: number | undefined = undefined;
-
-    const repoPath = getRepoPath(repo, ctx);
-    const metadata = repo.metadata as Record<string, string>;
-
-    if (existsSync(repoPath)) {
-        logger.info(`Fetching ${repo.id}...`);
-
-        const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
-            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
-        }));
-        fetchDuration_s = durationMs / 1000;
-
-        process.stdout.write('\n');
-        logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
-
-    } else {
-        logger.info(`Cloning ${repo.id}...`);
-
-        const token = await getTokenForRepo(repo, db);
-        let cloneUrl = repo.cloneUrl;
-        if (token) {
-            const url = new URL(cloneUrl);
-            url.username = token;
-            cloneUrl = url.toString();
-        }
-
-        const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
-            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
-        }));
-        cloneDuration_s = durationMs / 1000;
-
-        process.stdout.write('\n');
-        logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
-    }
-
-    logger.info(`Indexing ${repo.id}...`);
-    const { durationMs } = await measure(() => indexGitRepository(repo, ctx));
-    const indexDuration_s = durationMs / 1000;
-    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);
-
-    return {
-        fetchDuration_s,
-        cloneDuration_s,
-        indexDuration_s,
-    }
-}
-
-async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
-    for (const repo of repos) {
-        await db.$transaction(async (tx) => {
-            await tx.repo.update({
-                where: { id: repo.id },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
-            });
-
-            // Add the job to the queue
-            await queue.add('indexJob', repo);
-            logger.info(`Added job to queue for repo ${repo.id}`);
-        }).catch((err: unknown) => {
-            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
-        });
-    }
-}
-
 export const main = async (db: PrismaClient, context: AppContext) => {
     /////////////////////////////
     // Init Redis
     /////////////////////////////
     const redis = new Redis({
         host: 'localhost',
         port: 6379,
@@ -128,99 +22,9 @@ export const main = async (db: PrismaClient, context: AppContext) => {
         process.exit(1);
     });

-    const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis, context);
-    setInterval(async () => {
-        const connections = await db.connection.findMany({
-            where: {
-                syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
-            }
-        });
-        for (const connection of connections) {
-            await connectionManager.scheduleConnectionSync(connection);
-        }
-    }, DEFAULT_SETTINGS.resyncConnectionPollingIntervalMs);
-
-    /////////////////////////
-    // Setup repo indexing
-    /////////////////////////
-    const indexQueue = new Queue('indexQueue');
+    const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis);
+    connectionManager.registerPollingCallback();

-    const numCores = os.cpus().length;
-    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
-    logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
-    const worker = new Worker('indexQueue', async (job: Job) => {
-        const repo = job.data as RepoWithConnections;
-
-        let indexDuration_s: number | undefined;
-        let fetchDuration_s: number | undefined;
-        let cloneDuration_s: number | undefined;
-
-        const stats = await syncGitRepository(repo, context, db);
-        indexDuration_s = stats.indexDuration_s;
-        fetchDuration_s = stats.fetchDuration_s;
-        cloneDuration_s = stats.cloneDuration_s;
-
-        captureEvent('repo_synced', {
-            vcs: 'git',
-            codeHost: repo.external_codeHostType,
-            indexDuration_s,
-            fetchDuration_s,
-            cloneDuration_s,
-        });
-
-        await db.repo.update({
-            where: {
-                id: repo.id,
-            },
-            data: {
-                indexedAt: new Date(),
-                repoIndexingStatus: RepoIndexingStatus.INDEXED,
-            }
-        });
-    }, { connection: redis, concurrency: numWorkers });
-
-    worker.on('completed', (job: Job) => {
-        logger.info(`Job ${job.id} completed`);
-    });
-    worker.on('failed', async (job: Job | undefined, err: unknown) => {
-        logger.info(`Job failed with error: ${err}`);
-        if (job) {
-            await db.repo.update({
-                where: {
-                    id: job.data.id,
-                },
-                data: {
-                    repoIndexingStatus: RepoIndexingStatus.FAILED,
-                }
-            })
-        }
-    });
-
-    // Repo indexing loop
-    while (true) {
-        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
-        const repos = await db.repo.findMany({
-            where: {
-                repoIndexingStatus: {
-                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
-                },
-                OR: [
-                    { indexedAt: null },
-                    { indexedAt: { lt: thresholdDate } },
-                    { repoIndexingStatus: RepoIndexingStatus.NEW }
-                ]
-            },
-            include: {
-                connections: {
-                    include: {
-                        connection: true
-                    }
-                }
-            }
-        });
-        addReposToQueue(db, indexQueue, repos);
-
-        await new Promise(resolve => setTimeout(resolve, 1000));
-    }
+    const repoManager = new RepoManager(db, DEFAULT_SETTINGS, redis, context);
+    repoManager.blockingPollLoop();
 }
packages/backend/src/repoManager.ts (new file, 276 lines)
@@ -0,0 +1,276 @@
+import { Job, Queue, Worker } from 'bullmq';
+import { Redis } from 'ioredis';
+import { createLogger } from "./logger.js";
+import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus } from "@sourcebot/db";
+import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
+import { AppContext, Settings } from "./types.js";
+import { captureEvent } from "./posthog.js";
+import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js";
+import { cloneRepository, fetchRepository } from "./git.js";
+import { existsSync, rmSync, readdirSync } from 'fs';
+import { indexGitRepository } from "./zoekt.js";
+import os from 'os';
+
+interface IRepoManager {
+    blockingPollLoop: () => void;
+    scheduleRepoIndexing: (repo: RepoWithConnections) => Promise<void>;
+    dispose: () => void;
+}
+
+const QUEUE_NAME = 'repoIndexingQueue';
+
+type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] };
+type JobPayload = {
+    repo: RepoWithConnections,
+}
+
+export class RepoManager implements IRepoManager {
+    private queue = new Queue<JobPayload>(QUEUE_NAME);
+    private worker: Worker;
+    private logger = createLogger('RepoManager');
+
+    constructor(
+        private db: PrismaClient,
+        private settings: Settings,
+        redis: Redis,
+        private ctx: AppContext,
+    ) {
+        const numCores = os.cpus().length;
+        this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
+            connection: redis,
+            concurrency: numCores * this.settings.indexConcurrencyMultiple,
+        });
+        this.worker.on('completed', this.onIndexJobCompleted.bind(this));
+        this.worker.on('failed', this.onIndexJobFailed.bind(this));
+    }
+
+    public async blockingPollLoop() {
+        while(true) {
+            this.fetchAndScheduleRepoIndexing();
+            this.garbageCollectRepo();
+
+            await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingInternvalMs));
+        }
+    }
+
+    public async scheduleRepoIndexing(repo: RepoWithConnections) {
+        await this.db.$transaction(async (tx) => {
+            await tx.repo.update({
+                where: { id: repo.id },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            });
+
+            await this.queue.add('repoIndexJob', {
+                repo
+            });
+            this.logger.info(`Added job to queue for repo ${repo.id}`);
+        }).catch((err: unknown) => {
+            this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+        });
+    }
+
+    private async fetchAndScheduleRepoIndexing() {
+        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
+        const repos = await this.db.repo.findMany({
+            where: {
+                repoIndexingStatus: {
+                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
+                },
+                OR: [
+                    { indexedAt: null },
+                    { indexedAt: { lt: thresholdDate } },
+                    { repoIndexingStatus: RepoIndexingStatus.NEW }
+                ]
+            },
+            include: {
+                connections: {
+                    include: {
+                        connection: true
+                    }
+                }
+            }
+        });
+
+        for (const repo of repos) {
+            await this.scheduleRepoIndexing(repo);
+        }
+    }
+
+    private async garbageCollectRepo() {
+        const reposWithNoConnections = await this.db.repo.findMany({
+            where: {
+                repoIndexingStatus: { notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.INDEXING] }, // we let the job finish for now so we don't need to worry about cancelling
+                connections: {
+                    none: {}
+                }
+            }
+        });
+
+        for (const repo of reposWithNoConnections) {
+            this.logger.info(`Garbage collecting repo with no connections: ${repo.id}`);
+
+            // delete cloned repo
+            const repoPath = getRepoPath(repo, this.ctx);
+            if(existsSync(repoPath)) {
+                this.logger.info(`Deleting repo directory ${repoPath}`);
+                rmSync(repoPath, { recursive: true, force: true });
+            }
+
+            // delete shards
+            const shardPrefix = getShardPrefix(repo.orgId, repo.id);
+            const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
+            for (const file of files) {
+                const filePath = `${this.ctx.indexPath}/${file}`;
+                this.logger.info(`Deleting shard file ${filePath}`);
+                rmSync(filePath);
+            }
+        }
+
+        await this.db.repo.deleteMany({
+            where: {
+                id: {
+                    in: reposWithNoConnections.map(repo => repo.id)
+                }
+            }
+        });
+    }
+
+    // TODO: do this better? ex: try using the tokens from all the connections
+    // We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
+    // fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
+    // may have their own token. This method will just pick the first connection that has a token (if one exists) and uses that. This
+    // may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referrencing.
+    private async getTokenForRepo(repo: RepoWithConnections, db: PrismaClient) {
+        const repoConnections = repo.connections;
+        if (repoConnections.length === 0) {
+            this.logger.error(`Repo ${repo.id} has no connections`);
+            return;
+        }
+
+        let token: string | undefined;
+        for (const repoConnection of repoConnections) {
+            const connection = repoConnection.connection;
+            const config = connection.config as unknown as ConnectionConfig;
+            if (config.token) {
+                token = await getTokenFromConfig(config.token, connection.orgId, db);
+                if (token) {
+                    break;
+                }
+            }
+        }
+
+        return token;
+    }
+
+    private async syncGitRepository(repo: RepoWithConnections) {
+        let fetchDuration_s: number | undefined = undefined;
+        let cloneDuration_s: number | undefined = undefined;
+
+        const repoPath = getRepoPath(repo, this.ctx);
+        const metadata = repo.metadata as Record<string, string>;
+
+        if (existsSync(repoPath)) {
+            this.logger.info(`Fetching ${repo.id}...`);
+
+            const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
+                this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
+            }));
+            fetchDuration_s = durationMs / 1000;
+
+            process.stdout.write('\n');
+            this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
+
+        } else {
+            this.logger.info(`Cloning ${repo.id}...`);
+
+            const token = await this.getTokenForRepo(repo, this.db);
+            let cloneUrl = repo.cloneUrl;
+            if (token) {
+                const url = new URL(cloneUrl);
+                url.username = token;
+                cloneUrl = url.toString();
+            }
+
+            const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
+                this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
+            }));
+            cloneDuration_s = durationMs / 1000;
+
+            process.stdout.write('\n');
+            this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
+        }
+
+        this.logger.info(`Indexing ${repo.id}...`);
+        const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx));
+        const indexDuration_s = durationMs / 1000;
+        this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);
+
+        return {
+            fetchDuration_s,
+            cloneDuration_s,
+            indexDuration_s,
+        }
+    }
+
+    private async runIndexJob(job: Job<JobPayload>) {
+        const repo = job.data.repo as RepoWithConnections;
+        await this.db.repo.update({
+            where: {
+                id: repo.id,
+            },
+            data: {
+                repoIndexingStatus: RepoIndexingStatus.INDEXING,
+            }
+        });
+
+        let indexDuration_s: number | undefined;
+        let fetchDuration_s: number | undefined;
+        let cloneDuration_s: number | undefined;
+
+        const stats = await this.syncGitRepository(repo);
+        indexDuration_s = stats.indexDuration_s;
+        fetchDuration_s = stats.fetchDuration_s;
+        cloneDuration_s = stats.cloneDuration_s;
+
+        captureEvent('repo_synced', {
+            vcs: 'git',
+            codeHost: repo.external_codeHostType,
+            indexDuration_s,
+            fetchDuration_s,
+            cloneDuration_s,
+        });
+    }
+
+    private async onIndexJobCompleted(job: Job<JobPayload>) {
+        this.logger.info(`Repo index job ${job.id} completed`);
+
+        await this.db.repo.update({
+            where: {
+                id: job.data.repo.id,
+            },
+            data: {
+                indexedAt: new Date(),
+                repoIndexingStatus: RepoIndexingStatus.INDEXED,
+            }
+        });
+    }
+
+    private async onIndexJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
+        this.logger.info(`Repo index job failed with error: ${err}`);
+        if (job) {
+            await this.db.repo.update({
+                where: {
+                    id: job.data.repo.id,
+                },
+                data: {
+                    repoIndexingStatus: RepoIndexingStatus.FAILED,
+                }
+            })
+        }
+    }
+
+    public async dispose() {
+        this.worker.close();
+        this.queue.close();
+    }
+}
@@ -74,6 +74,10 @@ export type Settings = {
     * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced.
     */
    resyncConnectionPollingIntervalMs: number;
+    /**
+    * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed.
+    */
+    reindexRepoPollingInternvalMs: number;
    /**
    * The multiple of the number of CPUs to use for indexing.
    */
@@ -159,4 +159,8 @@ export const arraysEqualShallow = <T>(a?: readonly T[], b?: readonly T[]) => {

 export const getRepoPath = (repo: Repo, ctx: AppContext) => {
     return path.join(ctx.reposPath, repo.id.toString());
 }
+
+export const getShardPrefix = (orgId: number, repoId: number) => {
+    return `${orgId}_${repoId}`;
+}
@@ -3,6 +3,7 @@ import { AppContext, LocalRepository, Settings } from "./types.js";
 import { Repo } from "@sourcebot/db";
 import { getRepoPath } from "./utils.js";
 import { DEFAULT_SETTINGS } from "./constants.js";
+import { getShardPrefix } from "./utils.js";

 const ALWAYS_EXCLUDED_DIRS = ['.git', '.hg', '.svn'];

@@ -11,7 +12,7 @@ export const indexGitRepository = async (repo: Repo, ctx: AppContext) => {
         'HEAD'
     ];

-    const shardPrefix = `${repo.orgId}_${repo.id}`;
+    const shardPrefix = getShardPrefix(repo.orgId, repo.id);
     const repoPath = getRepoPath(repo, ctx);
     const command = `zoekt-git-index -allow_missing_branches -index ${ctx.indexPath} -file_limit ${DEFAULT_SETTINGS.maxFileSize} -branches ${revisions.join(',')} -tenant_id ${repo.orgId} -shard_prefix ${shardPrefix} ${repoPath}`;
