import * as Sentry from '@sentry/node';
import { PrismaClient, Repo, RepoIndexingJobStatus, RepoIndexingJobType } from "@sourcebot/db";
import { createLogger, Logger } from "@sourcebot/shared";
import { env, RepoIndexingJobMetadata, repoIndexingJobMetadataSchema, RepoMetadata, repoMetadataSchema } from '@sourcebot/shared';
import { existsSync } from 'fs';
import { readdir, rm } from 'fs/promises';
import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis';
import micromatch from 'micromatch';
import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, INDEX_CACHE_DIR } from './constants.js';
import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getTags, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
import { captureEvent } from './posthog.js';
import { PromClient } from './promClient.js';
import { RepoWithConnections, Settings } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure, setIntervalAsync } from './utils.js';
import { indexGitRepository } from './zoekt.js';

const LOG_TAG = 'repo-index-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
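
// Payload stored alongside each queued job. `jobId` mirrors the id of the
// corresponding `repoIndexingJob` row so the lifecycle handlers below can update
// its status; `repoName` and `repoId` are denormalized for logging and
// Prometheus labels.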
type JobPayload = {
    type: 'INDEX' | 'CLEANUP';
    jobId: string;
    repoId: number;
    repoName: string;
};

/**
 * Manages the lifecycle of repository data on disk, including git working copies
 * and search index shards. Handles both indexing operations (cloning/fetching repos
 * and building search indexes) and cleanup operations (removing orphaned repos and
 * their associated data).
 *
 * Uses a job queue system to process indexing and cleanup tasks asynchronously,
 * with configurable concurrency limits and retry logic. Automatically schedules
 * re-indexing of repos based on configured intervals and manages garbage collection
 * of repos that are no longer connected to any source.
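 *
 * @example
 * // A minimal usage sketch. How `db`, `settings`, `redis`, and `promClient` are
 * // constructed is assumed to happen elsewhere (e.g., in index.ts):
 * const manager = new RepoIndexManager(db, settings, redis, promClient);
 * manager.startScheduler();
 * // ... later, on shutdown:
 * await manager.dispose();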
 */
export class RepoIndexManager {
    private interval?: NodeJS.Timeout;
    private queue: Queue<JobPayload>;
    private worker: Worker<JobPayload>;

    constructor(
        private db: PrismaClient,
        private settings: Settings,
        private redis: Redis,
        private promClient: PromClient,
    ) {
        this.queue = new Queue<JobPayload>({
            redis,
            namespace: 'repo-index-queue',
            jobTimeoutMs: this.settings.repoIndexTimeoutMs,
            maxAttempts: 3,
            logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
        });

        this.worker = new Worker<JobPayload>({
            queue: this.queue,
            maxStalledCount: 1,
            handler: this.runJob.bind(this),
            concurrency: this.settings.maxRepoIndexingJobConcurrency,
            ...(env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true' ? {
                logger: true,
            } : {}),
        });

        this.worker.on('completed', this.onJobCompleted.bind(this));
        this.worker.on('failed', this.onJobFailed.bind(this));
        this.worker.on('stalled', this.onJobStalled.bind(this));
        this.worker.on('error', this.onWorkerError.bind(this));

        // graceful-timeout is triggered when a job is still processing after
        // worker.close() is called and the timeout period has elapsed. In this case,
        // we fail the job with no retry.
        this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
    }

    public startScheduler() {
        logger.debug('Starting scheduler');

        this.interval = setIntervalAsync(async () => {
            await this.scheduleIndexJobs();
            await this.scheduleCleanupJobs();
        }, this.settings.reindexRepoPollingIntervalMs);

        this.worker.run();
    }

    private async scheduleIndexJobs() {
        const thresholdDate = new Date(Date.now() - this.settings.reindexIntervalMs);
        const timeoutDate = new Date(Date.now() - this.settings.repoIndexTimeoutMs);
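
        // A repo qualifies for (re-)indexing when it has never been indexed or was last
        // indexed before the reindex interval, and it has no INDEX job that is either
        // still active (created within the timeout window) or recently failed.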
        const reposToIndex = await this.db.repo.findMany({
            where: {
                AND: [
                    {
                        OR: [
                            { indexedAt: null },
                            { indexedAt: { lt: thresholdDate } },
                        ]
                    },
                    {
                        NOT: {
                            jobs: {
                                some: {
                                    AND: [
                                        {
                                            type: RepoIndexingJobType.INDEX,
                                        },
                                        {
                                            OR: [
                                                // Don't schedule if there are active jobs that were created within the
                                                // timeout window. This prevents a job that is stuck in a pending state
                                                // from blocking re-scheduling forever.
                                                {
                                                    AND: [
                                                        {
                                                            status: {
                                                                in: [
                                                                    RepoIndexingJobStatus.PENDING,
                                                                    RepoIndexingJobStatus.IN_PROGRESS,
                                                                ]
                                                            },
                                                        },
                                                        {
                                                            createdAt: {
                                                                gt: timeoutDate,
                                                            }
                                                        }
                                                    ]
                                                },
                                                // Don't schedule if there are recent failed jobs (within the threshold date).
                                                {
                                                    AND: [
                                                        { status: RepoIndexingJobStatus.FAILED },
                                                        { completedAt: { gt: thresholdDate } },
                                                    ]
                                                }
                                            ]
                                        }
                                    ]
                                }
                            }
                        }
                    }
                ],
            },
        });

        if (reposToIndex.length > 0) {
            await this.createJobs(reposToIndex, RepoIndexingJobType.INDEX);
        }
    }

    private async scheduleCleanupJobs() {
        const gcGracePeriodDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);
        const timeoutDate = new Date(Date.now() - this.settings.repoIndexTimeoutMs);
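
        // A repo qualifies for garbage collection once no connection references it and
        // its grace period has elapsed, unless a CLEANUP job for it is already active
        // (created within the timeout window).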
        const reposToCleanup = await this.db.repo.findMany({
            where: {
                connections: {
                    none: {}
                },
                OR: [
                    { indexedAt: null },
                    { indexedAt: { lt: gcGracePeriodDate } },
                ],
                NOT: {
                    jobs: {
                        some: {
                            AND: [
                                {
                                    type: RepoIndexingJobType.CLEANUP,
                                },
                                {
                                    status: {
                                        in: [
                                            RepoIndexingJobStatus.PENDING,
                                            RepoIndexingJobStatus.IN_PROGRESS,
                                        ]
                                    },
                                },
                                {
                                    createdAt: {
                                        gt: timeoutDate,
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        });

        if (reposToCleanup.length > 0) {
            await this.createJobs(reposToCleanup, RepoIndexingJobType.CLEANUP);
        }
    }
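
    /**
     * Creates a `repoIndexingJob` row and a corresponding queue entry for each repo,
     * returning the ids of the created jobs. Public so that jobs can also be enqueued
     * outside of the scheduler loop.
     */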
    public async createJobs(repos: Repo[], type: RepoIndexingJobType) {
        // @note: we don't perform this in a transaction because
        // we want to avoid the situation where a job is created and run
        // prior to the transaction being committed.
        const jobs = await this.db.repoIndexingJob.createManyAndReturn({
            data: repos.map(repo => ({
                type,
                repoId: repo.id,
            })),
            include: {
                repo: true,
            }
        });

        for (const job of jobs) {
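            // Group jobs by repo id: groupmq processes jobs within a group sequentially,
            // so e.g. an INDEX and a CLEANUP job for the same repo can never run concurrently.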
            await this.queue.add({
                groupId: `repo:${job.repoId}`,
                data: {
                    jobId: job.id,
                    type,
                    repoName: job.repo.name,
                    repoId: job.repo.id,
                },
                jobId: job.id,
            });

            const jobTypeLabel = getJobTypePrometheusLabel(type);
            this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
        }

        return jobs.map(job => job.id);
    }

    private async runJob(job: ReservedJob<JobPayload>) {
        const id = job.data.jobId;
        const logger = createJobLogger(id);
        logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1}/${job.maxAttempts})`);

        const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({
            where: {
                id,
            },
            select: {
                status: true,
            }
        });

        // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
        // is in an invalid state and should be skipped.
        if (
            currentStatus.status !== RepoIndexingJobStatus.PENDING &&
            currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS
        ) {
            throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
        }

        const { repo, type: jobType } = await this.db.repoIndexingJob.update({
            where: {
                id,
            },
            data: {
                status: RepoIndexingJobStatus.IN_PROGRESS,
            },
            select: {
                type: true,
                repo: {
                    include: {
                        connections: {
                            include: {
                                connection: true,
                            }
                        }
                    }
                }
            }
        });

        const jobTypeLabel = getJobTypePrometheusLabel(jobType);
        this.promClient.pendingRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
        this.promClient.activeRepoIndexJobs.inc({ repo: job.data.repoName, type: jobTypeLabel });

        const abortController = new AbortController();
        const signalHandler = () => {
            logger.info(`Received shutdown signal, aborting...`);
            abortController.abort(); // This cancels all operations
        };
        process.on('SIGTERM', signalHandler);
        process.on('SIGINT', signalHandler);

        try {
            if (jobType === RepoIndexingJobType.INDEX) {
                const revisions = await this.indexRepository(repo, logger, abortController.signal);

                await this.db.repoIndexingJob.update({
                    where: { id },
                    data: {
                        metadata: {
                            indexedRevisions: revisions,
                        } satisfies RepoIndexingJobMetadata,
                    },
                });
            } else if (jobType === RepoIndexingJobType.CLEANUP) {
                await this.cleanupRepository(repo, logger);
            }
        } finally {
            process.off('SIGTERM', signalHandler);
            process.off('SIGINT', signalHandler);
        }
    }
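
    /**
     * Clones or fetches the repo's working copy (unless it is read-only), resolves the
     * set of revisions to index from the configured branch/tag globs, runs the zoekt
     * indexer over them, and returns the list of indexed revisions.
     */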
    private async indexRepository(repo: RepoWithConnections, logger: Logger, signal: AbortSignal) {
        const { path: repoPath, isReadOnly } = getRepoPath(repo);
        const metadata = repoMetadataSchema.parse(repo.metadata);

        const credentials = await getAuthCredentialsForRepo(repo);
        const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
        const authHeader = credentials?.authHeader ?? undefined;

        // If the repo path exists but it is not a valid git repository root, this indicates
        // that the repository is in a bad state. To fix, we remove the directory and perform
        // a fresh clone.
        if (existsSync(repoPath)) {
            const isValidGitRepo = await isPathAValidGitRepoRoot({
                path: repoPath,
                signal,
            });
            if (!isValidGitRepo && !isReadOnly) {
                logger.warn(`${repoPath} is not a valid git repository root. Deleting directory and performing fresh clone.`);
                await rm(repoPath, { recursive: true, force: true });
            }
        }

        if (existsSync(repoPath) && !isReadOnly) {
            // @NOTE: in #483, we changed the cloning method s.t., we _no longer_
            // write the clone URL (which could contain an auth token) to the
            // `remote.origin.url` entry. For the upgrade scenario, we want
            // to unset this key since it is no longer needed, hence this line.
            // This will no-op if the key is already unset.
            // @see: https://github.com/sourcebot-dev/sourcebot/pull/483
            await unsetGitConfig({
                path: repoPath,
                keys: ["remote.origin.url"],
                signal,
            });

            logger.info(`Fetching ${repo.name} (id: ${repo.id})...`);
            const { durationMs } = await measure(() => fetchRepository({
                cloneUrl: cloneUrlMaybeWithToken,
                authHeader,
                path: repoPath,
                onProgress: ({ method, stage, progress }) => {
                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`);
                },
                signal,
            }));
            const fetchDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            logger.info(`Fetched ${repo.name} (id: ${repo.id}) in ${fetchDuration_s}s`);
        } else if (!isReadOnly) {
            logger.info(`Cloning ${repo.name} (id: ${repo.id})...`);
            const { durationMs } = await measure(() => cloneRepository({
                cloneUrl: cloneUrlMaybeWithToken,
                authHeader,
                path: repoPath,
                onProgress: ({ method, stage, progress }) => {
                    logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`);
                },
                signal
            }));
            const cloneDuration_s = durationMs / 1000;

            process.stdout.write('\n');
            logger.info(`Cloned ${repo.name} (id: ${repo.id}) in ${cloneDuration_s}s`);
        }

        // Regardless of clone or fetch, always upsert the git config for the repo.
        // This ensures that the git config is always up to date with whatever we
        // have in the DB.
        if (metadata.gitConfig && !isReadOnly) {
            await upsertGitConfig({
                path: repoPath,
                gitConfig: metadata.gitConfig,
                signal,
            });
        }

        let revisions = [
            'HEAD'
        ];
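
        // `metadata.branches` / `metadata.tags` are lists of glob patterns matched with
        // micromatch. For example (illustrative patterns, not taken from any real config),
        // branches: ['main', 'releases/*'] would expand to `refs/heads/main` plus one
        // `refs/heads/releases/...` entry per matching branch.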
        if (metadata.branches) {
            const branchGlobs = metadata.branches;
            const allBranches = await getBranches(repoPath);
            const matchingBranches =
                allBranches
                    .filter((branch) => micromatch.isMatch(branch, branchGlobs))
                    .map((branch) => `refs/heads/${branch}`);

            revisions = [
                ...revisions,
                ...matchingBranches
            ];
        }

        if (metadata.tags) {
            const tagGlobs = metadata.tags;
            const allTags = await getTags(repoPath);
            const matchingTags =
                allTags
                    .filter((tag) => micromatch.isMatch(tag, tagGlobs))
                    .map((tag) => `refs/tags/${tag}`);

            revisions = [
                ...revisions,
                ...matchingTags
            ];
        }

        // zoekt has a limit of 64 branches/tags to index.
        if (revisions.length > 64) {
            logger.warn(`Too many revisions (${revisions.length}) for repo ${repo.id}, truncating to 64`);
            captureEvent('backend_revisions_truncated', {
                repoId: repo.id,
                revisionCount: revisions.length,
            });
            revisions = revisions.slice(0, 64);
        }

        logger.info(`Indexing ${repo.name} (id: ${repo.id})...`);
        const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, revisions, signal));
        const indexDuration_s = durationMs / 1000;
        logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`);

        return revisions;
    }
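
    /**
     * Removes all on-disk data for a repo: its git working copy (unless read-only)
     * and any index shards that belong to it.
     */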
    private async cleanupRepository(repo: Repo, logger: Logger) {
        const { path: repoPath, isReadOnly } = getRepoPath(repo);
        if (existsSync(repoPath) && !isReadOnly) {
            logger.info(`Deleting repo directory ${repoPath}`);
            await rm(repoPath, { recursive: true, force: true });
        }
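
        // Index shards are written with a per-repo filename prefix, so deleting every
        // file under INDEX_CACHE_DIR that starts with that prefix removes all of this
        // repo's shards.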
        const shardPrefix = getShardPrefix(repo.orgId, repo.id);
        const files = (await readdir(INDEX_CACHE_DIR)).filter(file => file.startsWith(shardPrefix));
        for (const file of files) {
            const filePath = `${INDEX_CACHE_DIR}/${file}`;
            logger.info(`Deleting shard file ${filePath}`);
            await rm(filePath, { force: true });
        }
    }

    private onJobCompleted = async (job: Job<JobPayload>) =>
        groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
            const logger = createJobLogger(job.data.jobId);

            const jobData = await this.db.repoIndexingJob.update({
                where: { id: job.data.jobId },
                data: {
                    status: RepoIndexingJobStatus.COMPLETED,
                    completedAt: new Date(),
                },
                include: {
                    repo: true,
                }
            });

            const jobTypeLabel = getJobTypePrometheusLabel(jobData.type);
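
            // For INDEX jobs, stamp the repo with the commit hash that HEAD resolved to
            // and copy the revisions that were actually indexed into the repo's metadata.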
            if (jobData.type === RepoIndexingJobType.INDEX) {
                const { path: repoPath } = getRepoPath(jobData.repo);
                const commitHash = await getCommitHashForRefName({
                    path: repoPath,
                    refName: 'HEAD',
                });
                const jobMetadata = repoIndexingJobMetadataSchema.parse(jobData.metadata);

                const repo = await this.db.repo.update({
                    where: { id: jobData.repoId },
                    data: {
                        indexedAt: new Date(),
                        indexedCommitHash: commitHash,
                        metadata: {
                            ...(jobData.repo.metadata as RepoMetadata),
                            indexedRevisions: jobMetadata.indexedRevisions,
                        } satisfies RepoMetadata,
                    }
                });
                logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
            }
            else if (jobData.type === RepoIndexingJobType.CLEANUP) {
                const repo = await this.db.repo.delete({
                    where: { id: jobData.repoId },
                });
                logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
            }

            // Track metrics for successful job
            this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
            this.promClient.repoIndexJobSuccessTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
        });

    private onJobFailed = async (job: Job<JobPayload>) =>
        groupmqLifecycleExceptionWrapper('onJobFailed', logger, async () => {
            const logger = createJobLogger(job.data.jobId);

            const attempt = job.attemptsMade + 1;
            const wasLastAttempt = attempt >= job.opts.attempts;
            const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);

            if (wasLastAttempt) {
                const { repo } = await this.db.repoIndexingJob.update({
                    where: { id: job.data.jobId },
                    data: {
                        status: RepoIndexingJobStatus.FAILED,
                        completedAt: new Date(),
                        errorMessage: job.failedReason,
                    },
                    select: { repo: true }
                });

                this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
                this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });

                logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt}/${job.opts.attempts}. Failing job.`);
            } else {
                const repo = await this.db.repo.findUniqueOrThrow({
                    where: { id: job.data.repoId },
                });

                this.promClient.repoIndexJobReattemptsTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });

                logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt}/${job.opts.attempts}. Retrying.`);
            }
        });

    private onJobStalled = async (jobId: string) =>
        groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
            const logger = createJobLogger(jobId);

            const { repo, type } = await this.db.repoIndexingJob.update({
                where: { id: jobId },
                data: {
                    status: RepoIndexingJobStatus.FAILED,
                    completedAt: new Date(),
                    errorMessage: 'Job stalled',
                },
                select: { repo: true, type: true }
            });

            const jobTypeLabel = getJobTypePrometheusLabel(type);
            this.promClient.activeRepoIndexJobs.dec({ repo: repo.name, type: jobTypeLabel });
            this.promClient.repoIndexJobFailTotal.inc({ repo: repo.name, type: jobTypeLabel });

            logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
        });

    private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
        groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
            const logger = createJobLogger(job.data.jobId);
            const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);

            const { repo } = await this.db.repoIndexingJob.update({
                where: { id: job.data.jobId },
                data: {
                    status: RepoIndexingJobStatus.FAILED,
                    completedAt: new Date(),
                    errorMessage: 'Job timed out',
                },
                select: { repo: true }
            });

            this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
            this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });

            logger.error(`Job ${job.data.jobId} timed out for repo ${repo.name} (id: ${repo.id}). Failing job.`);
        });

    private async onWorkerError(error: Error) {
        Sentry.captureException(error);
        logger.error(`Index syncer worker error.`, error);
    }

    public async dispose() {
        if (this.interval) {
            clearInterval(this.interval);
        }

        const inProgressJobs = this.worker.getCurrentJobs();
        await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);

        // Manually release group locks for in progress jobs to prevent deadlocks.
        // @see: https://github.com/Openpanel-dev/groupmq/issues/8
        for (const { job } of inProgressJobs) {
            const lockKey = `groupmq:repo-index-queue:lock:${job.groupId}`;
            logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
            await this.redis.del(lockKey);
        }

        // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
        // redis connection. Since we share the same redis client between the queue and
        // the worker, we skip this step here and close the redis client directly in index.ts.
        // await this.queue.close();
    }
}

const getJobTypePrometheusLabel = (type: RepoIndexingJobType) => type === RepoIndexingJobType.INDEX ? 'index' : 'cleanup';