sourcebot/packages/backend/src/repoManager.ts
Brendan Kellam 39b92b9e98
v3 effort (#158)
* SQL Database (#157)

* point zoekt to v3 branch

* bump zoekt version

* Add tenant ID concept into web app and backend (#160)

* hacked together an example of using zoekt grpc api

* provide tenant id to zoekt git indexer

* update zoekt version to point to multitenant branch

* pipe tenant id through header to zoekt

* remove incorrect submodule reference and settings typo

* update zoekt commit

* remove unused yarn script

* remove unused grpc client in web server

* remove unneeded deps and improve tenant id log

* pass tenant id when creating repo in db

* add mt yarn script

* add nocheckin comment to tenant id in v2 schema

---------

Co-authored-by: bkellam <bshizzle1234@gmail.com>

* bump zoekt version

* parallelize repo indexing (#163)

* hacked together an example of using zoekt grpc api

* provide tenant id to zoekt git indexer

* update zoekt version to point to multitenant branch

* pipe tenant id through header to zoekt

* remove incorrect submodule reference and settings typo

* update zoekt commit

* remove unused yarn script

* remove unused grpc client in web server

* remove unneeded deps and improve tenant id log

* pass tenant id when creating repo in db

* add mt yarn script

* add pol of bullmq into backend

* add better error handling and concurrency setting

* spin up redis instance in dockerfile

* cleanup transaction logic when adding repos to index queue

* add NEW index status fetch condition

* move bullmq deps to backend

---------

Co-authored-by: bkellam <bshizzle1234@gmail.com>

* Authentication (#164)

* Add Org table (#167)

* Move logout button & profile picture into settings dropdown (#172)

* Multi tenancy support in config syncer (#171)

* [wip] initial mt support in config syncer

* Move logout button & profile picture into settings dropdown (#172)

* update sync status properly and fix bug with multiple config in db case

* make config path required in single tenant mode

NOTE: deleting config/repos is currently not supported in multi tenancy case. Support for this will be added in a future PR

---------

Co-authored-by: Brendan Kellam <bshizzle1234@gmail.com>

* add tenant mode support in docker container:

* Organization switching & active org management (#173)

* updated syncedAt date after config sync:

* Migrate to postgres (#174)

* spin up postgres in docker container

* get initial pol of postgres db working in docker image

* spin up postgres server in dev case

* updated syncedAt date after config sync:

* remove unnecessary port expose in docker file

* Connection creation form (#175)

* fix issue with yarn dev startup

* init (#176)

* Add `@sourcebot/schemas` package (#177)

* Connection management (#178)

* add concept of secrets (#180)

* add @sourcebot/schemas package

* migrate things to use the schemas package

* Dockerfile support

* add secret table to schema

* Add concept of connection manager

* Rename Config->Connection

* Handle job failures

* Add join table between repo and connection

* nits

* create first version of crypto package

* add crypto package as deps to others

* forgot to add package changes

* add server action for adding and listing secrets, create test page for it

* add secrets page to nav menu

* add secret to config and support fetching it in backend

* reset secret form on successful submission

* add toast feedback for secrets form

* add instructions for adding encryption key to dev instructions

* add encryption key support in docker file

* add delete secret button

* fix nits from pr review

---------

Co-authored-by: bkellam <bshizzle1234@gmail.com>

* bump zoekt version

* enforce tenancy on search and repo listing endpoints (#181)

* enforce tenancy on search and repo listing

* remove orgId from request schemas

* adds garbage collection for repos (#182)

* refactor repo indexing logic into RepoManager

* wip cleanup stale repos

* add rest of gc logic

* set status to indexing properly

* add initial logic for staging environment

* try to move encryption key env declaration in docker file to fix build issues

* switch encryption key as build arg to see if that fixes build issues

* add deployment action for staging image

* try using mac github action runners instead

* switch to using arm64 runners on arm64 build

* change workflow names to fix trigger issue

* trigger staging actions to see if it works

* fix working directory typo and pray it doesnt push to prod

* checkout v3 when deploying staging

* try to change into the staging dir manually

* dummy commit to trigger v3 workflows to test

* update staging deploy script to match new version in main

* reference proper image:tag in staging fly config

* update staging fly config to point to ghcr

* Connection management (#183)

* add invite system and google oauth provider (#185)

* add settings page with members list

* add invite to schema and basic create form

* add invite table

* add basic invite link copy button

* add auth invite accept case

* add non auth logic

* add google oauth provider

* fix reference to header component in connections

* add google logo to google oauth

* fix web build errors

* bump staging resources

* change staging cpu to perf

* add side bar nav in settings page

* improve styling of members page

* wip adding stripe checkout button

* wip onboarding flow

* add stripe subscription id to org

* save stripe session id and add manage subscription button in settings

* properly block access to pages if user isn't in an org

* wip add paywall

* Domain support

* Domain support (#188)

* Update Makefile to include crypto package when doing a make clean

* Add default for AUTH_URL in attempt to fix build

* attempt 2

* fix attempt #3: Do not require an encryption key at build time

* Fix generate script race condition

* Attempt #4

* add back paywall and also add support for incrementing seat count on invite redemption

* prevent self invite

* action button styling in settings and toast on copy

* add ability to remove member from org

* move stripe product id to env var

* add await for blocking loop in backend

* add subscription info to billing page

* handle trial case in billing info page

* add trial duration indicator to nav bar

* check if domain starts or ends with dash

* remove unused no org component

* Generate AUTH_SECRET if not provided (#189)

* remove package lock file and fix prisma dep version

* revert dep version updates

* fix yarn.lock

* add auth and membership check to fetchSubscription

* properly handle invite redeem with no valid subscription case

* change back fetch subscription to not require org membership

* add back subscription check in invite redeem page

* Add stripe billing logic (#190)

* add side bar nav in settings page

* improve styling of members page

* wip adding stripe checkout button

* wip onboarding flow

* add stripe subscription id to org

* save stripe session id and add manage subscription button in settings

* properly block access to pages if user isn't in an org

* wip add paywall

* Domain support

* add back paywall and also add support for incrementing seat count on invite redemption

* prevent self invite

* action button styling in settings and toast on copy

* add ability to remove member from org

* move stripe product id to env var

* add await for blocking loop in backend

* add subscription info to billing page

* handle trial case in billing info page

* add trial duration indicator to nav bar

* check if domain starts or ends with dash

* remove unused no org component

* remove package lock file and fix prisma dep version

* revert dep version updates

* fix yarn.lock

* add auth and membership check to fetchSubscription

* properly handle invite redeem with no valid subscription case

* change back fetch subscription to not require org membership

* add back subscription check in invite redeem page

---------

Co-authored-by: bkellam <bshizzle1234@gmail.com>

* fix nits

* remove providers check

* fix more nits

* change stripe init to be behind function

* fix publishible stripe key handling in docker container

* enforce owner perms (#191)

* add make owner logic, and owner perms for removal, invite, and manage subscription

* add change billing email card to billing settings

* enforce owner role in action level

* remove unused hover card component

* cleanup

* add back gitlab, gitea, and gerrit support (#184)

* add non github config definitions

* refactor github config compilation to separate file

* add gitlab config compilation

* Connection management (#183)

* wip gitlab repo sync support

* fix gitlab zoekt metadata

* add gitea support

* add gerrit support

* Connection management (#183)

* add gerrit config compilation

* Connection management (#183)

---------

Co-authored-by: Brendan Kellam <bshizzle1234@gmail.com>

* fix apos usage in redeem page

* change csrf cookie to secure not host

* Credentials provider (#192)

* email password functionality

* feedback

* cleanup org's repos and shards if it's inactive (#194)

* add stripe subscription status and webhook

* add inactive org repo cleanup logic

* mark reactivated org connections for sync

* connections qol improvements (#195)

* add client side polling to connections list

* properly fetch repo image url

* add client polling to connection management page, and add ability to sync failed connections

* Fix build with suspense boundary

* improved fix

* add retries for 429 issues (#196)

* add connection compile retry and hard repo limit

* add more retry checks

* cleanup unused change

* address feedback

* fix build errors and add index concurrency env var

* add config upsert timeout env var

* Membership settings rework (#198)

* Add refined members list

* further progress on members settings polish

* Remove old components

* feedback

* Magic links (#199)

* wip on magic link support

* Switch to nodemailer / resend for transactional mail

* Further cleanup

* Add stylized email using react-email

* fix

* Fix build

* db performance improvements and job resilience  (#200)

* replace upsert with separate create many and raw update many calls

* add bulk repo status update and queue addition with priority

* add support for managed redis

* add note for changing raw sql on schema change

* remove non secret token options

* fix token examples in schema

* add better visualization for connection/repo errors and warnings (#201)

* replace upsert with separate create many and raw update many calls

* add bulk repo status update and queue addition with priority

* add support for managed redis

* add note for changing raw sql on schema change

* add error package and use BackendException in connection manager

* handle connection failure display on web app

* add warning banner for not found orgs/repos/users

* add failure handling for gerrit

* add gitea notfound warning support

* add warning icon in connections list

* style nits

* add failed repo vis in connections list

* added retry failed repo index buttons

* move nav indicators to client with polling

* fix indicator flash issue and truncate large list results

* display error nav better

* truncate failed repo list in connection list item

* fix merge error

* fix merge bug

* add connection util file [wip]

* refactor notfound fetch logic and add missing error package to dockerfile

* move repeated logic to function and add zod schema for syncStatusMetadata

* add orgid unique constraint to repo

* revert repo compile update logic to upsert loop

* log upsert stats

* [temp] disable polling everywhere (#205)

* add health check endpoint

* Refined onboarding flow (#202)

* Redeem UX pass (#204)

* add log for health check

* fix new connection complete callback route

* add cpu split logic and only wait for postgres if we're going to connect to it

* Inline secret creation (#207)

* use docker scopes to try and improve caching

* Dummy change

* remove cpu split logic

* Add some instrumentation to web

* add posthog events on various user actions (#208)

* add page view event support

* add posthog events

* nit: remove unused import

* feedback

* fix merge error

* use staging posthog papik when building staging image

* fix other merge error and build warnings

* Add invite email (#209)

* wrap posthog provider in suspense to fix build error

* add grafana alloy config and setup (#210)

* add grafana alloy config and setup

* add basic repo prom metrics

* nits in dockerfile

* remove invalid characters when auto filling domain

* add login posthog events

* remove hard coded sourcebot.app references

* make repo garbage collection async (#211)

* add gc queue logic

* fix missing switch cases for gc status

* style org create form better with new staging domain

* change repo rm logic to be async

* simplify repo for inactive org query

* add grace period for garbage collecting repos

* make prom scrape interval 500ms

* fix typo in trial card

* onboarding tweaks

* rename some prom metrics and cleanup unused

* wipe existing repo if we've picked up a killed job to ensure good state

* Connections UX pass + query optimizations (#212)

* remove git & local schemas (#213)

* skip stripe checkout for trial + fix indexing in progress UI + additional schema validation (#214)

* add additional config validation

* wip bypass stripe checkout for trial

* fix stripe trial checkout bypass

* fix indexing in progress ui on home page

* add subscription checks, more schema validation, and fix issue with complete page

* dont display if no indexed repos

* fix skipping onboard complete check

* fix build error

* add back button in onboard connection creation flow

* Add back revision support (#215)

* fix build

* Fix bug with repository snapshot

* fix share links

* fix repo rm issue, 502 page, condition on test clock

* Make login and onboarding mobile friendly

* fix ordering of quick actions

* remove error msg dump on failed repo index job, and update indexedAt field

* Add mobile unsupported splash screen

* cherry pick fix for file links

* [Cherry Pick] Syntax reference guide (#169) (#216)

* Add .env to db gitignore

* fix case where we have repos but they're all failed for repo snapshot

* /settings/secrets page (#217)

* display domain properly in org create form

* Quick action tweaks (#218)

* revamp repo page (#220)

* wip repo table

* new repo page

* add indicator for when feedback is applied in repo page

* add repo button

* fetch connection data in one query

* fix styling

* fix (#219)

* remove / keyboard shortcut hint in search bar

* prevent switching to first page on data update and truncate long repo names in repo list

* General settings + cleanup (#221)

* General settings

* Add alert to org domain change

* First attempt at sending logs to grafana

* logs wip

* add alloy logs

* wip

* [temp] comment out loki for now

* update trial card content and add events for code host selection on onboard

* reduce scraping interval to 15s

* Add prometheus metric for pending repo indexing jobs

* switch magic link to invite code (#222)

* wip magic link codes

* pipe email to email provider properly

* remove magic link data cookie after sign in

* clean up unused imports

* dont remove cookie before we use it

* rm package-lock.json

* revert yarn files to v3 state

* switch email passing from cookie to search param

* add comment for settings dropdown auth update

* remove unused middleware file

* fix build error and warnings

* fix build error with useSearchParam not wrapped in suspense

* add sentry support to backend and webapp (#223)

* add sentry to web app

* set sentry environment from env var

* add sentry env replace logic in docker container

* wip add backend sentry

* add sentry to backend

* move dns to env var

* remove test exception

* Fix root domain issue on onboarding

* add setup sentry cli step to github action

* login to sentry

* fix sentry login in action

* Update grafana loki endpoint

* switch source map publish to runtime in entrypoint

* catch and rethrow simplegit exceptions

* alloy nits

* fix alloy

* backend logging (#224)

* revert grafana loki config

* fix login ui nits

* fix quick actions

* fix typo in secret creation

* fix private repo clone issue for gitlab

* add repo index timeout logic

* add posthog identify call after registration

* various changes to add terms and security info (#225)

* add terms and security to footer

* add security card

* add demo card

* fix build error

* nit fix: center 'get in touch' on security card

* Dark theme improvements (#226)

* (fix) Fixed bug with gitlab and gitea not including hostname in the repoName

* Switch to using t3-env for env-var management (#230)

* Add missing env var

* fix build

* Centralize to using a single .env.development for development workflows (#231)

* Make billing optional (#232)

* Massage environment variables from strings to numbers (#234)

* Single tenancy & auth modes (#233)

* Add docs to this repo

* dummy change

* Declarative connection configuration (#235)

* fix build

* upgrade to next 14.2.25

* Improved database DX

* migrate to yarn v4

* Use origin from header for baseUrl of emails (instead of AUTH_URL). Also removed reference to hide scrollbars

* Remove SOURCEBOT_ENCRYPTION_KEY from build arg

* Fix issue with linking default user to org in single tenant + no-auth mode

* Fix fallback tokens (#242)

* add SECURITY_CARD_ENABLED flag

* Add repository weburl (#243)

* Random fixes and improvements (#244)

* add zoekt max wall time env var

* remove empty warning in docs

* fix reference in sh docs

* add connection manager upsert timeout env var

* Declarative connection cleanup + improvements (#245)

* change contact us footer in app to point to main contact form

* PostHog event pass (#246)

* fix typo

* Add sourcebot cloud environment prop to staging workflow

* Update generated files

* remove AUTH_URL since it unused and (likely) unnecessary

* Revert "remove AUTH_URL since it unused and (likely) unnecessary"

This reverts commit 1f4a5aed22.

* cleanup GitHub action releases (#252)

* remove alloy, change auth default to disabled, add settings page in me dropdown

* enforce connection management perms to owner (#253)

* enforce connection management perms to owner

* fix formatting

* more formatting

* naming nits

* fix var name error

* change empty repo set copy if auth is disabled

* add CONTRIBUTING.md file

* hide settings in dropdown when auth isn't enabled

* handle case where gerrit weburl is just gitiles path

* Docs overhaul (#251)

* remove nocheckin

* fix build error

* remove v3 trigger from deploy staging

* fix build errors round 2

* another error fix

---------

Co-authored-by: msukkari <michael.sukkarieh@mail.mcgill.ca>
2025-03-31 22:34:42 -07:00

546 lines
No EOL
21 KiB
TypeScript

import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createLogger } from "./logger.js";
import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { AppContext, Settings, RepoMetadata } from "./types.js";
import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { existsSync, readdirSync, promises } from 'fs';
import { indexGitRepository } from "./zoekt.js";
import { PromClient } from './promClient.js';
import * as Sentry from "@sentry/node";
/**
 * Lifecycle contract for the repository manager defined in this file.
 */
interface IRepoManager {
    /** Starts the manager's polling loop. */
    blockingPollLoop(): void;

    /** Releases the resources held by the manager. */
    dispose(): void;
}
/** Name of the queue that carries repository indexing jobs. */
const REPO_INDEXING_QUEUE = 'repoIndexingQueue';

/** Name of the queue that carries repository garbage-collection jobs. */
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';

/** A repo-to-connection join row with its connection record attached. */
type RepoConnectionLink = RepoToConnection & { connection: Connection };

/** A repo together with every connection (and join row) that references it. */
type RepoWithConnections = Repo & {
    connections: RepoConnectionLink[];
};

/** Job payload placed on the indexing queue. */
type RepoIndexingPayload = {
    repo: RepoWithConnections;
};

/** Job payload placed on the garbage-collection queue. */
type RepoGarbageCollectionPayload = {
    repo: Repo;
};
/**
 * Drives the repository lifecycle on top of two queues: one for (re)indexing
 * repositories and one for garbage-collecting them. Each queue has a worker
 * whose job handler and `completed` / `failed` listeners are methods of this
 * class.
 */
export class RepoManager implements IRepoManager {
    private indexWorker: Worker;
    private indexQueue: Queue<RepoIndexingPayload>;
    private gcWorker: Worker;
    private gcQueue: Queue<RepoGarbageCollectionPayload>;
    private logger = createLogger('RepoManager');

    /**
     * Creates the indexing and garbage-collection queues and their workers,
     * all on the supplied Redis connection, and wires up the job listeners.
     *
     * @param db Prisma client used to read and update repo rows.
     * @param settings Supplies worker concurrency limits, polling interval,
     *   re-index interval, GC grace period and index timeout.
     * @param redis Connection shared by both queues and both workers.
     * @param promClient Metrics sink for job gauges and counters.
     * @param ctx Passed to path helpers and the indexer; `ctx.indexPath` is
     *   the directory scanned for shard files during garbage collection.
     */
    constructor(
        private db: PrismaClient,
        private settings: Settings,
        redis: Redis,
        private promClient: PromClient,
        private ctx: AppContext,
    ) {
        // Repo indexing
        this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
            connection: redis,
        });
        this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
            connection: redis,
            concurrency: this.settings.maxRepoIndexingJobConcurrency,
        });
        this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
        this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));

        // Garbage collection
        this.gcQueue = new Queue<RepoGarbageCollectionPayload>(REPO_GC_QUEUE, {
            connection: redis,
        });
        this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
            connection: redis,
            concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency,
        });
        this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
        this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
    }

    /**
     * Runs forever: schedules indexing, schedules garbage collection, flags
     * timed-out index jobs, then sleeps `settings.reindexRepoPollingIntervalMs`.
     * The returned promise only settles (by rejecting) if one of those steps
     * throws.
     */
    public async blockingPollLoop() {
        while (true) {
            await this.fetchAndScheduleRepoIndexing();
            await this.fetchAndScheduleRepoGarbageCollection();
            await this.fetchAndScheduleRepoTimeouts();

            await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
        }
    }

    ///////////////////////////
    // Repo indexing
    ///////////////////////////

    /**
     * Marks the given repos as IN_INDEX_QUEUE and enqueues one index job per
     * repo, grouped by org. Failures are logged, not rethrown.
     *
     * @param repos Repos (with their connections) to enqueue.
     */
    private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
        await this.db.$transaction(async (tx) => {
            await tx.repo.updateMany({
                where: { id: { in: repos.map(repo => repo.id) } },
                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
            });

            const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
                if (!acc[repo.orgId]) {
                    acc[repo.orgId] = [];
                }
                acc[repo.orgId].push(repo);
                return acc;
            }, {});

            for (const orgId in reposByOrg) {
                const orgRepos = reposByOrg[orgId];
                // Set priority based on number of repos (more repos = lower priority)
                // This helps prevent large orgs from overwhelming the indexQueue
                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);

                await this.indexQueue.addBulk(orgRepos.map(repo => ({
                    name: 'repoIndexJob',
                    data: { repo },
                    opts: {
                        priority: priority
                    }
                })));

                // Increment pending jobs counter for each repo added
                orgRepos.forEach(repo => {
                    this.promClient.pendingRepoIndexingJobs.inc({ repo: repo.id.toString() });
                });

                this.logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`);
            }
        }).catch((err: unknown) => {
            this.logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
        });
    }

    /**
     * Finds repos that need indexing and enqueues them: every repo in the NEW
     * state, plus INDEXED repos whose `indexedAt` is unset or older than
     * `settings.reindexIntervalMs`.
     */
    private async fetchAndScheduleRepoIndexing() {
        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
        const repos = await this.db.repo.findMany({
            where: {
                OR: [
                    // "NEW" is really a misnomer here - it just means that the repo needs to be indexed
                    // immediately. In most cases, this will be because the repo was just created and
                    // is indeed "new". However, it could also be that a "retry" was requested on a failed
                    // index. So, we don't want to block on the indexedAt timestamp here.
                    {
                        repoIndexingStatus: RepoIndexingStatus.NEW,
                    },
                    // When the repo has already been indexed, we only want to reindex if the reindexing
                    // interval has elapsed (or if the date isn't set for some reason).
                    {
                        AND: [
                            { repoIndexingStatus: RepoIndexingStatus.INDEXED },
                            { OR: [
                                { indexedAt: null },
                                { indexedAt: { lt: thresholdDate } },
                            ]}
                        ]
                    }
                ]
            },
            include: {
                connections: {
                    include: {
                        connection: true
                    }
                }
            }
        });

        if (repos.length > 0) {
            await this.scheduleRepoIndexingBulk(repos);
        }
    }

    // TODO: do this better? ex: try using the tokens from all the connections
    // We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
    // fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
    // may have their own token. This method will just pick the first connection that has a token (if one exists) and uses that. This
    // may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referencing.
    /**
     * @param repo Repo whose connections are searched for a token.
     * @param db Prisma client handed to `getTokenFromConfig`.
     * @returns The first token resolved from a github/gitlab/gitea connection,
     *   or `undefined` when the repo has no connections or none yields one.
     */
    private async getTokenForRepo(repo: RepoWithConnections, db: PrismaClient) {
        const repoConnections = repo.connections;
        if (repoConnections.length === 0) {
            this.logger.error(`Repo ${repo.id} has no connections`);
            return;
        }

        let token: string | undefined;
        for (const repoConnection of repoConnections) {
            const connection = repoConnection.connection;
            if (connection.connectionType !== 'github' && connection.connectionType !== 'gitlab' && connection.connectionType !== 'gitea') {
                continue;
            }

            const config = connection.config as unknown as GithubConnectionConfig | GitlabConnectionConfig | GiteaConnectionConfig;
            if (config.token) {
                token = await getTokenFromConfig(config.token, connection.orgId, db, this.logger);
                if (token) {
                    break;
                }
            }
        }

        return token;
    }

    /**
     * Brings the on-disk clone of `repo` up to date (fetch if the directory
     * exists, clone otherwise) and then indexes it.
     *
     * @param repo Repo to sync and index.
     * @param repoAlreadyInIndexingState When true, an existing clone
     *   directory is deleted first so the repo is cloned from scratch.
     * @returns Durations in seconds; `fetchDuration_s` or `cloneDuration_s`
     *   is `undefined` for whichever step did not run.
     */
    private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
        let fetchDuration_s: number | undefined = undefined;
        let cloneDuration_s: number | undefined = undefined;

        const repoPath = getRepoPath(repo, this.ctx);
        const metadata = repo.metadata as RepoMetadata;

        // If the repo was already in the indexing state, this job was likely killed and picked up again. As a result,
        // to ensure the repo state is valid, we delete the repo if it exists so we get a fresh clone
        if (repoAlreadyInIndexingState && existsSync(repoPath)) {
            this.logger.info(`Deleting repo directory ${repoPath} during sync because it was already in the indexing state`);
            await promises.rm(repoPath, { recursive: true, force: true });
        }

        if (existsSync(repoPath)) {
            this.logger.info(`Fetching ${repo.id}...`);

            const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
                this.logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
            }));
            fetchDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
        } else {
            this.logger.info(`Cloning ${repo.id}...`);

            const token = await this.getTokenForRepo(repo, this.db);
            const cloneUrl = new URL(repo.cloneUrl);
            if (token) {
                // GitLab gets the token as the password of the `oauth2` user;
                // every other host gets it as the URL username.
                switch (repo.external_codeHostType) {
                    case 'gitlab':
                        cloneUrl.username = 'oauth2';
                        cloneUrl.password = token;
                        break;
                    case 'gitea':
                    case 'github':
                    default:
                        cloneUrl.username = token;
                        break;
                }
            }

            const { durationMs } = await measure(() => cloneRepository(cloneUrl.toString(), repoPath, metadata.gitConfig, ({ method, stage, progress }) => {
                this.logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
            }));
            cloneDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
        }

        this.logger.info(`Indexing ${repo.id}...`);
        const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
        const indexDuration_s = durationMs / 1000;
        this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);

        return {
            fetchDuration_s,
            cloneDuration_s,
            indexDuration_s,
        }
    }

    /**
     * Index-queue job handler. Marks the repo INDEXING and syncs + indexes it,
     * retrying up to three times with 5s then 10s pauses between attempts.
     *
     * @param job Queue job carrying the repo snapshot taken at enqueue time.
     * @throws If the repo row no longer exists, or if the final sync attempt
     *   fails (the last error is rethrown so the job is reported as failed).
     */
    private async runIndexJob(job: Job<RepoIndexingPayload>) {
        this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`);
        const { repo } = job.data;

        // Update the gauges before anything can throw. The completed/failed
        // listeners always decrement the active gauge, so incrementing only
        // after the DB calls below would let it drift negative on early errors.
        this.promClient.activeRepoIndexingJobs.inc();
        this.promClient.pendingRepoIndexingJobs.dec({ repo: repo.id.toString() });

        // We have to use the existing repo object to get the repoIndexingStatus because the repo object
        // inside the job is unchanged from when it was added to the queue.
        const existingRepo = await this.db.repo.findUnique({
            where: {
                id: repo.id,
            },
        });
        if (!existingRepo) {
            this.logger.error(`Repo ${repo.id} not found`);
            const e = new Error(`Repo ${repo.id} not found`);
            Sentry.captureException(e);
            throw e;
        }
        const repoAlreadyInIndexingState = existingRepo.repoIndexingStatus === RepoIndexingStatus.INDEXING;

        await this.db.repo.update({
            where: {
                id: repo.id,
            },
            data: {
                repoIndexingStatus: RepoIndexingStatus.INDEXING,
            }
        });

        const maxAttempts = 3;
        for (let attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                await this.syncGitRepository(repo, repoAlreadyInIndexingState);
                return;
            } catch (error) {
                Sentry.captureException(error);
                this.promClient.repoIndexingReattemptsTotal.inc();

                if (attempt === maxAttempts) {
                    this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`);
                    throw error;
                }

                // Exponential backoff: 5s after the first failure, 10s after the second.
                const sleepDuration = 5000 * Math.pow(2, attempt - 1);
                this.logger.error(`Failed to sync repository ${repo.id}, attempt ${attempt}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... Error: ${error}`);
                await new Promise(resolve => setTimeout(resolve, sleepDuration));
            }
        }
    }

    /**
     * `completed` listener for the index worker: updates metrics and marks the
     * repo INDEXED with a fresh `indexedAt`.
     */
    private async onIndexJobCompleted(job: Job<RepoIndexingPayload>) {
        this.logger.info(`Repo index job ${job.id} completed`);
        this.promClient.activeRepoIndexingJobs.dec();
        this.promClient.repoIndexingSuccessTotal.inc();

        await this.db.repo.update({
            where: {
                id: job.data.repo.id,
            },
            data: {
                indexedAt: new Date(),
                repoIndexingStatus: RepoIndexingStatus.INDEXED,
            }
        });
    }

    /**
     * `failed` listener for the index worker: reports the error, updates
     * metrics and marks the repo FAILED.
     *
     * @param job The failed job, if one is supplied.
     * @param err The error the job failed with.
     */
    private async onIndexJobFailed(job: Job<RepoIndexingPayload> | undefined, err: unknown) {
        this.logger.error(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
        Sentry.captureException(err, {
            tags: {
                repoId: job?.data.repo.id,
                jobId: job?.id,
                queue: REPO_INDEXING_QUEUE,
            }
        });

        if (!job) {
            return;
        }

        this.promClient.activeRepoIndexingJobs.dec();
        this.promClient.repoIndexingFailTotal.inc();

        // The job may have failed because the repo row is gone, in which case
        // this update rejects. A rejection from an event listener would be an
        // unhandled rejection, so it is caught and reported here.
        try {
            await this.db.repo.update({
                where: {
                    id: job.data.repo.id,
                },
                data: {
                    repoIndexingStatus: RepoIndexingStatus.FAILED,
                    indexedAt: new Date(),
                }
            });
        } catch (updateErr) {
            this.logger.error(`Failed to mark repo ${job.data.repo.id} as failed: ${updateErr}`);
            Sentry.captureException(updateErr);
        }
    }

    ///////////////////////////
    // Repo garbage collection
    ///////////////////////////

    /**
     * Marks the given repos as IN_GC_QUEUE and enqueues one garbage-collection
     * job per repo. Errors propagate to the caller.
     */
    private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) {
        await this.db.$transaction(async (tx) => {
            await tx.repo.updateMany({
                where: { id: { in: repos.map(repo => repo.id) } },
                data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE }
            });

            await this.gcQueue.addBulk(repos.map(repo => ({
                name: 'repoGarbageCollectionJob',
                data: { repo },
            })));

            this.logger.info(`Added ${repos.length} jobs to gcQueue`);
        });
    }

    /**
     * Finds repos eligible for garbage collection and enqueues them. Two sets
     * are collected, both limited to repos whose `indexedAt` is unset or older
     * than `settings.repoGarbageCollectionGracePeriodMs`:
     *  - INDEXED or FAILED repos that no connection references, and
     *  - repos of orgs whose Stripe subscription has been INACTIVE for more
     *    than seven days.
     */
    private async fetchAndScheduleRepoGarbageCollection() {
        ////////////////////////////////////
        // Get repos with no connections
        ////////////////////////////////////

        const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);
        const reposWithNoConnections = await this.db.repo.findMany({
            where: {
                repoIndexingStatus: {
                    in: [
                        RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition)
                        RepoIndexingStatus.FAILED,
                    ]
                },
                connections: {
                    none: {}
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } }
                ]
            },
        });
        if (reposWithNoConnections.length > 0) {
            this.logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`);
        }

        ////////////////////////////////////
        // Get inactive org repos
        ////////////////////////////////////

        const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
        const inactiveOrgRepos = await this.db.repo.findMany({
            where: {
                org: {
                    stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
                    stripeLastUpdatedAt: {
                        lt: sevenDaysAgo
                    }
                },
                // Skip repos that are already queued for, or undergoing, garbage
                // collection. Their indexedAt never changes, so without this
                // filter they would be re-enqueued on every poll and the later
                // jobs would fail once the first one deletes the row.
                repoIndexingStatus: {
                    notIn: [
                        RepoIndexingStatus.IN_GC_QUEUE,
                        RepoIndexingStatus.GARBAGE_COLLECTING,
                    ]
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } }
                ]
            }
        });
        if (inactiveOrgRepos.length > 0) {
            this.logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`);
        }

        // A repo can match both queries (no connections AND inactive org);
        // enqueue it only once.
        const seenRepoIds = new Set<Repo['id']>();
        const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos].filter(repo => {
            if (seenRepoIds.has(repo.id)) {
                return false;
            }
            seenRepoIds.add(repo.id);
            return true;
        });

        if (reposToDelete.length > 0) {
            await this.scheduleRepoGarbageCollectionBulk(reposToDelete);
        }
    }

    /**
     * GC-queue job handler. Marks the repo GARBAGE_COLLECTING, deletes its
     * clone directory if present, then deletes every file in `ctx.indexPath`
     * whose name starts with the repo's shard prefix. The DB row itself is
     * removed by the `completed` listener.
     */
    private async runGarbageCollectionJob(job: Job<RepoGarbageCollectionPayload>) {
        this.logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.id}`);
        this.promClient.activeRepoGarbageCollectionJobs.inc();

        const { repo } = job.data;
        await this.db.repo.update({
            where: {
                id: repo.id
            },
            data: {
                repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING
            }
        });

        // delete cloned repo
        const repoPath = getRepoPath(repo, this.ctx);
        if (existsSync(repoPath)) {
            this.logger.info(`Deleting repo directory ${repoPath}`);
            await promises.rm(repoPath, { recursive: true, force: true });
        }

        // delete shards
        const shardPrefix = getShardPrefix(repo.orgId, repo.id);
        const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
        for (const file of files) {
            const filePath = `${this.ctx.indexPath}/${file}`;
            this.logger.info(`Deleting shard file ${filePath}`);
            await promises.rm(filePath, { force: true });
        }
    }

    /**
     * `completed` listener for the GC worker: updates metrics and deletes the
     * repo row.
     */
    private async onGarbageCollectionJobCompleted(job: Job<RepoGarbageCollectionPayload>) {
        this.logger.info(`Garbage collection job ${job.id} completed`);
        this.promClient.activeRepoGarbageCollectionJobs.dec();
        this.promClient.repoGarbageCollectionSuccessTotal.inc();

        await this.db.repo.delete({
            where: {
                id: job.data.repo.id
            }
        });
    }

    /**
     * `failed` listener for the GC worker: reports the error, updates metrics
     * and marks the repo GARBAGE_COLLECTION_FAILED.
     *
     * @param job The failed job, if one is supplied.
     * @param err The error the job failed with.
     */
    private async onGarbageCollectionJobFailed(job: Job<RepoGarbageCollectionPayload> | undefined, err: unknown) {
        this.logger.error(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
        Sentry.captureException(err, {
            tags: {
                repoId: job?.data.repo.id,
                jobId: job?.id,
                queue: REPO_GC_QUEUE,
            }
        });

        if (!job) {
            return;
        }

        this.promClient.activeRepoGarbageCollectionJobs.dec();
        this.promClient.repoGarbageCollectionFailTotal.inc();

        // The repo row may already be gone; catch the rejection so it does not
        // escape the event listener as an unhandled rejection.
        try {
            await this.db.repo.update({
                where: {
                    id: job.data.repo.id
                },
                data: {
                    repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED
                }
            });
        } catch (updateErr) {
            this.logger.error(`Failed to mark repo ${job.data.repo.id} as garbage collection failed: ${updateErr}`);
            Sentry.captureException(updateErr);
        }
    }

    /**
     * Finds repos that have been INDEXING with an `updatedAt` older than
     * `settings.repoIndexTimeoutMs` and marks them as FAILED.
     */
    private async fetchAndScheduleRepoTimeouts() {
        const repos = await this.db.repo.findMany({
            where: {
                repoIndexingStatus: RepoIndexingStatus.INDEXING,
                updatedAt: {
                    lt: new Date(Date.now() - this.settings.repoIndexTimeoutMs)
                }
            }
        });

        if (repos.length > 0) {
            this.logger.info(`Scheduling ${repos.length} repo timeouts`);
            await this.scheduleRepoTimeoutsBulk(repos);
        }
    }

    /**
     * Sets the status of every given repo to FAILED in one bulk update.
     */
    private async scheduleRepoTimeoutsBulk(repos: Repo[]) {
        // A single statement needs no surrounding transaction.
        await this.db.repo.updateMany({
            where: { id: { in: repos.map(repo => repo.id) } },
            data: { repoIndexingStatus: RepoIndexingStatus.FAILED }
        });
    }

    /**
     * Closes both workers and both queues. All four closes are started
     * together; the returned promise resolves once every one has finished and
     * rejects if any of them fails.
     */
    public async dispose() {
        await Promise.all([
            this.indexWorker.close(),
            this.indexQueue.close(),
            this.gcQueue.close(),
            this.gcWorker.close(),
        ]);
    }
}