import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createLogger } from "./logger.js";
import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { AppContext, Settings, RepoMetadata } from "./types.js";
import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { existsSync, readdirSync, promises } from 'fs';
import { indexGitRepository } from "./zoekt.js";
import os from 'os';
import { PromClient } from './promClient.js';
import * as Sentry from "@sentry/node";
interface IRepoManager {
    blockingPollLoop: () => void;
    dispose: () => void;
}

const REPO_INDEXING_QUEUE = 'repoIndexingQueue';
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';

type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };

type RepoIndexingPayload = {
    repo: RepoWithConnections,
}

type RepoGarbageCollectionPayload = {
    repo: Repo,
}

export class RepoManager implements IRepoManager {
    private indexWorker: Worker;
    private indexQueue: Queue<RepoIndexingPayload>;
    private gcWorker: Worker;
    private gcQueue: Queue<RepoGarbageCollectionPayload>;
    private logger = createLogger('RepoManager');

    constructor(
        private db: PrismaClient,
        private settings: Settings,
        redis: Redis,
        private promClient: PromClient,
        private ctx: AppContext,
    ) {
        const numCores = os.cpus().length;

        // Repo indexing
        this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
            connection: redis,
        });
        this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
            connection: redis,
            concurrency: numCores * this.settings.indexConcurrencyMultiple,
        });
        this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
        this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));

        // Garbage collection
        this.gcQueue = new Queue<RepoGarbageCollectionPayload>(REPO_GC_QUEUE, {
            connection: redis,
        });
        this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
            connection: redis,
            concurrency: numCores * this.settings.gcConcurrencyMultiple,
        });
        this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
        this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
    }
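
    // Main scheduling loop: each tick enqueues index jobs for new/stale repos,
    // enqueues garbage collection for orphaned repos, and fails out repos whose
    // index jobs have timed out, then sleeps for the polling interval.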
    public async blockingPollLoop() {
        while (true) {
            await this.fetchAndScheduleRepoIndexing();
            await this.fetchAndScheduleRepoGarbageCollection();
            await this.fetchAndScheduleRepoTimeouts();

            await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
        }
    }

    ///////////////////////////
    // Repo indexing
    ///////////////////////////
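
    // The status update and the enqueue run inside one Prisma transaction: if
    // addBulk throws, the IN_INDEX_QUEUE update is rolled back so repos aren't
    // left marked as queued with no corresponding job.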
    private async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
        await this.db.$transaction(async (tx) => {
            await tx.repo.updateMany({
                where: { id: { in: repos.map(repo => repo.id) } },
                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
            });

            const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
                if (!acc[repo.orgId]) {
                    acc[repo.orgId] = [];
                }
                acc[repo.orgId].push(repo);
                return acc;
            }, {});

            for (const orgId in reposByOrg) {
                const orgRepos = reposByOrg[orgId];
                // Set priority based on number of repos (more repos = lower priority).
                // This helps prevent large orgs from overwhelming the indexQueue.
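                // BullMQ priorities range from 1 (highest) to 2^21 = 2,097,152 (lowest), hence the cap.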
                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
                await this.indexQueue.addBulk(orgRepos.map(repo => ({
                    name: 'repoIndexJob',
                    data: { repo },
                    opts: {
                        priority: priority
                    }
                })));

                // Increment the pending jobs counter for each repo added
                orgRepos.forEach(repo => {
                    this.promClient.pendingRepoIndexingJobs.inc({ repo: repo.id.toString() });
                });

                this.logger.info(`Added ${orgRepos.length} jobs to indexQueue for org ${orgId} with priority ${priority}`);
            }
        }).catch((err: unknown) => {
            this.logger.error(`Failed to add jobs to indexQueue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
        });
    }

    private async fetchAndScheduleRepoIndexing() {
        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
        const repos = await this.db.repo.findMany({
            where: {
                repoIndexingStatus: {
                    in: [
                        RepoIndexingStatus.NEW,
                        RepoIndexingStatus.INDEXED
                    ]
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } },
                ]
            },
            include: {
                connections: {
                    include: {
                        connection: true
                    }
                }
            }
        });

        if (repos.length > 0) {
            await this.scheduleRepoIndexingBulk(repos);
        }
    }

    // TODO: do this better? ex: try using the tokens from all the connections
    // We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need
    // to fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
    // may have its own token. This method just picks the first connection that has a token (if one exists) and uses that. This
    // may technically cause syncing to fail if that connection's token happens to not have access to the repo it's referencing.
    private async getTokenForRepo(repo: RepoWithConnections, db: PrismaClient) {
        const repoConnections = repo.connections;
        if (repoConnections.length === 0) {
            this.logger.error(`Repo ${repo.id} has no connections`);
            return;
        }

        let token: string | undefined;
        for (const repoConnection of repoConnections) {
            const connection = repoConnection.connection;
            if (connection.connectionType !== 'github' && connection.connectionType !== 'gitlab' && connection.connectionType !== 'gitea') {
                continue;
            }

            const config = connection.config as unknown as GithubConnectionConfig | GitlabConnectionConfig | GiteaConnectionConfig;
            if (config.token) {
                const tokenResult = await getTokenFromConfig(config.token, connection.orgId, db);
                token = tokenResult?.token;
                if (token) {
                    break;
                }
            }
        }

        return token;
    }
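
    // Fetches the repo if a clone already exists on disk, otherwise clones it,
    // then runs the zoekt indexer over the result. Returns the fetch/clone and
    // index durations (in seconds) for observability.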
    private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
        let fetchDuration_s: number | undefined = undefined;
        let cloneDuration_s: number | undefined = undefined;

        const repoPath = getRepoPath(repo, this.ctx);
        const metadata = repo.metadata as RepoMetadata;

        // If the repo was already in the indexing state, this job was likely killed and picked up again. As a result,
        // to ensure the repo state is valid, we delete the repo directory if it exists so we get a fresh clone.
        if (repoAlreadyInIndexingState && existsSync(repoPath)) {
            this.logger.info(`Deleting repo directory ${repoPath} during sync because it was already in the indexing state`);
            await promises.rm(repoPath, { recursive: true, force: true });
        }

        if (existsSync(repoPath)) {
            this.logger.info(`Fetching ${repo.id}...`);

            const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
                // this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
            }));
            fetchDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            this.logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
        } else {
            this.logger.info(`Cloning ${repo.id}...`);

            const token = await this.getTokenForRepo(repo, this.db);
            const cloneUrl = new URL(repo.cloneUrl);
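            // Embed credentials in the clone URL. GitLab expects the literal
            // username 'oauth2' with the token as the password; GitHub and Gitea
            // accept the token directly as the username.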
            if (token) {
                switch (repo.external_codeHostType) {
                    case 'gitlab':
                        cloneUrl.username = 'oauth2';
                        cloneUrl.password = token;
                        break;
                    case 'gitea':
                    case 'github':
                    default:
                        cloneUrl.username = token;
                        break;
                }
            }

            const { durationMs } = await measure(() => cloneRepository(cloneUrl.toString(), repoPath, metadata.gitConfig, ({ method, stage, progress }) => {
                // this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
            }));
            cloneDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            this.logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
        }

        this.logger.info(`Indexing ${repo.id}...`);
        const { durationMs } = await measure(() => indexGitRepository(repo, this.ctx));
        const indexDuration_s = durationMs / 1000;
        this.logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);

        return {
            fetchDuration_s,
            cloneDuration_s,
            indexDuration_s,
        }
    }

    private async runIndexJob(job: Job<RepoIndexingPayload>) {
        this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`);
        const repo = job.data.repo as RepoWithConnections;

        // Re-fetch the repo record to get the current repoIndexingStatus: the repo object
        // inside the job payload is a snapshot from when it was added to the queue.
        const existingRepo = await this.db.repo.findUnique({
            where: {
                id: repo.id,
            },
        });
        if (!existingRepo) {
            this.logger.error(`Repo ${repo.id} not found`);
            const e = new Error(`Repo ${repo.id} not found`);
            Sentry.captureException(e);
            throw e;
        }
        const repoAlreadyInIndexingState = existingRepo.repoIndexingStatus === RepoIndexingStatus.INDEXING;

        await this.db.repo.update({
            where: {
                id: repo.id,
            },
            data: {
                repoIndexingStatus: RepoIndexingStatus.INDEXING,
            }
        });
        this.promClient.activeRepoIndexingJobs.inc();
        this.promClient.pendingRepoIndexingJobs.dec({ repo: repo.id.toString() });

        let indexDuration_s: number | undefined;
        let fetchDuration_s: number | undefined;
        let cloneDuration_s: number | undefined;

        let stats;
        let attempts = 0;
        const maxAttempts = 3;
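
        // Retry the sync up to maxAttempts times with exponential backoff (5s, then 10s).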
        while (attempts < maxAttempts) {
            try {
                stats = await this.syncGitRepository(repo, repoAlreadyInIndexingState);
                break;
            } catch (error) {
                Sentry.captureException(error);
                attempts++;
                this.promClient.repoIndexingReattemptsTotal.inc();

                if (attempts === maxAttempts) {
                    this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`);
                    throw error;
                }

                const sleepDuration = 5000 * Math.pow(2, attempts - 1);
                this.logger.error(`Failed to sync repository ${repo.id}, attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... Error: ${error}`);
                await new Promise(resolve => setTimeout(resolve, sleepDuration));
            }
        }
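
        // stats is guaranteed to be set here: the loop either breaks after a
        // successful sync or rethrows once maxAttempts is reached.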
        indexDuration_s = stats!.indexDuration_s;
        fetchDuration_s = stats!.fetchDuration_s;
        cloneDuration_s = stats!.cloneDuration_s;
    }

    private async onIndexJobCompleted(job: Job<RepoIndexingPayload>) {
        this.logger.info(`Repo index job ${job.id} completed`);
        this.promClient.activeRepoIndexingJobs.dec();
        this.promClient.repoIndexingSuccessTotal.inc();

        await this.db.repo.update({
            where: {
                id: job.data.repo.id,
            },
            data: {
                indexedAt: new Date(),
                repoIndexingStatus: RepoIndexingStatus.INDEXED,
            }
        });
    }

    private async onIndexJobFailed(job: Job<RepoIndexingPayload> | undefined, err: unknown) {
        this.logger.info(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
        if (job) {
            this.promClient.activeRepoIndexingJobs.dec();
            this.promClient.repoIndexingFailTotal.inc();

            await this.db.repo.update({
                where: {
                    id: job.data.repo.id,
                },
                data: {
                    repoIndexingStatus: RepoIndexingStatus.FAILED,
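                    // Stamping indexedAt here delays garbage collection of the
                    // failed repo by the GC grace period.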
                    indexedAt: new Date(),
                }
            });
        }
    }

    ///////////////////////////
    // Repo garbage collection
    ///////////////////////////

    private async scheduleRepoGarbageCollectionBulk(repos: Repo[]) {
        await this.db.$transaction(async (tx) => {
            await tx.repo.updateMany({
                where: { id: { in: repos.map(repo => repo.id) } },
                data: { repoIndexingStatus: RepoIndexingStatus.IN_GC_QUEUE }
            });

            await this.gcQueue.addBulk(repos.map(repo => ({
                name: 'repoGarbageCollectionJob',
                data: { repo },
            })));

            this.logger.info(`Added ${repos.length} jobs to gcQueue`);
        });
    }

    private async fetchAndScheduleRepoGarbageCollection() {
        ////////////////////////////////////
        // Get repos with no connections
        ////////////////////////////////////
        const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs);
        const reposWithNoConnections = await this.db.repo.findMany({
            where: {
                repoIndexingStatus: {
                    in: [
                        RepoIndexingStatus.INDEXED, // we don't include NEW repos here because they'll be picked up by the index queue (potential race condition)
                        RepoIndexingStatus.FAILED,
                    ]
                },
                connections: {
                    none: {}
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } }
                ]
            },
        });
        if (reposWithNoConnections.length > 0) {
            this.logger.info(`Garbage collecting ${reposWithNoConnections.length} repos with no connections: ${reposWithNoConnections.map(repo => repo.id).join(', ')}`);
        }

        ////////////////////////////////////
        // Get inactive org repos
        ////////////////////////////////////
        const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
        const inactiveOrgRepos = await this.db.repo.findMany({
            where: {
                org: {
                    stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
                    stripeLastUpdatedAt: {
                        lt: sevenDaysAgo
                    }
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: thresholdDate } }
                ]
            }
        });
        if (inactiveOrgRepos.length > 0) {
            this.logger.info(`Garbage collecting ${inactiveOrgRepos.length} inactive org repos: ${inactiveOrgRepos.map(repo => repo.id).join(', ')}`);
        }

        const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos];
        if (reposToDelete.length > 0) {
            await this.scheduleRepoGarbageCollectionBulk(reposToDelete);
        }
    }
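
    // Removes the on-disk clone and zoekt index shards for a repo. The database
    // row itself is deleted in onGarbageCollectionJobCompleted.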
    private async runGarbageCollectionJob(job: Job<RepoGarbageCollectionPayload>) {
        this.logger.info(`Running garbage collection job (id: ${job.id}) for repo ${job.data.repo.id}`);
        this.promClient.activeRepoGarbageCollectionJobs.inc();

        const repo = job.data.repo as Repo;
        await this.db.repo.update({
            where: {
                id: repo.id
            },
            data: {
                repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTING
            }
        });

        // delete cloned repo
        const repoPath = getRepoPath(repo, this.ctx);
        if (existsSync(repoPath)) {
            this.logger.info(`Deleting repo directory ${repoPath}`);
            await promises.rm(repoPath, { recursive: true, force: true });
        }

        // delete shards
        const shardPrefix = getShardPrefix(repo.orgId, repo.id);
        const files = readdirSync(this.ctx.indexPath).filter(file => file.startsWith(shardPrefix));
        for (const file of files) {
            const filePath = `${this.ctx.indexPath}/${file}`;
            this.logger.info(`Deleting shard file ${filePath}`);
            await promises.rm(filePath, { force: true });
        }
    }

    private async onGarbageCollectionJobCompleted(job: Job<RepoGarbageCollectionPayload>) {
        this.logger.info(`Garbage collection job ${job.id} completed`);
        this.promClient.activeRepoGarbageCollectionJobs.dec();
        this.promClient.repoGarbageCollectionSuccessTotal.inc();

        await this.db.repo.delete({
            where: {
                id: job.data.repo.id
            }
        });
    }

    private async onGarbageCollectionJobFailed(job: Job<RepoGarbageCollectionPayload> | undefined, err: unknown) {
        this.logger.info(`Garbage collection job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
        if (job) {
            this.promClient.activeRepoGarbageCollectionJobs.dec();
            this.promClient.repoGarbageCollectionFailTotal.inc();

            await this.db.repo.update({
                where: {
                    id: job.data.repo.id
                },
                data: {
                    repoIndexingStatus: RepoIndexingStatus.GARBAGE_COLLECTION_FAILED
                }
            });
        }
    }
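
    ///////////////////////////
    // Repo indexing timeouts
    ///////////////////////////

    // Repos stuck in the INDEXING state for longer than repoIndexTimeoutMs (e.g. because
    // a worker died mid-job) are marked FAILED so they aren't left indexing forever.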
    private async fetchAndScheduleRepoTimeouts() {
        const repos = await this.db.repo.findMany({
            where: {
                repoIndexingStatus: RepoIndexingStatus.INDEXING,
                updatedAt: {
                    lt: new Date(Date.now() - this.settings.repoIndexTimeoutMs)
                }
            }
        });
        if (repos.length > 0) {
            this.logger.info(`Scheduling ${repos.length} repo timeouts`);
            await this.scheduleRepoTimeoutsBulk(repos);
        }
    }

    private async scheduleRepoTimeoutsBulk(repos: Repo[]) {
        await this.db.$transaction(async (tx) => {
            await tx.repo.updateMany({
                where: { id: { in: repos.map(repo => repo.id) } },
                data: { repoIndexingStatus: RepoIndexingStatus.FAILED }
            });
        });
    }

    public async dispose() {
        // Await the close calls so callers can rely on a clean shutdown.
        await this.indexWorker.close();
        await this.indexQueue.close();
        await this.gcQueue.close();
        await this.gcWorker.close();
    }
}