mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 12:25:22 +00:00
make repo garbage collection async (#211)
* add gc queue logic * fix missing switch cases for gc status * style org create form better with new staging domain * change repo rm logic to be async * simplify repo for inactive org query * add grace period for garbage collecting repos * make prom scrape interval 500ms
This commit is contained in:
parent
88fe84ebdd
commit
3f586dd927
14 changed files with 316 additions and 150 deletions
|
|
@ -1,7 +1,6 @@
|
|||
{
|
||||
"$schema": "./schemas/v2/index.json",
|
||||
"settings": {
|
||||
"autoDeleteStaleRepos": true,
|
||||
"reindexInterval": 86400000, // 24 hours
|
||||
"resyncInterval": 86400000 // 24 hours
|
||||
},
|
||||
|
|
|
|||
|
|
@ -9,7 +9,8 @@ prometheus.scrape "local_app" {
|
|||
]
|
||||
|
||||
metrics_path = "/metrics"
|
||||
scrape_interval = "10s"
|
||||
scrape_timeout = "500ms"
|
||||
scrape_interval = "500ms"
|
||||
|
||||
job_name = sys.env("GRAFANA_ENVIRONMENT")
|
||||
|
||||
|
|
|
|||
|
|
@ -5,10 +5,11 @@ import { Settings } from "./types.js";
|
|||
*/
|
||||
export const DEFAULT_SETTINGS: Settings = {
|
||||
maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
|
||||
autoDeleteStaleRepos: true,
|
||||
reindexIntervalMs: 1000 * 60 * 60, // 1 hour
|
||||
resyncConnectionPollingIntervalMs: 1000,
|
||||
reindexRepoPollingIntervalMs: 1000,
|
||||
indexConcurrencyMultiple: 3,
|
||||
configSyncConcurrencyMultiple: 3,
|
||||
gcConcurrencyMultiple: 1,
|
||||
gcGracePeriodMs: 10 * 1000, // 10 seconds
|
||||
}
|
||||
|
|
@ -6,9 +6,16 @@ export class PromClient {
|
|||
private app: express.Application;
|
||||
public activeRepoIndexingJobs: Gauge<string>;
|
||||
public repoIndexingDuration: Histogram<string>;
|
||||
public repoIndexingErrors: Counter<string>;
|
||||
public repoIndexingFails: Counter<string>;
|
||||
public repoIndexingSuccesses: Counter<string>;
|
||||
public repoIndexingErrorTotal: Counter<string>;
|
||||
public repoIndexingFailTotal: Counter<string>;
|
||||
public repoIndexingSuccessTotal: Counter<string>;
|
||||
|
||||
public activeRepoGarbageCollectionJobs: Gauge<string>;
|
||||
public repoGarbageCollectionDuration: Histogram<string>;
|
||||
public repoGarbageCollectionErrorTotal: Counter<string>;
|
||||
public repoGarbageCollectionFailTotal: Counter<string>;
|
||||
public repoGarbageCollectionSuccessTotal: Counter<string>;
|
||||
|
||||
public readonly PORT = 3060;
|
||||
|
||||
constructor() {
|
||||
|
|
@ -28,26 +35,61 @@ export class PromClient {
|
|||
});
|
||||
this.registry.registerMetric(this.repoIndexingDuration);
|
||||
|
||||
this.repoIndexingErrors = new Counter({
|
||||
this.repoIndexingErrorTotal = new Counter({
|
||||
name: 'repo_indexing_errors',
|
||||
help: 'The number of repo indexing errors',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingErrors);
|
||||
this.registry.registerMetric(this.repoIndexingErrorTotal);
|
||||
|
||||
this.repoIndexingFails = new Counter({
|
||||
this.repoIndexingFailTotal = new Counter({
|
||||
name: 'repo_indexing_fails',
|
||||
help: 'The number of repo indexing fails',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingFails);
|
||||
this.registry.registerMetric(this.repoIndexingFailTotal);
|
||||
|
||||
this.repoIndexingSuccesses = new Counter({
|
||||
this.repoIndexingSuccessTotal = new Counter({
|
||||
name: 'repo_indexing_successes',
|
||||
help: 'The number of repo indexing successes',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingSuccesses);
|
||||
this.registry.registerMetric(this.repoIndexingSuccessTotal);
|
||||
|
||||
this.activeRepoGarbageCollectionJobs = new Gauge({
|
||||
name: 'active_repo_garbage_collection_jobs',
|
||||
help: 'The number of repo garbage collection jobs in progress',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.activeRepoGarbageCollectionJobs);
|
||||
|
||||
this.repoGarbageCollectionDuration = new Histogram({
|
||||
name: 'repo_garbage_collection_duration',
|
||||
help: 'The duration of repo garbage collection jobs',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionDuration);
|
||||
|
||||
this.repoGarbageCollectionErrorTotal = new Counter({
|
||||
name: 'repo_garbage_collection_errors',
|
||||
help: 'The number of repo garbage collection errors',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionErrorTotal);
|
||||
|
||||
this.repoGarbageCollectionFailTotal = new Counter({
|
||||
name: 'repo_garbage_collection_fails',
|
||||
help: 'The number of repo garbage collection fails',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionFailTotal);
|
||||
|
||||
this.repoGarbageCollectionSuccessTotal = new Counter({
|
||||
name: 'repo_garbage_collection_successes',
|
||||
help: 'The number of repo garbage collection successes',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionSuccessTotal);
|
||||
|
||||
client.collectDefaultMetrics({
|
||||
register: this.registry,
|
||||
|
|
|
|||
|
|
@ -6,27 +6,33 @@ import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig }
|
|||
import { AppContext, Settings } from "./types.js";
|
||||
import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js";
|
||||
import { cloneRepository, fetchRepository } from "./git.js";
|
||||
import { existsSync, rmSync, readdirSync } from 'fs';
|
||||
import { existsSync, rmSync, readdirSync, rm } from 'fs';
|
||||
import { indexGitRepository } from "./zoekt.js";
|
||||
import os from 'os';
|
||||
import { PromClient } from './promClient.js';
|
||||
|
||||
interface IRepoManager {
|
||||
blockingPollLoop: () => void;
|
||||
scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise<void>;
|
||||
dispose: () => void;
|
||||
}
|
||||
|
||||
const QUEUE_NAME = 'repoIndexingQueue';
|
||||
const REPO_INDEXING_QUEUE = 'repoIndexingQueue';
|
||||
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';
|
||||
|
||||
type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] };
|
||||
type JobPayload = {
|
||||
type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };
|
||||
type RepoIndexingPayload = {
|
||||
repo: RepoWithConnections,
|
||||
}
|
||||
|
||||
type RepoGarbageCollectionPayload = {
|
||||
repo: Repo,
|
||||
}
|
||||
|
||||
export class RepoManager implements IRepoManager {
|
||||
private worker: Worker;
|
||||
private queue: Queue<JobPayload>;
|
||||
private indexWorker: Worker;
|
||||
private indexQueue: Queue<RepoIndexingPayload>;
|
||||
private gcWorker: Worker;
|
||||
private gcQueue: Queue<RepoGarbageCollectionPayload>;
|
||||
private logger = createLogger('RepoManager');
|
||||
|
||||
constructor(
|
||||
|
|
@ -36,34 +42,51 @@ export class RepoManager implements IRepoManager {
|
|||
private promClient: PromClient,
|
||||
private ctx: AppContext,
|
||||
) {
|
||||
this.queue = new Queue<JobPayload>(QUEUE_NAME, {
|
||||
const numCores = os.cpus().length;
|
||||
|
||||
// Repo indexing
|
||||
this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
|
||||
connection: redis,
|
||||
});
|
||||
const numCores = os.cpus().length;
|
||||
this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
|
||||
this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
|
||||
connection: redis,
|
||||
concurrency: numCores * this.settings.indexConcurrencyMultiple,
|
||||
});
|
||||
this.worker.on('completed', this.onIndexJobCompleted.bind(this));
|
||||
this.worker.on('failed', this.onIndexJobFailed.bind(this));
|
||||
this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
|
||||
this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));
|
||||
|
||||
// Garbage collection
|
||||
this.gcQueue = new Queue<RepoGarbageCollectionPayload>(REPO_GC_QUEUE, {
|
||||
connection: redis,
|
||||
});
|
||||
this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
|
||||
connection: redis,
|
||||
concurrency: numCores * this.settings.gcConcurrencyMultiple,
|
||||
});
|
||||
this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
|
||||
this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
|
||||
}
|
||||
|
||||
public async blockingPollLoop() {
|
||||
while(true) {
|
||||
while (true) {
|
||||
await this.fetchAndScheduleRepoIndexing();
|
||||
await this.garbageCollectRepo();
|
||||
await this.fetchAndScheduleRepoGarbageCollection();
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
|
||||
}
|
||||
}
|
||||
|
||||
public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
|
||||
///////////////////////////
|
||||
// Repo indexing
|
||||
///////////////////////////
|
||||
|
||||
private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
|
||||
await this.db.$transaction(async (tx) => {
|
||||
await tx.repo.updateMany({
|
||||
where: { id: { in: repos.map(repo => repo.id) } },
|
||||
data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
|
||||
});
|
||||
|
||||
|
||||
const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
|
||||
if (!acc[repo.orgId]) {
|
||||
acc[repo.orgId] = [];
|
||||
|
|
@ -75,10 +98,10 @@ export class RepoManager implements IRepoManager {
|
|||
for (const orgId in reposByOrg) {
|
||||
const orgRepos = reposByOrg[orgId];
|
||||
// Set priority based on number of repos (more repos = lower priority)
|
||||
// This helps prevent large orgs from overwhelming the queue
|
||||
// This helps prevent large orgs from overwhelming the indexQueue
|
||||
const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
|
||||
|
||||
await this.queue.addBulk(orgRepos.map(repo => ({
|
||||
await this.indexQueue.addBulk(orgRepos.map(repo => ({
|
||||
name: 'repoIndexJob',
|
||||
data: { repo },
|
||||
opts: {
|
||||
|
|
@ -86,26 +109,29 @@ export class RepoManager implements IRepoManager {
|
|||
}
|
||||
})));
|
||||
|
||||
this.logger.info(`Added ${orgRepos.length} jobs to queue for org ${orgId} with priority ${priority}`);
|
||||
this.logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`);
|
||||
}
|
||||
|
||||
|
||||
}).catch((err: unknown) => {
|
||||
this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
|
||||
this.logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private async fetchAndScheduleRepoIndexing() {
|
||||
const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
|
||||
const repos = await this.db.repo.findMany({
|
||||
where: {
|
||||
repoIndexingStatus: {
|
||||
notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.INDEXING, RepoIndexingStatus.FAILED]
|
||||
in: [
|
||||
RepoIndexingStatus.NEW,
|
||||
RepoIndexingStatus.INDEXED
|
||||
]
|
||||
},
|
||||
OR: [
|
||||
{ indexedAt: null },
|
||||
{ indexedAt: { lt: thresholdDate } },
|
||||
{ repoIndexingStatus: RepoIndexingStatus.NEW }
|
||||
]
|
||||
},
|
||||
include: {
|
||||
|
|
@ -122,69 +148,6 @@ export class RepoManager implements IRepoManager {
|
|||
}
|
||||
}
|
||||
|
||||
private async garbageCollectRepo() {
|
||||
const reposWithNoConnections = await this.db.repo.findMany({
|
||||
where: {
|
||||
repoIndexingStatus: { notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.INDEXING] }, // we let the job finish for now so we don't need to worry about cancelling
|
||||
connections: {
|
||||
none: {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
|
||||
const inactiveOrgs = await this.db.org.findMany({
|
||||
where: {
|
||||
stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
|
||||
stripeLastUpdatedAt: {
|
||||
lt: sevenDaysAgo
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const inactiveOrgIds = inactiveOrgs.map(org => org.id);
|
||||
|
||||
const inactiveOrgRepos = await this.db.repo.findMany({
|
||||
where: {
|
||||
orgId: {
|
||||
in: inactiveOrgIds
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (inactiveOrgIds.length > 0 && inactiveOrgRepos.length > 0) {
|
||||
console.log(`Garbage collecting ${inactiveOrgs.length} inactive orgs: ${inactiveOrgIds.join(', ')}`);
|
||||
}
|
||||
|
||||
const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos];
|
||||
for (const repo of reposToDelete) {
|
||||
this.logger.info(`Garbage collecting repo: ${repo.id}`);
|
||||
|
||||
// delete cloned repo
|
||||
const repoPath = getRepoPath(repo, this.ctx);
|
||||
if(existsSync(repoPath)) {
|
||||
this.logger.info(`Deleting repo directory ${repoPath}`);
|
||||
rmSync(repoPath, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
// delete shards
|
||||
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
|
||||
const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
|
||||
for (const file of files) {
|
||||
const filePath = `${this.ctx.indexPath}/${file}`;
|
||||
this.logger.info(`Deleting shard file ${filePath}`);
|
||||
rmSync(filePath);
|
||||
}
|
||||
}
|
||||
|
||||
await this.db.repo.deleteMany({
|
||||
where: {
|
||||
id: {
|
||||
in: reposToDelete.map(repo => repo.id)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: do this better? ex: try using the tokens from all the connections
|
||||
// We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
|
||||
|
|
@ -198,7 +161,7 @@ export class RepoManager implements IRepoManager {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
let token: string | undefined;
|
||||
for (const repoConnection of repoConnections) {
|
||||
const connection = repoConnection.connection;
|
||||
|
|
@ -222,24 +185,24 @@ export class RepoManager implements IRepoManager {
|
|||
private async syncGitRepository(repo: RepoWithConnections) {
|
||||
let fetchDuration_s: number | undefined = undefined;
|
||||
let cloneDuration_s: number | undefined = undefined;
|
||||
|
||||
|
||||
const repoPath = getRepoPath(repo, this.ctx);
|
||||
const metadata = repo.metadata as Record<string, string>;
|
||||
|
||||
|
||||
if (existsSync(repoPath)) {
|
||||
this.logger.info(`Fetching ${repo.id}...`);
|
||||
|
||||
|
||||
const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
|
||||
//this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
|
||||
}));
|
||||
fetchDuration_s = durationMs / 1000;
|
||||
|
||||
|
||||
process.stdout.write('\n');
|
||||
this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
|
||||
|
||||
|
||||
} else {
|
||||
this.logger.info(`Cloning ${repo.id}...`);
|
||||
|
||||
|
||||
const token = await this.getTokenForRepo(repo, this.db);
|
||||
let cloneUrl = repo.cloneUrl;
|
||||
if (token) {
|
||||
|
|
@ -247,21 +210,21 @@ export class RepoManager implements IRepoManager {
|
|||
url.username = token;
|
||||
cloneUrl = url.toString();
|
||||
}
|
||||
|
||||
|
||||
const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
|
||||
//this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
|
||||
}));
|
||||
cloneDuration_s = durationMs / 1000;
|
||||
|
||||
|
||||
process.stdout.write('\n');
|
||||
this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
|
||||
}
|
||||
|
||||
|
||||
this.logger.info(`Indexing ${repo.id}...`);
|
||||
const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx));
|
||||
const indexDuration_s = durationMs / 1000;
|
||||
this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);
|
||||
|
||||
|
||||
return {
|
||||
fetchDuration_s,
|
||||
cloneDuration_s,
|
||||
|
|
@ -269,7 +232,7 @@ export class RepoManager implements IRepoManager {
|
|||
}
|
||||
}
|
||||
|
||||
private async runIndexJob(job: Job<JobPayload>) {
|
||||
private async runIndexJob(job: Job<RepoIndexingPayload>) {
|
||||
this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`);
|
||||
const repo = job.data.repo as RepoWithConnections;
|
||||
await this.db.repo.update({
|
||||
|
|
@ -289,14 +252,14 @@ export class RepoManager implements IRepoManager {
|
|||
let stats;
|
||||
let attempts = 0;
|
||||
const maxAttempts = 3;
|
||||
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
try {
|
||||
stats = await this.syncGitRepository(repo);
|
||||
break;
|
||||
} catch (error) {
|
||||
attempts++;
|
||||
this.promClient.repoIndexingErrors.inc();
|
||||
this.promClient.repoIndexingErrorTotal.inc();
|
||||
if (attempts === maxAttempts) {
|
||||
this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`);
|
||||
throw error;
|
||||
|
|
@ -312,12 +275,12 @@ export class RepoManager implements IRepoManager {
|
|||
fetchDuration_s = stats!.fetchDuration_s;
|
||||
cloneDuration_s = stats!.cloneDuration_s;
|
||||
}
|
||||
|
||||
private async onIndexJobCompleted(job: Job<JobPayload>) {
|
||||
|
||||
private async onIndexJobCompleted(job: Job<RepoIndexingPayload>) {
|
||||
this.logger.info(`Repo index job ${job.id} completed`);
|
||||
this.promClient.activeRepoIndexingJobs.dec();
|
||||
this.promClient.repoIndexingSuccesses.inc();
|
||||
|
||||
this.promClient.repoIndexingSuccessTotal.inc();
|
||||
|
||||
await this.db.repo.update({
|
||||
where: {
|
||||
id: job.data.repo.id,
|
||||
|
|
@ -329,11 +292,11 @@ export class RepoManager implements IRepoManager {
|
|||
});
|
||||
}
|
||||
|
||||
private async onIndexJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
|
||||
private async onIndexJobFailed(job: Job<RepoIndexingPayload> | undefined, err: unknown) {
|
||||
this.logger.info(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
|
||||
if (job) {
|
||||
this.promClient.activeRepoIndexingJobs.dec();
|
||||
this.promClient.repoIndexingFails.inc();
|
||||
this.promClient.repoIndexingFailTotal.inc();
|
||||
|
||||
await this.db.repo.update({
|
||||
where: {
|
||||
|
|
@ -346,8 +309,158 @@ export class RepoManager implements IRepoManager {
|
|||
}
|
||||
}
|
||||
|
||||
///////////////////////////
|
||||
// Repo garbage collection
|
||||
///////////////////////////
|
||||
|
||||
private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) {
|
||||
await this.db.$transaction(async (tx) => {
|
||||
await tx.repo.updateMany({
|
||||
where: { id: { in: repos.map(repo => repo.id) } },
|
||||
data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE }
|
||||
});
|
||||
|
||||
await this.gcQueue.addBulk(repos.map(repo => ({
|
||||
name: 'repoGarbageCollectionJob',
|
||||
data: { repo },
|
||||
})));
|
||||
|
||||
this.logger.info(`Added ${repos.length} jobs to gcQueue`);
|
||||
});
|
||||
}
|
||||
|
||||
private async fetchAndScheduleRepoGarbageCollection() {
|
||||
////////////////////////////////////
|
||||
// Get repos with no connections
|
||||
////////////////////////////////////
|
||||
|
||||
|
||||
const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs);
|
||||
const reposWithNoConnections = await this.db.repo.findMany({
|
||||
where: {
|
||||
repoIndexingStatus: {
|
||||
in: [
|
||||
RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition)
|
||||
RepoIndexingStatus.FAILED,
|
||||
]
|
||||
},
|
||||
connections: {
|
||||
none: {}
|
||||
},
|
||||
OR: [
|
||||
{ indexedAt: null },
|
||||
{ indexedAt: { lt: thresholdDate } }
|
||||
]
|
||||
},
|
||||
});
|
||||
if (reposWithNoConnections.length > 0) {
|
||||
this.logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`);
|
||||
}
|
||||
|
||||
////////////////////////////////////
|
||||
// Get inactive org repos
|
||||
////////////////////////////////////
|
||||
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
|
||||
const inactiveOrgRepos = await this.db.repo.findMany({
|
||||
where: {
|
||||
org: {
|
||||
stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
|
||||
stripeLastUpdatedAt: {
|
||||
lt: sevenDaysAgo
|
||||
}
|
||||
},
|
||||
OR: [
|
||||
{ indexedAt: null },
|
||||
{ indexedAt: { lt: thresholdDate } }
|
||||
]
|
||||
}
|
||||
});
|
||||
|
||||
if (inactiveOrgRepos.length > 0) {
|
||||
this.logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`);
|
||||
}
|
||||
|
||||
const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos];
|
||||
if (reposToDelete.length > 0) {
|
||||
await this.scheduleRepoGarbageCollectionBulk(reposToDelete);
|
||||
}
|
||||
}
|
||||
|
||||
private async runGarbageCollectionJob(job: Job<RepoGarbageCollectionPayload>) {
|
||||
this.logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.id}`);
|
||||
this.promClient.activeRepoGarbageCollectionJobs.inc();
|
||||
|
||||
const repo = job.data.repo as Repo;
|
||||
await this.db.repo.update({
|
||||
where: {
|
||||
id: repo.id
|
||||
},
|
||||
data: {
|
||||
repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING
|
||||
}
|
||||
});
|
||||
|
||||
// delete cloned repo
|
||||
const repoPath = getRepoPath(repo, this.ctx);
|
||||
if (existsSync(repoPath)) {
|
||||
this.logger.info(`Deleting repo directory ${repoPath}`);
|
||||
await rm(repoPath, { recursive: true, force: true }, (err) => {
|
||||
if (err) {
|
||||
this.logger.error(`Failed to delete repo directory ${repoPath}: ${err}`);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// delete shards
|
||||
const shardPrefix = getShardPrefix(repo.orgId, repo.id);
|
||||
const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
|
||||
for (const file of files) {
|
||||
const filePath = `${this.ctx.indexPath}/${file}`;
|
||||
this.logger.info(`Deleting shard file ${filePath}`);
|
||||
await rm(filePath, { force: true }, (err) => {
|
||||
if (err) {
|
||||
this.logger.error(`Failed to delete shard file ${filePath}: ${err}`);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async onGarbageCollectionJobCompleted(job: Job<RepoGarbageCollectionPayload>) {
|
||||
this.logger.info(`Garbage collection job ${job.id} completed`);
|
||||
this.promClient.activeRepoGarbageCollectionJobs.dec();
|
||||
this.promClient.repoGarbageCollectionSuccessTotal.inc();
|
||||
|
||||
await this.db.repo.delete({
|
||||
where: {
|
||||
id: job.data.repo.id
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async onGarbageCollectionJobFailed(job: Job<RepoGarbageCollectionPayload> | undefined, err: unknown) {
|
||||
this.logger.info(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
|
||||
|
||||
if (job) {
|
||||
this.promClient.activeRepoGarbageCollectionJobs.dec();
|
||||
this.promClient.repoGarbageCollectionFailTotal.inc();
|
||||
|
||||
await this.db.repo.update({
|
||||
where: {
|
||||
id: job.data.repo.id
|
||||
},
|
||||
data: {
|
||||
repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async dispose() {
|
||||
this.worker.close();
|
||||
this.queue.close();
|
||||
this.indexWorker.close();
|
||||
this.indexQueue.close();
|
||||
this.gcQueue.close();
|
||||
this.gcWorker.close();
|
||||
}
|
||||
}
|
||||
|
|
@ -62,10 +62,6 @@ export type Settings = {
|
|||
* The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be inexed.
|
||||
*/
|
||||
maxFileSize: number;
|
||||
/**
|
||||
* Automatically delete stale repositories from the index. Defaults to true.
|
||||
*/
|
||||
autoDeleteStaleRepos: boolean;
|
||||
/**
|
||||
* The interval (in milliseconds) at which the indexer should re-index all repositories.
|
||||
*/
|
||||
|
|
@ -86,6 +82,14 @@ export type Settings = {
|
|||
* The multiple of the number of CPUs to use for syncing the configuration.
|
||||
*/
|
||||
configSyncConcurrencyMultiple: number;
|
||||
/**
|
||||
* The multiple of the number of CPUs to use for garbage collection.
|
||||
*/
|
||||
gcConcurrencyMultiple: number;
|
||||
/**
|
||||
* The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded
|
||||
*/
|
||||
gcGracePeriodMs: number;
|
||||
}
|
||||
|
||||
// @see : https://stackoverflow.com/a/61132308
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
-- AlterEnum
|
||||
-- This migration adds more than one value to an enum.
|
||||
-- With PostgreSQL versions 11 and earlier, this is not possible
|
||||
-- in a single migration. This can be worked around by creating
|
||||
-- multiple migrations, each migration adding only one value to
|
||||
-- the enum.
|
||||
|
||||
|
||||
ALTER TYPE "RepoIndexingStatus" ADD VALUE 'IN_GC_QUEUE';
|
||||
ALTER TYPE "RepoIndexingStatus" ADD VALUE 'GARBAGE_COLLECTING';
|
||||
ALTER TYPE "RepoIndexingStatus" ADD VALUE 'GARBAGE_COLLECTION_FAILED';
|
||||
|
|
@ -16,6 +16,9 @@ enum RepoIndexingStatus {
|
|||
INDEXING
|
||||
INDEXED
|
||||
FAILED
|
||||
IN_GC_QUEUE
|
||||
GARBAGE_COLLECTING
|
||||
GARBAGE_COLLECTION_FAILED
|
||||
}
|
||||
|
||||
enum ConnectionSyncStatus {
|
||||
|
|
|
|||
|
|
@ -595,11 +595,6 @@ const schema = {
|
|||
"default": 2097152,
|
||||
"minimum": 1
|
||||
},
|
||||
"autoDeleteStaleRepos": {
|
||||
"type": "boolean",
|
||||
"description": "Automatically delete stale repositories from the index. Defaults to true.",
|
||||
"default": true
|
||||
},
|
||||
"reindexInterval": {
|
||||
"type": "integer",
|
||||
"description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).",
|
||||
|
|
|
|||
|
|
@ -40,10 +40,6 @@ export interface Settings {
|
|||
* The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be inexed. Defaults to 2MB (2097152 bytes).
|
||||
*/
|
||||
maxFileSize?: number;
|
||||
/**
|
||||
* Automatically delete stale repositories from the index. Defaults to true.
|
||||
*/
|
||||
autoDeleteStaleRepos?: boolean;
|
||||
/**
|
||||
* The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -35,6 +35,12 @@ export const RepoListItem = ({
|
|||
return 'Indexed';
|
||||
case RepoIndexingStatus.FAILED:
|
||||
return 'Index failed';
|
||||
case RepoIndexingStatus.IN_GC_QUEUE:
|
||||
return 'In garbage collection queue...';
|
||||
case RepoIndexingStatus.GARBAGE_COLLECTING:
|
||||
return 'Garbage collecting...';
|
||||
case RepoIndexingStatus.GARBAGE_COLLECTION_FAILED:
|
||||
return 'Garbage collection failed';
|
||||
}
|
||||
}, [status]);
|
||||
|
||||
|
|
@ -85,8 +91,12 @@ const convertIndexingStatus = (status: RepoIndexingStatus) => {
|
|||
case RepoIndexingStatus.IN_INDEX_QUEUE:
|
||||
case RepoIndexingStatus.INDEXING:
|
||||
return 'running';
|
||||
case RepoIndexingStatus.IN_GC_QUEUE:
|
||||
case RepoIndexingStatus.GARBAGE_COLLECTING:
|
||||
return "garbage-collecting"
|
||||
case RepoIndexingStatus.INDEXED:
|
||||
return 'succeeded';
|
||||
case RepoIndexingStatus.GARBAGE_COLLECTION_FAILED:
|
||||
case RepoIndexingStatus.FAILED:
|
||||
return 'failed';
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import { CircleCheckIcon, CircleXIcon } from "lucide-react";
|
|||
import { useMemo } from "react";
|
||||
import { FiLoader } from "react-icons/fi";
|
||||
|
||||
export type Status = 'waiting' | 'running' | 'succeeded' | 'failed';
|
||||
export type Status = 'waiting' | 'running' | 'succeeded' | 'failed' | 'garbage-collecting';
|
||||
|
||||
export const StatusIcon = ({
|
||||
status,
|
||||
|
|
@ -12,6 +12,7 @@ export const StatusIcon = ({
|
|||
const Icon = useMemo(() => {
|
||||
switch (status) {
|
||||
case 'waiting':
|
||||
case 'garbage-collecting':
|
||||
case 'running':
|
||||
return <FiLoader className={cn('animate-spin-slow', className)} />;
|
||||
case 'succeeded':
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ export function OrgCreateForm() {
|
|||
const { toast } = useToast();
|
||||
const router = useRouter();
|
||||
const captureEvent = useCaptureEvent();
|
||||
|
||||
|
||||
const onboardingFormSchema = z.object({
|
||||
name: z.string()
|
||||
.min(2, { message: "Organization name must be at least 3 characters long." })
|
||||
|
|
@ -73,9 +73,9 @@ export function OrgCreateForm() {
|
|||
}
|
||||
|
||||
return (
|
||||
<Card className="flex flex-col border p-12 space-y-6 bg-background w-96">
|
||||
<Card className="flex flex-col border p-8 bg-background w-full max-w-md">
|
||||
<Form {...form}>
|
||||
<form onSubmit={form.handleSubmit(onSubmit)} className="space-y-8">
|
||||
<form onSubmit={form.handleSubmit(onSubmit)} className="space-y-6">
|
||||
<FormField
|
||||
control={form.control}
|
||||
name="name"
|
||||
|
|
@ -104,21 +104,16 @@ export function OrgCreateForm() {
|
|||
<FormItem>
|
||||
<FormLabel>Organization Domain</FormLabel>
|
||||
<FormControl>
|
||||
<div className="flex items-center">
|
||||
<span className="ml-2">staging.sourcebot.dev/</span>
|
||||
<Input placeholder="aperature-labs" {...field} className="w-1/2" />
|
||||
<div className="flex items-center space-x-2 w-full">
|
||||
<div className="flex-shrink-0 text-sm text-muted-foreground">staging.sourcebot.dev/</div>
|
||||
<Input placeholder="aperture-labs" {...field} className="flex-1" />
|
||||
</div>
|
||||
</FormControl>
|
||||
<FormMessage />
|
||||
</FormItem>
|
||||
)}
|
||||
/>
|
||||
<Button
|
||||
variant="default"
|
||||
className="w-full"
|
||||
type="submit"
|
||||
disabled={isSubmitting}
|
||||
>
|
||||
<Button variant="default" className="w-full" type="submit" disabled={isSubmitting}>
|
||||
{isSubmitting && <Loader2 className="w-4 h-4 mr-2 animate-spin" />}
|
||||
Create
|
||||
</Button>
|
||||
|
|
|
|||
|
|
@ -570,11 +570,6 @@
|
|||
"default": 2097152,
|
||||
"minimum": 1
|
||||
},
|
||||
"autoDeleteStaleRepos": {
|
||||
"type": "boolean",
|
||||
"description": "Automatically delete stale repositories from the index. Defaults to true.",
|
||||
"default": true
|
||||
},
|
||||
"reindexInterval": {
|
||||
"type": "integer",
|
||||
"description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Repositories are always indexed when first added. Defaults to 1 hour (3600000 milliseconds).",
|
||||
|
|
|
|||
Loading…
Reference in a new issue