cleanup org's repos and shards if it's inactive (#194)

* add stripe subscription status and webhook

* add inactive org repo cleanup logic

* mark reactivated org connections for sync
This commit is contained in:
Michael Sukkarieh 2025-02-15 09:58:17 -08:00 committed by GitHub
parent 86a80a4f73
commit 3be3680ee2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 151 additions and 5 deletions

View file

@ -1,7 +1,7 @@
import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createLogger } from "./logger.js";
import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus } from "@sourcebot/db";
import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { AppContext, Settings } from "./types.js";
import { captureEvent } from "./posthog.js";
@ -106,8 +106,33 @@ export class RepoManager implements IRepoManager {
}
});
for (const repo of reposWithNoConnections) {
this.logger.info(`Garbage collecting repo with no connections: ${repo.id}`);
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const inactiveOrgs = await this.db.org.findMany({
where: {
stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
stripeLastUpdatedAt: {
lt: sevenDaysAgo
}
}
});
const inactiveOrgIds = inactiveOrgs.map(org => org.id);
const inactiveOrgRepos = await this.db.repo.findMany({
where: {
orgId: {
in: inactiveOrgIds
}
}
});
if (inactiveOrgIds.length > 0 && inactiveOrgRepos.length > 0) {
console.log(`Garbage collecting ${inactiveOrgs.length} inactive orgs: ${inactiveOrgIds.join(', ')}`);
}
const reposToDelete = [...reposWithNoConnections, ...inactiveOrgRepos];
for (const repo of reposToDelete) {
this.logger.info(`Garbage collecting repo: ${repo.id}`);
// delete cloned repo
const repoPath = getRepoPath(repo, this.ctx);
@ -129,7 +154,7 @@ export class RepoManager implements IRepoManager {
await this.db.repo.deleteMany({
where: {
id: {
in: reposWithNoConnections.map(repo => repo.id)
in: reposToDelete.map(repo => repo.id)
}
}
});

View file

@ -0,0 +1,6 @@
-- CreateEnum
CREATE TYPE "StripeSubscriptionStatus" AS ENUM ('ACTIVE', 'INACTIVE');
-- AlterTable
ALTER TABLE "Org" ADD COLUMN "stripeLastUpdatedAt" TIMESTAMP(3),
ADD COLUMN "stripeSubscriptionStatus" "StripeSubscriptionStatus";

View file

@ -26,6 +26,11 @@ enum ConnectionSyncStatus {
FAILED
}
enum StripeSubscriptionStatus {
ACTIVE
INACTIVE
}
model Repo {
id Int @id @default(autoincrement())
name String
@ -115,7 +120,9 @@ model Org {
repos Repo[]
secrets Secret[]
stripeCustomerId String?
stripeCustomerId String?
stripeSubscriptionStatus StripeSubscriptionStatus?
stripeLastUpdatedAt DateTime?
/// List of pending invites to this organization
invites Invite[]

View file

@ -20,6 +20,7 @@ import { getStripe } from "@/lib/stripe"
import { getUser } from "@/data/user";
import { Session } from "next-auth";
import { STRIPE_PRODUCT_ID } from "@/lib/environment";
import { StripeSubscriptionStatus } from "@sourcebot/db";
import Stripe from "stripe";
const ajv = new Ajv({
validateFormats: false,
@ -103,6 +104,8 @@ export const createOrg = (name: string, domain: string, stripeCustomerId?: strin
name,
domain,
stripeCustomerId,
stripeSubscriptionStatus: StripeSubscriptionStatus.ACTIVE,
stripeLastUpdatedAt: new Date(),
members: {
create: {
role: "OWNER",

View file

@ -0,0 +1,104 @@
import { headers } from 'next/headers';
import { NextRequest } from 'next/server';
import Stripe from 'stripe';
import { prisma } from '@/prisma';
import { STRIPE_WEBHOOK_SECRET } from '@/lib/environment';
import { getStripe } from '@/lib/stripe';
import { ConnectionSyncStatus, StripeSubscriptionStatus } from '@sourcebot/db';
export async function POST(req: NextRequest) {
const body = await req.text();
const signature = headers().get('stripe-signature');
if (!signature) {
return new Response('No signature', { status: 400 });
}
try {
const stripe = getStripe();
const event = stripe.webhooks.constructEvent(
body,
signature,
STRIPE_WEBHOOK_SECRET!
);
if (event.type === 'customer.subscription.deleted') {
const subscription = event.data.object as Stripe.Subscription;
const customerId = subscription.customer as string;
const org = await prisma.org.findFirst({
where: {
stripeCustomerId: customerId
}
});
if (!org) {
return new Response('Org not found', { status: 404 });
}
await prisma.org.update({
where: {
id: org.id
},
data: {
stripeSubscriptionStatus: StripeSubscriptionStatus.INACTIVE,
stripeLastUpdatedAt: new Date()
}
});
console.log(`Org ${org.id} subscription status updated to INACTIVE`);
return new Response(JSON.stringify({ received: true }), {
status: 200
});
} else if (event.type === 'customer.subscription.created') {
const subscription = event.data.object as Stripe.Subscription;
const customerId = subscription.customer as string;
const org = await prisma.org.findFirst({
where: {
stripeCustomerId: customerId
}
});
if (!org) {
return new Response('Org not found', { status: 404 });
}
await prisma.org.update({
where: {
id: org.id
},
data: {
stripeSubscriptionStatus: StripeSubscriptionStatus.ACTIVE,
stripeLastUpdatedAt: new Date()
}
});
console.log(`Org ${org.id} subscription status updated to ACTIVE`);
// mark all of this org's connections for sync, since their repos may have been previously garbage collected
await prisma.connection.updateMany({
where: {
orgId: org.id
},
data: {
syncStatus: ConnectionSyncStatus.SYNC_NEEDED
}
});
return new Response(JSON.stringify({ received: true }), {
status: 200
});
} else {
console.log(`Received unknown event type: ${event.type}`);
return new Response(JSON.stringify({ received: true }), {
status: 202
});
}
} catch (err) {
console.error('Error processing webhook:', err);
return new Response(
'Webhook error: ' + (err as Error).message,
{ status: 400 }
);
}
}

View file

@ -16,3 +16,4 @@ export const AUTH_URL = getEnv(process.env.AUTH_URL)!;
export const STRIPE_SECRET_KEY = getEnv(process.env.STRIPE_SECRET_KEY);
export const STRIPE_PRODUCT_ID = getEnv(process.env.STRIPE_PRODUCT_ID);
export const STRIPE_WEBHOOK_SECRET = getEnv(process.env.STRIPE_WEBHOOK_SECRET);