db migration

This commit is contained in:
bkellam 2025-10-18 14:30:46 -07:00
parent 3f72a533c6
commit c912a0b9a9
7 changed files with 75 additions and 55 deletions

View file

@ -1,5 +1,5 @@
import * as Sentry from '@sentry/node'; import * as Sentry from '@sentry/node';
import { PrismaClient, Repo, RepoJobStatus, RepoJobType } from "@sourcebot/db"; import { PrismaClient, Repo, RepoIndexingJobStatus, RepoIndexingJobType } from "@sourcebot/db";
import { createLogger, Logger } from "@sourcebot/logger"; import { createLogger, Logger } from "@sourcebot/logger";
import { existsSync } from 'fs'; import { existsSync } from 'fs';
import { readdir, rm } from 'fs/promises'; import { readdir, rm } from 'fs/promises';
@ -97,7 +97,7 @@ export class RepoIndexManager {
some: { some: {
AND: [ AND: [
{ {
type: RepoJobType.INDEX, type: RepoIndexingJobType.INDEX,
}, },
{ {
OR: [ OR: [
@ -108,8 +108,8 @@ export class RepoIndexManager {
{ {
status: { status: {
in: [ in: [
RepoJobStatus.PENDING, RepoIndexingJobStatus.PENDING,
RepoJobStatus.IN_PROGRESS, RepoIndexingJobStatus.IN_PROGRESS,
] ]
}, },
}, },
@ -123,7 +123,7 @@ export class RepoIndexManager {
// Don't schedule if there are recent failed jobs (within the threshold date). // Don't schedule if there are recent failed jobs (within the threshold date).
{ {
AND: [ AND: [
{ status: RepoJobStatus.FAILED }, { status: RepoIndexingJobStatus.FAILED },
{ completedAt: { gt: thresholdDate } }, { completedAt: { gt: thresholdDate } },
] ]
} }
@ -139,7 +139,7 @@ export class RepoIndexManager {
}); });
if (reposToIndex.length > 0) { if (reposToIndex.length > 0) {
await this.createJobs(reposToIndex, RepoJobType.INDEX); await this.createJobs(reposToIndex, RepoIndexingJobType.INDEX);
} }
} }
@ -161,13 +161,13 @@ export class RepoIndexManager {
some: { some: {
AND: [ AND: [
{ {
type: RepoJobType.CLEANUP, type: RepoIndexingJobType.CLEANUP,
}, },
{ {
status: { status: {
in: [ in: [
RepoJobStatus.PENDING, RepoIndexingJobStatus.PENDING,
RepoJobStatus.IN_PROGRESS, RepoIndexingJobStatus.IN_PROGRESS,
] ]
}, },
}, },
@ -184,15 +184,15 @@ export class RepoIndexManager {
}); });
if (reposToCleanup.length > 0) { if (reposToCleanup.length > 0) {
await this.createJobs(reposToCleanup, RepoJobType.CLEANUP); await this.createJobs(reposToCleanup, RepoIndexingJobType.CLEANUP);
} }
} }
private async createJobs(repos: Repo[], type: RepoJobType) { private async createJobs(repos: Repo[], type: RepoIndexingJobType) {
// @note: we don't perform this in a transaction because // @note: we don't perform this in a transaction because
// we want to avoid the situation where a job is created and run // we want to avoid the situation where a job is created and run
// prior to the transaction being committed. // prior to the transaction being committed.
const jobs = await this.db.repoJob.createManyAndReturn({ const jobs = await this.db.repoIndexingJob.createManyAndReturn({
data: repos.map(repo => ({ data: repos.map(repo => ({
type, type,
repoId: repo.id, repoId: repo.id,
@ -222,12 +222,12 @@ export class RepoIndexManager {
logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`); logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
const { repo, type: jobType } = await this.db.repoJob.update({ const { repo, type: jobType } = await this.db.repoIndexingJob.update({
where: { where: {
id, id,
}, },
data: { data: {
status: RepoJobStatus.IN_PROGRESS, status: RepoIndexingJobStatus.IN_PROGRESS,
}, },
select: { select: {
type: true, type: true,
@ -253,9 +253,9 @@ export class RepoIndexManager {
process.on('SIGINT', signalHandler); process.on('SIGINT', signalHandler);
try { try {
if (jobType === RepoJobType.INDEX) { if (jobType === RepoIndexingJobType.INDEX) {
await this.indexRepository(repo, logger, abortController.signal); await this.indexRepository(repo, logger, abortController.signal);
} else if (jobType === RepoJobType.CLEANUP) { } else if (jobType === RepoIndexingJobType.CLEANUP) {
await this.cleanupRepository(repo, logger); await this.cleanupRepository(repo, logger);
} }
} finally { } finally {
@ -370,15 +370,15 @@ export class RepoIndexManager {
private onJobCompleted = async (job: Job<JobPayload>) => private onJobCompleted = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => { groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
const logger = createJobLogger(job.data.jobId); const logger = createJobLogger(job.data.jobId);
const jobData = await this.db.repoJob.update({ const jobData = await this.db.repoIndexingJob.update({
where: { id: job.data.jobId }, where: { id: job.data.jobId },
data: { data: {
status: RepoJobStatus.COMPLETED, status: RepoIndexingJobStatus.COMPLETED,
completedAt: new Date(), completedAt: new Date(),
} }
}); });
if (jobData.type === RepoJobType.INDEX) { if (jobData.type === RepoIndexingJobType.INDEX) {
const repo = await this.db.repo.update({ const repo = await this.db.repo.update({
where: { id: jobData.repoId }, where: { id: jobData.repoId },
data: { data: {
@ -388,7 +388,7 @@ export class RepoIndexManager {
logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`); logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
} }
else if (jobData.type === RepoJobType.CLEANUP) { else if (jobData.type === RepoIndexingJobType.CLEANUP) {
const repo = await this.db.repo.delete({ const repo = await this.db.repo.delete({
where: { id: jobData.repoId }, where: { id: jobData.repoId },
}); });
@ -405,10 +405,10 @@ export class RepoIndexManager {
const wasLastAttempt = attempt >= job.opts.attempts; const wasLastAttempt = attempt >= job.opts.attempts;
if (wasLastAttempt) { if (wasLastAttempt) {
const { repo } = await this.db.repoJob.update({ const { repo } = await this.db.repoIndexingJob.update({
where: { id: job.data.jobId }, where: { id: job.data.jobId },
data: { data: {
status: RepoJobStatus.FAILED, status: RepoIndexingJobStatus.FAILED,
completedAt: new Date(), completedAt: new Date(),
errorMessage: job.failedReason, errorMessage: job.failedReason,
}, },
@ -428,10 +428,10 @@ export class RepoIndexManager {
private onJobStalled = async (jobId: string) => private onJobStalled = async (jobId: string) =>
groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => { groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
const logger = createJobLogger(jobId); const logger = createJobLogger(jobId);
const { repo } = await this.db.repoJob.update({ const { repo } = await this.db.repoIndexingJob.update({
where: { id: jobId }, where: { id: jobId },
data: { data: {
status: RepoJobStatus.FAILED, status: RepoIndexingJobStatus.FAILED,
completedAt: new Date(), completedAt: new Date(),
errorMessage: 'Job stalled', errorMessage: 'Job stalled',
}, },

View file

@ -246,7 +246,7 @@ const createGitCloneUrlWithToken = (cloneUrl: string, credentials: { username?:
/** /**
* Wraps groupmq worker lifecycle callbacks with exception handling. This prevents * Wraps groupmq worker lifecycle callbacks with exception handling. This prevents
* uncaught exceptions (e.g., like a RepoJob not existing in the DB) from crashing * uncaught exceptions (e.g., like a RepoIndexingJob not existing in the DB) from crashing
* the app. * the app.
* @see: https://openpanel-dev.github.io/groupmq/api-worker/#events * @see: https://openpanel-dev.github.io/groupmq/api-worker/#events
*/ */

View file

@@ -0,0 +1,34 @@
/*
  Warnings:
  - You are about to drop the column `repoIndexingStatus` on the `Repo` table. All the data in the column will be lost.
*/
-- Migration: replace the single `Repo.repoIndexingStatus` enum column with a
-- per-job history table (`RepoIndexingJob`). Note there is no data backfill:
-- existing status values are intentionally discarded (see warning above), so
-- repos will report no status until their first job runs after deploy.

-- CreateEnum
-- Lifecycle states for a job row; rows are created as PENDING (see table default below).
CREATE TYPE "RepoIndexingJobStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
-- CreateEnum
-- The two kinds of work tracked per repo: indexing and cleanup.
CREATE TYPE "RepoIndexingJobType" AS ENUM ('INDEX', 'CLEANUP');
-- AlterTable
-- Drop the now-deprecated status column; status tracking moves to RepoIndexingJob.
ALTER TABLE "Repo" DROP COLUMN "repoIndexingStatus";
-- DropEnum
-- The old enum is unused once the column is gone.
DROP TYPE "RepoIndexingStatus";
-- CreateTable
CREATE TABLE "RepoIndexingJob" (
    -- TEXT id: populated client-side by Prisma (`@default(cuid())` in the schema),
    -- hence no database-level default here.
    "id" TEXT NOT NULL,
    "type" "RepoIndexingJobType" NOT NULL,
    "status" "RepoIndexingJobStatus" NOT NULL DEFAULT 'PENDING',
    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- No DB default: Prisma's `@updatedAt` sets this on every client write.
    "updatedAt" TIMESTAMP(3) NOT NULL,
    -- Set when the job reaches a terminal state (COMPLETED/FAILED); NULL while queued/running.
    "completedAt" TIMESTAMP(3),
    -- Failure detail recorded on FAILED jobs; NULL otherwise.
    "errorMessage" TEXT,
    "repoId" INTEGER NOT NULL,
    CONSTRAINT "RepoIndexingJob_pkey" PRIMARY KEY ("id")
);
-- AddForeignKey
-- CASCADE: deleting a Repo removes its job history automatically.
-- NOTE(review): Postgres does not auto-index FK columns; if job lookups by repoId
-- become hot, consider adding an `@@index([repoId])` in the Prisma schema (which
-- would generate the index in a follow-up migration) rather than hand-editing this one.
ALTER TABLE "RepoIndexingJob" ADD CONSTRAINT "RepoIndexingJob_repoId_fkey" FOREIGN KEY ("repoId") REFERENCES "Repo"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View file

@ -10,17 +10,6 @@ datasource db {
url = env("DATABASE_URL") url = env("DATABASE_URL")
} }
enum RepoIndexingStatus {
NEW
IN_INDEX_QUEUE
INDEXING
INDEXED
FAILED
IN_GC_QUEUE
GARBAGE_COLLECTING
GARBAGE_COLLECTION_FAILED
}
enum ConnectionSyncStatus { enum ConnectionSyncStatus {
SYNC_NEEDED SYNC_NEEDED
IN_SYNC_QUEUE IN_SYNC_QUEUE
@ -55,14 +44,11 @@ model Repo {
connections RepoToConnection[] connections RepoToConnection[]
imageUrl String? imageUrl String?
/// @deprecated status tracking is now done via the `jobs` table.
repoIndexingStatus RepoIndexingStatus @default(NEW)
permittedUsers UserToRepoPermission[] permittedUsers UserToRepoPermission[]
permissionSyncJobs RepoPermissionSyncJob[] permissionSyncJobs RepoPermissionSyncJob[]
permissionSyncedAt DateTime? /// When the permissions were last synced successfully. permissionSyncedAt DateTime? /// When the permissions were last synced successfully.
jobs RepoJob[] jobs RepoIndexingJob[]
indexedAt DateTime? /// When the repo was last indexed successfully. indexedAt DateTime? /// When the repo was last indexed successfully.
external_id String /// The id of the repo in the external service external_id String /// The id of the repo in the external service
@ -78,22 +64,22 @@ model Repo {
@@index([orgId]) @@index([orgId])
} }
enum RepoJobStatus { enum RepoIndexingJobStatus {
PENDING PENDING
IN_PROGRESS IN_PROGRESS
COMPLETED COMPLETED
FAILED FAILED
} }
enum RepoJobType { enum RepoIndexingJobType {
INDEX INDEX
CLEANUP CLEANUP
} }
model RepoJob { model RepoIndexingJob {
id String @id @default(cuid()) id String @id @default(cuid())
type RepoJobType type RepoIndexingJobType
status RepoJobStatus @default(PENDING) status RepoIndexingJobStatus @default(PENDING)
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
completedAt DateTime? completedAt DateTime?

View file

@ -10,7 +10,7 @@ import { prisma } from "@/prisma";
import { render } from "@react-email/components"; import { render } from "@react-email/components";
import * as Sentry from '@sentry/nextjs'; import * as Sentry from '@sentry/nextjs';
import { encrypt, generateApiKey, getTokenFromConfig, hashSecret } from "@sourcebot/crypto"; import { encrypt, generateApiKey, getTokenFromConfig, hashSecret } from "@sourcebot/crypto";
import { ApiKey, Org, OrgRole, Prisma, RepoJobStatus, RepoJobType, StripeSubscriptionStatus } from "@sourcebot/db"; import { ApiKey, Org, OrgRole, Prisma, RepoIndexingJobStatus, RepoIndexingJobType, StripeSubscriptionStatus } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger"; import { createLogger } from "@sourcebot/logger";
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type"; import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type";
import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type"; import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type";
@ -586,11 +586,11 @@ export const getReposStats = async () => sew(() =>
orgId: org.id, orgId: org.id,
jobs: { jobs: {
some: { some: {
type: RepoJobType.INDEX, type: RepoIndexingJobType.INDEX,
status: { status: {
in: [ in: [
RepoJobStatus.PENDING, RepoIndexingJobStatus.PENDING,
RepoJobStatus.IN_PROGRESS, RepoIndexingJobStatus.IN_PROGRESS,
] ]
} }
}, },

View file

@ -10,7 +10,7 @@ import { env } from "@/env.mjs";
import { ServiceErrorException } from "@/lib/serviceError"; import { ServiceErrorException } from "@/lib/serviceError";
import { isServiceError } from "@/lib/utils"; import { isServiceError } from "@/lib/utils";
import { DiscordLogoIcon, GitHubLogoIcon } from "@radix-ui/react-icons"; import { DiscordLogoIcon, GitHubLogoIcon } from "@radix-ui/react-icons";
import { RepoJobStatus, RepoJobType } from "@sourcebot/db"; import { RepoIndexingJobStatus, RepoIndexingJobType } from "@sourcebot/db";
import Link from "next/link"; import Link from "next/link";
import { redirect } from "next/navigation"; import { redirect } from "next/navigation";
import { OrgSelector } from "../orgSelector"; import { OrgSelector } from "../orgSelector";
@ -43,11 +43,11 @@ export const NavigationMenu = async ({
where: { where: {
jobs: { jobs: {
some: { some: {
type: RepoJobType.INDEX, type: RepoIndexingJobType.INDEX,
status: { status: {
in: [ in: [
RepoJobStatus.PENDING, RepoIndexingJobStatus.PENDING,
RepoJobStatus.IN_PROGRESS, RepoIndexingJobStatus.IN_PROGRESS,
] ]
} }
}, },

View file

@ -1,5 +1,5 @@
import { env } from "@/env.mjs"; import { env } from "@/env.mjs";
import { RepoJob } from "@sourcebot/db"; import { RepoIndexingJob } from "@sourcebot/db";
import { Header } from "../components/header"; import { Header } from "../components/header";
import { RepoStatus } from "./columns"; import { RepoStatus } from "./columns";
import { RepositoryTable } from "./repositoryTable"; import { RepositoryTable } from "./repositoryTable";
@ -8,7 +8,7 @@ import { withOptionalAuthV2 } from "@/withAuthV2";
import { isServiceError } from "@/lib/utils"; import { isServiceError } from "@/lib/utils";
import { ServiceErrorException } from "@/lib/serviceError"; import { ServiceErrorException } from "@/lib/serviceError";
function getRepoStatus(repo: { indexedAt: Date | null, jobs: RepoJob[] }): RepoStatus { function getRepoStatus(repo: { indexedAt: Date | null, jobs: RepoIndexingJob[] }): RepoStatus {
const latestJob = repo.jobs[0]; const latestJob = repo.jobs[0];
if (latestJob?.status === 'PENDING' || latestJob?.status === 'IN_PROGRESS') { if (latestJob?.status === 'PENDING' || latestJob?.status === 'IN_PROGRESS') {