mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 04:15:30 +00:00
chore(worker): Prometheus metrics for repo index manager (#571)
This commit is contained in:
parent
ef77e212a0
commit
a470ab8463
4 changed files with 65 additions and 69 deletions
|
|
@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
- Implement dynamic tab titles for files and folders in browse tab. [#560](https://github.com/sourcebot-dev/sourcebot/pull/560)
|
||||
- Added support for passing db connection url as separate `DATABASE_HOST`, `DATABASE_USERNAME`, `DATABASE_PASSWORD`, `DATABASE_NAME`, and `DATABASE_ARGS` env vars. [#545](https://github.com/sourcebot-dev/sourcebot/pull/545)
|
||||
- Added support for GitHub Apps for service auth. [#570](https://github.com/sourcebot-dev/sourcebot/pull/570)
|
||||
- Added prometheus metrics for repo index manager. [#571](https://github.com/sourcebot-dev/sourcebot/pull/571)
|
||||
|
||||
### Fixed
|
||||
- Fixed "dubious ownership" errors when cloning / fetching repos. [#553](https://github.com/sourcebot-dev/sourcebot/pull/553)
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ if (hasEntitlement('github-app')) {
|
|||
const connectionManager = new ConnectionManager(prisma, settings, redis);
|
||||
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
||||
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
||||
const repoIndexManager = new RepoIndexManager(prisma, settings, redis);
|
||||
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
|
||||
|
||||
connectionManager.startScheduler();
|
||||
repoIndexManager.startScheduler();
|
||||
|
|
|
|||
|
|
@ -10,84 +10,51 @@ export class PromClient {
|
|||
private app: express.Application;
|
||||
private server: Server;
|
||||
|
||||
public activeRepoIndexingJobs: Gauge<string>;
|
||||
public pendingRepoIndexingJobs: Gauge<string>;
|
||||
public repoIndexingReattemptsTotal: Counter<string>;
|
||||
public repoIndexingFailTotal: Counter<string>;
|
||||
public repoIndexingSuccessTotal: Counter<string>;
|
||||
|
||||
public activeRepoGarbageCollectionJobs: Gauge<string>;
|
||||
public repoGarbageCollectionErrorTotal: Counter<string>;
|
||||
public repoGarbageCollectionFailTotal: Counter<string>;
|
||||
public repoGarbageCollectionSuccessTotal: Counter<string>;
|
||||
public activeRepoIndexJobs: Gauge<string>;
|
||||
public pendingRepoIndexJobs: Gauge<string>;
|
||||
public repoIndexJobReattemptsTotal: Counter<string>;
|
||||
public repoIndexJobFailTotal: Counter<string>;
|
||||
public repoIndexJobSuccessTotal: Counter<string>;
|
||||
|
||||
public readonly PORT = 3060;
|
||||
|
||||
constructor() {
|
||||
this.registry = new Registry();
|
||||
|
||||
this.activeRepoIndexingJobs = new Gauge({
|
||||
name: 'active_repo_indexing_jobs',
|
||||
help: 'The number of repo indexing jobs in progress',
|
||||
labelNames: ['repo'],
|
||||
this.activeRepoIndexJobs = new Gauge({
|
||||
name: 'active_repo_index_jobs',
|
||||
help: 'The number of repo jobs in progress',
|
||||
labelNames: ['repo', 'type'],
|
||||
});
|
||||
this.registry.registerMetric(this.activeRepoIndexingJobs);
|
||||
this.registry.registerMetric(this.activeRepoIndexJobs);
|
||||
|
||||
this.pendingRepoIndexingJobs = new Gauge({
|
||||
name: 'pending_repo_indexing_jobs',
|
||||
help: 'The number of repo indexing jobs waiting in queue',
|
||||
labelNames: ['repo'],
|
||||
this.pendingRepoIndexJobs = new Gauge({
|
||||
name: 'pending_repo_index_jobs',
|
||||
help: 'The number of repo jobs waiting in queue',
|
||||
labelNames: ['repo', 'type'],
|
||||
});
|
||||
this.registry.registerMetric(this.pendingRepoIndexingJobs);
|
||||
this.registry.registerMetric(this.pendingRepoIndexJobs);
|
||||
|
||||
this.repoIndexingReattemptsTotal = new Counter({
|
||||
name: 'repo_indexing_reattempts',
|
||||
help: 'The number of repo indexing reattempts',
|
||||
labelNames: ['repo'],
|
||||
this.repoIndexJobReattemptsTotal = new Counter({
|
||||
name: 'repo_index_job_reattempts',
|
||||
help: 'The number of repo job reattempts',
|
||||
labelNames: ['repo', 'type'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingReattemptsTotal);
|
||||
this.registry.registerMetric(this.repoIndexJobReattemptsTotal);
|
||||
|
||||
this.repoIndexingFailTotal = new Counter({
|
||||
name: 'repo_indexing_fails',
|
||||
help: 'The number of repo indexing fails',
|
||||
labelNames: ['repo'],
|
||||
this.repoIndexJobFailTotal = new Counter({
|
||||
name: 'repo_index_job_fails',
|
||||
help: 'The number of repo job fails',
|
||||
labelNames: ['repo', 'type'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingFailTotal);
|
||||
this.registry.registerMetric(this.repoIndexJobFailTotal);
|
||||
|
||||
this.repoIndexingSuccessTotal = new Counter({
|
||||
name: 'repo_indexing_successes',
|
||||
help: 'The number of repo indexing successes',
|
||||
labelNames: ['repo'],
|
||||
this.repoIndexJobSuccessTotal = new Counter({
|
||||
name: 'repo_index_job_successes',
|
||||
help: 'The number of repo job successes',
|
||||
labelNames: ['repo', 'type'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoIndexingSuccessTotal);
|
||||
|
||||
this.activeRepoGarbageCollectionJobs = new Gauge({
|
||||
name: 'active_repo_garbage_collection_jobs',
|
||||
help: 'The number of repo garbage collection jobs in progress',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.activeRepoGarbageCollectionJobs);
|
||||
|
||||
this.repoGarbageCollectionErrorTotal = new Counter({
|
||||
name: 'repo_garbage_collection_errors',
|
||||
help: 'The number of repo garbage collection errors',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionErrorTotal);
|
||||
|
||||
this.repoGarbageCollectionFailTotal = new Counter({
|
||||
name: 'repo_garbage_collection_fails',
|
||||
help: 'The number of repo garbage collection fails',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionFailTotal);
|
||||
|
||||
this.repoGarbageCollectionSuccessTotal = new Counter({
|
||||
name: 'repo_garbage_collection_successes',
|
||||
help: 'The number of repo garbage collection successes',
|
||||
labelNames: ['repo'],
|
||||
});
|
||||
this.registry.registerMetric(this.repoGarbageCollectionSuccessTotal);
|
||||
this.registry.registerMetric(this.repoIndexJobSuccessTotal);
|
||||
|
||||
client.collectDefaultMetrics({
|
||||
register: this.registry,
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import { Redis } from 'ioredis';
|
|||
import { INDEX_CACHE_DIR } from './constants.js';
|
||||
import { env } from './env.js';
|
||||
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
|
||||
import { PromClient } from './promClient.js';
|
||||
import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
|
||||
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
|
||||
import { indexGitRepository } from './zoekt.js';
|
||||
|
|
@ -43,6 +44,7 @@ export class RepoIndexManager {
|
|||
private db: PrismaClient,
|
||||
private settings: Settings,
|
||||
redis: Redis,
|
||||
private promClient: PromClient,
|
||||
) {
|
||||
this.queue = new Queue<JobPayload>({
|
||||
redis,
|
||||
|
|
@ -73,7 +75,7 @@ export class RepoIndexManager {
|
|||
this.interval = setInterval(async () => {
|
||||
await this.scheduleIndexJobs();
|
||||
await this.scheduleCleanupJobs();
|
||||
}, 1000 * 5);
|
||||
}, this.settings.reindexRepoPollingIntervalMs);
|
||||
|
||||
this.worker.run();
|
||||
}
|
||||
|
|
@ -135,7 +137,7 @@ export class RepoIndexManager {
|
|||
}
|
||||
}
|
||||
],
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
if (reposToIndex.length > 0) {
|
||||
|
|
@ -213,6 +215,9 @@ export class RepoIndexManager {
|
|||
},
|
||||
jobId: job.id,
|
||||
});
|
||||
|
||||
const jobTypeLabel = getJobTypePrometheusLabel(type);
|
||||
this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -243,6 +248,10 @@ export class RepoIndexManager {
|
|||
}
|
||||
});
|
||||
|
||||
const jobTypeLabel = getJobTypePrometheusLabel(jobType);
|
||||
this.promClient.pendingRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
this.promClient.activeRepoIndexJobs.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
|
||||
const abortController = new AbortController();
|
||||
const signalHandler = () => {
|
||||
logger.info(`Received shutdown signal, aborting...`);
|
||||
|
|
@ -378,6 +387,8 @@ export class RepoIndexManager {
|
|||
}
|
||||
});
|
||||
|
||||
const jobTypeLabel = getJobTypePrometheusLabel(jobData.type);
|
||||
|
||||
if (jobData.type === RepoIndexingJobType.INDEX) {
|
||||
const repo = await this.db.repo.update({
|
||||
where: { id: jobData.repoId },
|
||||
|
|
@ -395,6 +406,10 @@ export class RepoIndexManager {
|
|||
|
||||
logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
|
||||
}
|
||||
|
||||
// Track metrics for successful job
|
||||
this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
this.promClient.repoIndexJobSuccessTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
});
|
||||
|
||||
private onJobFailed = async (job: Job<JobPayload>) =>
|
||||
|
|
@ -404,6 +419,8 @@ export class RepoIndexManager {
|
|||
const attempt = job.attemptsMade + 1;
|
||||
const wasLastAttempt = attempt >= job.opts.attempts;
|
||||
|
||||
const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
|
||||
|
||||
if (wasLastAttempt) {
|
||||
const { repo } = await this.db.repoIndexingJob.update({
|
||||
where: { id: job.data.jobId },
|
||||
|
|
@ -415,12 +432,17 @@ export class RepoIndexManager {
|
|||
select: { repo: true }
|
||||
});
|
||||
|
||||
this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
|
||||
logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
|
||||
} else {
|
||||
const repo = await this.db.repo.findUniqueOrThrow({
|
||||
where: { id: job.data.repoId },
|
||||
});
|
||||
|
||||
this.promClient.repoIndexJobReattemptsTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||
|
||||
logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
|
||||
}
|
||||
});
|
||||
|
|
@ -428,16 +450,20 @@ export class RepoIndexManager {
|
|||
private onJobStalled = async (jobId: string) =>
|
||||
groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
|
||||
const logger = createJobLogger(jobId);
|
||||
const { repo } = await this.db.repoIndexingJob.update({
|
||||
const { repo, type } = await this.db.repoIndexingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
status: RepoIndexingJobStatus.FAILED,
|
||||
completedAt: new Date(),
|
||||
errorMessage: 'Job stalled',
|
||||
},
|
||||
select: { repo: true }
|
||||
select: { repo: true, type: true }
|
||||
});
|
||||
|
||||
const jobTypeLabel = getJobTypePrometheusLabel(type);
|
||||
this.promClient.activeRepoIndexJobs.dec({ repo: repo.name, type: jobTypeLabel });
|
||||
this.promClient.repoIndexJobFailTotal.inc({ repo: repo.name, type: jobTypeLabel });
|
||||
|
||||
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
|
||||
});
|
||||
|
||||
|
|
@ -454,3 +480,5 @@ export class RepoIndexManager {
|
|||
await this.queue.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Translates a repo indexing job type into the string used as the `type`
 * label value on the repo index job Prometheus metrics.
 *
 * @param type - The job type to translate.
 * @returns `'index'` when `type` is `RepoIndexingJobType.INDEX`; `'cleanup'`
 *          for every other value.
 */
const getJobTypePrometheusLabel = (type: RepoIndexingJobType) => {
    if (type === RepoIndexingJobType.INDEX) {
        return 'index';
    }

    // Anything that is not an INDEX job is reported under the cleanup label.
    return 'cleanup';
};
|
||||
Loading…
Reference in a new issue