diff --git a/packages/backend/src/indexSyncer.ts b/packages/backend/src/indexSyncer.ts
index aba95fbf..f1677598 100644
--- a/packages/backend/src/indexSyncer.ts
+++ b/packages/backend/src/indexSyncer.ts
@@ -1,8 +1,8 @@
 import { createBullBoard } from '@bull-board/api';
 import { ExpressAdapter } from '@bull-board/express';
 import * as Sentry from '@sentry/node';
-import { PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db";
-import { createLogger } from "@sourcebot/logger";
+import { Prisma, PrismaClient, Repo, RepoIndexingJobStatus } from "@sourcebot/db";
+import { createLogger, Logger, Transport, TransportStreamOptions } from "@sourcebot/logger";
 import express from 'express';
 import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq";
 import { Redis } from 'ioredis';
@@ -12,7 +12,87 @@ import { existsSync } from 'fs';
 import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from './git.js';
 import { indexGitRepository } from './zoekt.js';
 
-const logger = createLogger('index-syncer');
+interface LogEntry {
+    message: string;
+}
+
+interface DatabaseTransportOptions extends TransportStreamOptions {
+    writer: (logs: LogEntry[]) => Promise<void>;
+}
+
+export class DatabaseTransport extends Transport {
+    private logs: LogEntry[] = [];
+    private writer: (logs: LogEntry[]) => Promise<void>;
+
+    constructor(opts: DatabaseTransportOptions) {
+        super(opts);
+        this.writer = opts.writer;
+    }
+
+    log(info: any, callback: () => void) {
+        setImmediate(() => {
+            this.emit('logged', info);
+        });
+
+        // Capture structured log data
+        const logEntry: LogEntry = {
+            // timestamp: info.timestamp,
+            // level: info.level,
+            message: info.message,
+            // label: info.label,
+            // stack: info.stack,
+            // metadata: info.metadata || {},
+            // ...info // Include any additional fields
+        };
+
+        this.logs.push(logEntry);
+
+        callback();
+    }
+
+    async flush() {
+        if (this.logs.length > 0) {
+            await this.writer(this.logs);
+            this.logs = [];
+        }
+    }
+}
+
+
+const useScopedLogger = async (jobId: string, db: PrismaClient, cb: (logger: Logger) => Promise<void>) => {
+    const transport = new DatabaseTransport({
+        writer: async (logs) => {
+            try {
+                const existingLogs = await db.repoIndexingJob.findUnique({
+                    where: { id: jobId },
+                    select: { logs: true }
+                });
+
+                await db.repoIndexingJob.update({
+                    where: { id: jobId },
+                    data: {
+                        logs: [
+                            ...(existingLogs?.logs as unknown as LogEntry[] ?? []),
+                            ...logs,
+                        ] as unknown as Prisma.InputJsonValue,
+                    }
+                })
+            } catch (error) {
+                console.error(`Error writing logs for job ${jobId}.`, error);
+            }
+        }
+    });
+
+    const logger = createLogger('index-syncer', [
+        transport,
+    ]);
+
+    try {
+        await cb(logger);
+    } finally {
+        await transport.flush();
+    }
+}
 
 type IndexSyncJob = {
     jobId: string;
@@ -24,6 +104,7 @@ export class IndexSyncer {
     private interval?: NodeJS.Timeout;
     private queue: Queue;
     private worker: Worker;
+    private globalLogger: Logger;
 
     constructor(
         private db: PrismaClient,
@@ -31,6 +112,7 @@
         redis: Redis,
         private ctx: AppContext,
     ) {
+        this.globalLogger = createLogger('index-syncer');
         this.queue = new Queue({
             redis,
             namespace: 'index-sync-queue',
@@ -51,7 +133,7 @@
         this.worker.on('stalled', this.onJobStalled.bind(this));
         this.worker.on('error', async (error) => {
             Sentry.captureException(error);
-            logger.error(`Index syncer worker error.`, error);
+            this.globalLogger.error(`Index syncer worker error.`, error);
         });
 
         // @nocheckin
@@ -151,32 +233,84 @@
         }
     }
 
-    private async runJob(job: ReservedJob) {
-        const id = job.data.jobId;
-        const { repo } = await this.db.repoIndexingJob.update({
-            where: {
-                id,
-            },
-            data: {
-                status: RepoIndexingJobStatus.IN_PROGRESS,
-            },
-            select: {
-                repo: {
-                    include: {
-                        connections: {
-                            include: {
-                                connection: true,
+    private runJob = async (job: ReservedJob) =>
+        useScopedLogger(job.data.jobId, this.db, async (logger) => {
+            const id = job.data.jobId;
+
+            const { repo } = await this.db.repoIndexingJob.update({
+                where: {
+                    id,
+                },
+                data: {
+                    status: RepoIndexingJobStatus.IN_PROGRESS,
+                },
+                select: {
+                    repo: {
+                        include: {
+                            connections: {
+                                include: {
+                                    connection: true,
+                                }
                             }
                         }
                     }
                 }
-            }
-        });
+            });
 
-        await this.syncGitRepository(repo);
-    }
+            await this._syncGitRepository(repo, logger);
+        })
 
-    private async syncGitRepository(repo: RepoWithConnections) {
+    private onJobCompleted = async (job: Job) =>
+        useScopedLogger(job.data.jobId, this.db, async (logger) => {
+            const { repo } = await this.db.repoIndexingJob.update({
+                where: { id: job.data.jobId },
+                data: {
+                    status: RepoIndexingJobStatus.COMPLETED,
+                    repo: {
+                        update: {
+                            indexedAt: new Date(),
+                        }
+                    },
+                    completedAt: new Date(),
+                },
+                select: { repo: true }
+            });
+
+            logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
+        })
+
+    private onJobFailed = (job: Job) =>
+        useScopedLogger(job.data.jobId, this.db, async (logger) => {
+            const { repo } = await this.db.repoIndexingJob.update({
+                where: { id: job.data.jobId },
+                data: {
+                    status: RepoIndexingJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: job.failedReason,
+                },
+                select: { repo: true }
+            });
+
+            logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`);
+        })
+
+    private onJobStalled = (jobId: string) =>
+        useScopedLogger(jobId, this.db, async (logger) => {
+            const { repo } = await this.db.repoIndexingJob.update({
+                where: { id: jobId },
+                data: {
+                    status: RepoIndexingJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: 'Job stalled',
+                },
+                select: { repo: true }
+            });
+
+            logger.error(`Job ${jobId} stalled for repo ${repo.name}`);
+        })
+
+
+    private _syncGitRepository = async (repo: RepoWithConnections, logger: Logger) => {
         const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
         const metadata = repoMetadataSchema.parse(repo.metadata);
 
@@ -238,51 +372,6 @@
         logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
     }
 
-    private async onJobCompleted(job: Job) {
-        const { repo } = await this.db.repoIndexingJob.update({
-            where: { id: job.data.jobId },
-            data: {
-                status: RepoIndexingJobStatus.COMPLETED,
-                repo: {
-                    update: {
-                        indexedAt: new Date(),
-                    }
-                },
-                completedAt: new Date(),
-            },
-            select: { repo: true }
-        });
-
-        logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
-    }
-
-    private async onJobFailed(job: Job) {
-        const { repo } = await this.db.repoIndexingJob.update({
-            where: { id: job.data.jobId },
-            data: {
-                status: RepoIndexingJobStatus.FAILED,
-                completedAt: new Date(),
-                errorMessage: job.failedReason,
-            },
-            select: { repo: true}
-        });
-
-        logger.error(`Failed index job ${job.data.jobId} for repo ${repo.name}`);
-    }
-
-    private async onJobStalled(jobId: string) {
-        const { repo } = await this.db.repoIndexingJob.update({
-            where: { id: jobId },
-            data: {
-                status: RepoIndexingJobStatus.FAILED,
-                completedAt: new Date(),
-                errorMessage: 'Job stalled',
-            },
-            select: { repo: true }
-        });
-
-        logger.error(`Job ${jobId} stalled for repo ${repo.name}`);
-    }
 
     public dispose() {
         if (this.interval) {
@@ -291,4 +380,4 @@
         this.worker.close();
         this.queue.close();
     }
-}
\ No newline at end of file
+}
diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma
index 75e30ec9..4f9e11ef 100644
--- a/packages/db/prisma/schema.prisma
+++ b/packages/db/prisma/schema.prisma
@@ -91,6 +91,7 @@ model RepoIndexingJob {
 
   completedAt DateTime?
   errorMessage String?
+  logs Json?
 
   repo Repo @relation(fields: [repoId], references: [id], onDelete: Cascade)
   repoId Int
diff --git a/packages/logger/package.json b/packages/logger/package.json
index 2e2279a3..e5a781c3 100644
--- a/packages/logger/package.json
+++ b/packages/logger/package.json
@@ -15,6 +15,7 @@
     "dotenv": "^16.4.5",
     "triple-beam": "^1.4.1",
     "winston": "^3.15.0",
+    "winston-transport": "^4.9.0",
     "zod": "^3.24.3"
   },
   "devDependencies": {
diff --git a/packages/logger/src/index.ts b/packages/logger/src/index.ts
index d3998d2c..746d848d 100644
--- a/packages/logger/src/index.ts
+++ b/packages/logger/src/index.ts
@@ -1,4 +1,5 @@
-import winston, { format } from 'winston';
+import winston, { format, Logger } from 'winston';
+import Transport, { TransportStreamOptions } from 'winston-transport';
 import { Logtail } from '@logtail/node';
 import { LogtailTransport } from '@logtail/winston';
 import { MESSAGE } from 'triple-beam';
@@ -40,7 +41,7 @@ const humanReadableFormat = printf(({ level, message, timestamp, stack, label: _
     return `${timestamp} ${level}: ${label}${message}`;
 });
 
-const createLogger = (label: string) => {
+const createLogger = (label: string, transports: Transport[] = []) => {
     const isStructuredLoggingEnabled = env.SOURCEBOT_STRUCTURED_LOGGING_ENABLED === 'true';
 
     return winston.createLogger({
@@ -78,10 +79,17 @@
                 })
             )
             ] : []),
+            ...transports,
         ]
     });
 }
 
 export {
-    createLogger
-};
\ No newline at end of file
+    createLogger,
+    Transport,
+    Logger
+};
+
+export type {
+    TransportStreamOptions,
+}
\ No newline at end of file
diff --git a/yarn.lock b/yarn.lock
index 6fe84282..3cadb59e 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -7684,6 +7684,7 @@ __metadata:
     triple-beam: "npm:^1.4.1"
    typescript: "npm:^5.7.3"
     winston: "npm:^3.15.0"
+    winston-transport: "npm:^4.9.0"
     zod: "npm:^3.24.3"
   languageName: unknown
   linkType: soft
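
Usage note (a sketch, not part of the patch): with the updated createLogger signature,
callers can pass additional winston transports alongside the defaults. The
CaptureTransport class below is hypothetical and only illustrates the shape of a
custom transport; it relies solely on the createLogger and Transport exports added in
packages/logger/src/index.ts above.

    import { createLogger, Transport } from "@sourcebot/logger";

    // Illustrative transport that buffers log messages in memory.
    class CaptureTransport extends Transport {
        public captured: string[] = [];

        log(info: any, callback: () => void) {
            // winston transports are expected to emit 'logged' and invoke the callback.
            setImmediate(() => this.emit('logged', info));
            this.captured.push(info.message);
            callback();
        }
    }

    const capture = new CaptureTransport();
    const logger = createLogger('example', [capture]);

    // The message is written by the default console transport and also buffered in
    // capture.captured, mirroring how DatabaseTransport accumulates entries until
    // flush() persists them.
    logger.info('hello');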