mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 04:15:30 +00:00
metrics
This commit is contained in:
parent
c2299aa86b
commit
d490c0e740
3 changed files with 64 additions and 69 deletions
|
|
@ -67,7 +67,7 @@ if (hasEntitlement('github-app')) {
|
||||||
const connectionManager = new ConnectionManager(prisma, settings, redis);
|
const connectionManager = new ConnectionManager(prisma, settings, redis);
|
||||||
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
||||||
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
||||||
const repoIndexManager = new RepoIndexManager(prisma, settings, redis);
|
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
|
||||||
|
|
||||||
connectionManager.startScheduler();
|
connectionManager.startScheduler();
|
||||||
repoIndexManager.startScheduler();
|
repoIndexManager.startScheduler();
|
||||||
|
|
|
||||||
|
|
@ -10,84 +10,51 @@ export class PromClient {
|
||||||
private app: express.Application;
|
private app: express.Application;
|
||||||
private server: Server;
|
private server: Server;
|
||||||
|
|
||||||
public activeRepoIndexingJobs: Gauge<string>;
|
public activeRepoIndexJobs: Gauge<string>;
|
||||||
public pendingRepoIndexingJobs: Gauge<string>;
|
public pendingRepoIndexJobs: Gauge<string>;
|
||||||
public repoIndexingReattemptsTotal: Counter<string>;
|
public repoIndexJobReattemptsTotal: Counter<string>;
|
||||||
public repoIndexingFailTotal: Counter<string>;
|
public repoIndexJobFailTotal: Counter<string>;
|
||||||
public repoIndexingSuccessTotal: Counter<string>;
|
public repoIndexJobSuccessTotal: Counter<string>;
|
||||||
|
|
||||||
public activeRepoGarbageCollectionJobs: Gauge<string>;
|
|
||||||
public repoGarbageCollectionErrorTotal: Counter<string>;
|
|
||||||
public repoGarbageCollectionFailTotal: Counter<string>;
|
|
||||||
public repoGarbageCollectionSuccessTotal: Counter<string>;
|
|
||||||
|
|
||||||
public readonly PORT = 3060;
|
public readonly PORT = 3060;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.registry = new Registry();
|
this.registry = new Registry();
|
||||||
|
|
||||||
this.activeRepoIndexingJobs = new Gauge({
|
this.activeRepoIndexJobs = new Gauge({
|
||||||
name: 'active_repo_indexing_jobs',
|
name: 'active_repo_index_jobs',
|
||||||
help: 'The number of repo indexing jobs in progress',
|
help: 'The number of repo jobs in progress',
|
||||||
labelNames: ['repo'],
|
labelNames: ['repo', 'type'],
|
||||||
});
|
});
|
||||||
this.registry.registerMetric(this.activeRepoIndexingJobs);
|
this.registry.registerMetric(this.activeRepoIndexJobs);
|
||||||
|
|
||||||
this.pendingRepoIndexingJobs = new Gauge({
|
this.pendingRepoIndexJobs = new Gauge({
|
||||||
name: 'pending_repo_indexing_jobs',
|
name: 'pending_repo_index_jobs',
|
||||||
help: 'The number of repo indexing jobs waiting in queue',
|
help: 'The number of repo jobs waiting in queue',
|
||||||
labelNames: ['repo'],
|
labelNames: ['repo', 'type'],
|
||||||
});
|
});
|
||||||
this.registry.registerMetric(this.pendingRepoIndexingJobs);
|
this.registry.registerMetric(this.pendingRepoIndexJobs);
|
||||||
|
|
||||||
this.repoIndexingReattemptsTotal = new Counter({
|
this.repoIndexJobReattemptsTotal = new Counter({
|
||||||
name: 'repo_indexing_reattempts',
|
name: 'repo_index_job_reattempts',
|
||||||
help: 'The number of repo indexing reattempts',
|
help: 'The number of repo job reattempts',
|
||||||
labelNames: ['repo'],
|
labelNames: ['repo', 'type'],
|
||||||
});
|
});
|
||||||
this.registry.registerMetric(this.repoIndexingReattemptsTotal);
|
this.registry.registerMetric(this.repoIndexJobReattemptsTotal);
|
||||||
|
|
||||||
this.repoIndexingFailTotal = new Counter({
|
this.repoIndexJobFailTotal = new Counter({
|
||||||
name: 'repo_indexing_fails',
|
name: 'repo_index_job_fails',
|
||||||
help: 'The number of repo indexing fails',
|
help: 'The number of repo job fails',
|
||||||
labelNames: ['repo'],
|
labelNames: ['repo', 'type'],
|
||||||
});
|
});
|
||||||
this.registry.registerMetric(this.repoIndexingFailTotal);
|
this.registry.registerMetric(this.repoIndexJobFailTotal);
|
||||||
|
|
||||||
this.repoIndexingSuccessTotal = new Counter({
|
this.repoIndexJobSuccessTotal = new Counter({
|
||||||
name: 'repo_indexing_successes',
|
name: 'repo_index_job_successes',
|
||||||
help: 'The number of repo indexing successes',
|
help: 'The number of repo job successes',
|
||||||
labelNames: ['repo'],
|
labelNames: ['repo', 'type'],
|
||||||
});
|
});
|
||||||
this.registry.registerMetric(this.repoIndexingSuccessTotal);
|
this.registry.registerMetric(this.repoIndexJobSuccessTotal);
|
||||||
|
|
||||||
this.activeRepoGarbageCollectionJobs = new Gauge({
|
|
||||||
name: 'active_repo_garbage_collection_jobs',
|
|
||||||
help: 'The number of repo garbage collection jobs in progress',
|
|
||||||
labelNames: ['repo'],
|
|
||||||
});
|
|
||||||
this.registry.registerMetric(this.activeRepoGarbageCollectionJobs);
|
|
||||||
|
|
||||||
this.repoGarbageCollectionErrorTotal = new Counter({
|
|
||||||
name: 'repo_garbage_collection_errors',
|
|
||||||
help: 'The number of repo garbage collection errors',
|
|
||||||
labelNames: ['repo'],
|
|
||||||
});
|
|
||||||
this.registry.registerMetric(this.repoGarbageCollectionErrorTotal);
|
|
||||||
|
|
||||||
this.repoGarbageCollectionFailTotal = new Counter({
|
|
||||||
name: 'repo_garbage_collection_fails',
|
|
||||||
help: 'The number of repo garbage collection fails',
|
|
||||||
labelNames: ['repo'],
|
|
||||||
});
|
|
||||||
this.registry.registerMetric(this.repoGarbageCollectionFailTotal);
|
|
||||||
|
|
||||||
this.repoGarbageCollectionSuccessTotal = new Counter({
|
|
||||||
name: 'repo_garbage_collection_successes',
|
|
||||||
help: 'The number of repo garbage collection successes',
|
|
||||||
labelNames: ['repo'],
|
|
||||||
});
|
|
||||||
this.registry.registerMetric(this.repoGarbageCollectionSuccessTotal);
|
|
||||||
|
|
||||||
client.collectDefaultMetrics({
|
client.collectDefaultMetrics({
|
||||||
register: this.registry,
|
register: this.registry,
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import { Redis } from 'ioredis';
|
||||||
import { INDEX_CACHE_DIR } from './constants.js';
|
import { INDEX_CACHE_DIR } from './constants.js';
|
||||||
import { env } from './env.js';
|
import { env } from './env.js';
|
||||||
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
|
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
|
||||||
|
import { PromClient } from './promClient.js';
|
||||||
import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
|
import { repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
|
||||||
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
|
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
|
||||||
import { indexGitRepository } from './zoekt.js';
|
import { indexGitRepository } from './zoekt.js';
|
||||||
|
|
@ -43,6 +44,7 @@ export class RepoIndexManager {
|
||||||
private db: PrismaClient,
|
private db: PrismaClient,
|
||||||
private settings: Settings,
|
private settings: Settings,
|
||||||
redis: Redis,
|
redis: Redis,
|
||||||
|
private promClient: PromClient,
|
||||||
) {
|
) {
|
||||||
this.queue = new Queue<JobPayload>({
|
this.queue = new Queue<JobPayload>({
|
||||||
redis,
|
redis,
|
||||||
|
|
@ -73,7 +75,7 @@ export class RepoIndexManager {
|
||||||
this.interval = setInterval(async () => {
|
this.interval = setInterval(async () => {
|
||||||
await this.scheduleIndexJobs();
|
await this.scheduleIndexJobs();
|
||||||
await this.scheduleCleanupJobs();
|
await this.scheduleCleanupJobs();
|
||||||
}, 1000 * 5);
|
}, this.settings.reindexRepoPollingIntervalMs);
|
||||||
|
|
||||||
this.worker.run();
|
this.worker.run();
|
||||||
}
|
}
|
||||||
|
|
@ -135,7 +137,7 @@ export class RepoIndexManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
}
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
if (reposToIndex.length > 0) {
|
if (reposToIndex.length > 0) {
|
||||||
|
|
@ -213,6 +215,9 @@ export class RepoIndexManager {
|
||||||
},
|
},
|
||||||
jobId: job.id,
|
jobId: job.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const jobTypeLabel = getJobTypePrometheusLabel(type);
|
||||||
|
this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -243,6 +248,10 @@ export class RepoIndexManager {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const jobTypeLabel = getJobTypePrometheusLabel(jobType);
|
||||||
|
this.promClient.pendingRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
this.promClient.activeRepoIndexJobs.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
|
||||||
const abortController = new AbortController();
|
const abortController = new AbortController();
|
||||||
const signalHandler = () => {
|
const signalHandler = () => {
|
||||||
logger.info(`Received shutdown signal, aborting...`);
|
logger.info(`Received shutdown signal, aborting...`);
|
||||||
|
|
@ -378,6 +387,8 @@ export class RepoIndexManager {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const jobTypeLabel = getJobTypePrometheusLabel(jobData.type);
|
||||||
|
|
||||||
if (jobData.type === RepoIndexingJobType.INDEX) {
|
if (jobData.type === RepoIndexingJobType.INDEX) {
|
||||||
const repo = await this.db.repo.update({
|
const repo = await this.db.repo.update({
|
||||||
where: { id: jobData.repoId },
|
where: { id: jobData.repoId },
|
||||||
|
|
@ -395,6 +406,10 @@ export class RepoIndexManager {
|
||||||
|
|
||||||
logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
|
logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track metrics for successful job
|
||||||
|
this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
this.promClient.repoIndexJobSuccessTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
});
|
});
|
||||||
|
|
||||||
private onJobFailed = async (job: Job<JobPayload>) =>
|
private onJobFailed = async (job: Job<JobPayload>) =>
|
||||||
|
|
@ -404,6 +419,8 @@ export class RepoIndexManager {
|
||||||
const attempt = job.attemptsMade + 1;
|
const attempt = job.attemptsMade + 1;
|
||||||
const wasLastAttempt = attempt >= job.opts.attempts;
|
const wasLastAttempt = attempt >= job.opts.attempts;
|
||||||
|
|
||||||
|
const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
|
||||||
|
|
||||||
if (wasLastAttempt) {
|
if (wasLastAttempt) {
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
const { repo } = await this.db.repoIndexingJob.update({
|
||||||
where: { id: job.data.jobId },
|
where: { id: job.data.jobId },
|
||||||
|
|
@ -415,12 +432,17 @@ export class RepoIndexManager {
|
||||||
select: { repo: true }
|
select: { repo: true }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
|
||||||
logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
|
logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
|
||||||
} else {
|
} else {
|
||||||
const repo = await this.db.repo.findUniqueOrThrow({
|
const repo = await this.db.repo.findUniqueOrThrow({
|
||||||
where: { id: job.data.repoId },
|
where: { id: job.data.repoId },
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.promClient.repoIndexJobReattemptsTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
|
||||||
|
|
||||||
logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
|
logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -428,16 +450,20 @@ export class RepoIndexManager {
|
||||||
private onJobStalled = async (jobId: string) =>
|
private onJobStalled = async (jobId: string) =>
|
||||||
groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
|
groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
|
||||||
const logger = createJobLogger(jobId);
|
const logger = createJobLogger(jobId);
|
||||||
const { repo } = await this.db.repoIndexingJob.update({
|
const { repo, type } = await this.db.repoIndexingJob.update({
|
||||||
where: { id: jobId },
|
where: { id: jobId },
|
||||||
data: {
|
data: {
|
||||||
status: RepoIndexingJobStatus.FAILED,
|
status: RepoIndexingJobStatus.FAILED,
|
||||||
completedAt: new Date(),
|
completedAt: new Date(),
|
||||||
errorMessage: 'Job stalled',
|
errorMessage: 'Job stalled',
|
||||||
},
|
},
|
||||||
select: { repo: true }
|
select: { repo: true, type: true }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const jobTypeLabel = getJobTypePrometheusLabel(type);
|
||||||
|
this.promClient.activeRepoIndexJobs.dec({ repo: repo.name, type: jobTypeLabel });
|
||||||
|
this.promClient.repoIndexJobFailTotal.inc({ repo: repo.name, type: jobTypeLabel });
|
||||||
|
|
||||||
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
|
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -454,3 +480,5 @@ export class RepoIndexManager {
|
||||||
await this.queue.close();
|
await this.queue.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Translates a repo indexing job type into the string used as the `type`
 * label value on the Prometheus job metrics.
 *
 * @param type - The job type to translate.
 * @returns `'index'` for `RepoIndexingJobType.INDEX`; `'cleanup'` for any other value.
 */
const getJobTypePrometheusLabel = (type: RepoIndexingJobType) => {
    if (type === RepoIndexingJobType.INDEX) {
        return 'index';
    }

    // Every non-INDEX job type is reported under the 'cleanup' label.
    return 'cleanup';
};
|
||||||
Loading…
Reference in a new issue