mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 04:15:30 +00:00
move config syncing into backend
This commit is contained in:
parent
02ef3b8301
commit
df0ca07f84
16 changed files with 240 additions and 213 deletions
|
|
@ -40,6 +40,7 @@
|
||||||
"argparse": "^2.0.1",
|
"argparse": "^2.0.1",
|
||||||
"azure-devops-node-api": "^15.1.1",
|
"azure-devops-node-api": "^15.1.1",
|
||||||
"bullmq": "^5.34.10",
|
"bullmq": "^5.34.10",
|
||||||
|
"chokidar": "^4.0.3",
|
||||||
"cross-fetch": "^4.0.0",
|
"cross-fetch": "^4.0.0",
|
||||||
"dotenv": "^16.4.5",
|
"dotenv": "^16.4.5",
|
||||||
"express": "^4.21.2",
|
"express": "^4.21.2",
|
||||||
|
|
|
||||||
126
packages/backend/src/configManager.ts
Normal file
126
packages/backend/src/configManager.ts
Normal file
|
|
@ -0,0 +1,126 @@
|
||||||
|
import { Prisma, PrismaClient } from "@sourcebot/db";
|
||||||
|
import { createLogger } from "@sourcebot/logger";
|
||||||
|
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
|
||||||
|
import { loadConfig } from "@sourcebot/shared";
|
||||||
|
import chokidar, { FSWatcher } from 'chokidar';
|
||||||
|
import { ConnectionManager } from "./connectionManager.js";
|
||||||
|
import { SINGLE_TENANT_ORG_ID } from "./constants.js";
|
||||||
|
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
|
||||||
|
|
||||||
|
const logger = createLogger('config-manager');
|
||||||
|
|
||||||
|
export class ConfigManager {
|
||||||
|
private watcher: FSWatcher;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private db: PrismaClient,
|
||||||
|
private connectionManager: ConnectionManager,
|
||||||
|
configPath: string,
|
||||||
|
) {
|
||||||
|
this.watcher = chokidar.watch(configPath, {
|
||||||
|
ignoreInitial: true, // Don't fire events for existing files
|
||||||
|
awaitWriteFinish: {
|
||||||
|
stabilityThreshold: 100, // File size stable for 100ms
|
||||||
|
pollInterval: 100 // Check every 100ms
|
||||||
|
},
|
||||||
|
atomic: true // Handle atomic writes (temp file + rename)
|
||||||
|
});
|
||||||
|
|
||||||
|
this.watcher.on('change', async () => {
|
||||||
|
logger.info(`Config file ${configPath} changed. Syncing config.`);
|
||||||
|
try {
|
||||||
|
await this.syncConfig(configPath);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to sync config: ${error}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.syncConfig(configPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private syncConfig = async (configPath: string) => {
|
||||||
|
const config = await loadConfig(configPath);
|
||||||
|
|
||||||
|
await this.syncConnections(config.connections);
|
||||||
|
await syncSearchContexts({
|
||||||
|
contexts: config.contexts,
|
||||||
|
orgId: SINGLE_TENANT_ORG_ID,
|
||||||
|
db: this.db,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private syncConnections = async (connections?: { [key: string]: ConnectionConfig }) => {
|
||||||
|
if (connections) {
|
||||||
|
for (const [key, newConnectionConfig] of Object.entries(connections)) {
|
||||||
|
const existingConnection = await this.db.connection.findUnique({
|
||||||
|
where: {
|
||||||
|
name_orgId: {
|
||||||
|
name: key,
|
||||||
|
orgId: SINGLE_TENANT_ORG_ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
const existingConnectionConfig = existingConnection ? existingConnection.config as unknown as ConnectionConfig : undefined;
|
||||||
|
const connectionNeedsSyncing =
|
||||||
|
!existingConnection ||
|
||||||
|
(JSON.stringify(existingConnectionConfig) !== JSON.stringify(newConnectionConfig));
|
||||||
|
|
||||||
|
// Either update the existing connection or create a new one.
|
||||||
|
const connection = existingConnection ?
|
||||||
|
await this.db.connection.update({
|
||||||
|
where: {
|
||||||
|
id: existingConnection.id,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
|
||||||
|
isDeclarative: true,
|
||||||
|
}
|
||||||
|
}) :
|
||||||
|
await this.db.connection.create({
|
||||||
|
data: {
|
||||||
|
name: key,
|
||||||
|
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
|
||||||
|
connectionType: newConnectionConfig.type,
|
||||||
|
isDeclarative: true,
|
||||||
|
org: {
|
||||||
|
connect: {
|
||||||
|
id: SINGLE_TENANT_ORG_ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (connectionNeedsSyncing) {
|
||||||
|
const [jobId] = await this.connectionManager.createJobs([connection]);
|
||||||
|
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete any connections that are no longer in the config.
|
||||||
|
const deletedConnections = await this.db.connection.findMany({
|
||||||
|
where: {
|
||||||
|
isDeclarative: true,
|
||||||
|
name: {
|
||||||
|
notIn: Object.keys(connections ?? {}),
|
||||||
|
},
|
||||||
|
orgId: SINGLE_TENANT_ORG_ID,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const connection of deletedConnections) {
|
||||||
|
logger.info(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
|
||||||
|
await this.db.connection.delete({
|
||||||
|
where: {
|
||||||
|
id: connection.id,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public dispose = async () => {
|
||||||
|
await this.watcher.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,13 +2,15 @@ import * as Sentry from "@sentry/node";
|
||||||
import { Connection, ConnectionSyncJobStatus, PrismaClient } from "@sourcebot/db";
|
import { Connection, ConnectionSyncJobStatus, PrismaClient } from "@sourcebot/db";
|
||||||
import { createLogger } from "@sourcebot/logger";
|
import { createLogger } from "@sourcebot/logger";
|
||||||
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
|
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
|
||||||
import { loadConfig, syncSearchContexts } from "@sourcebot/shared";
|
import { loadConfig } from "@sourcebot/shared";
|
||||||
import { Job, Queue, ReservedJob, Worker } from "groupmq";
|
import { Job, Queue, ReservedJob, Worker } from "groupmq";
|
||||||
import { Redis } from 'ioredis';
|
import { Redis } from 'ioredis';
|
||||||
import { env } from "./env.js";
|
import { env } from "./env.js";
|
||||||
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
|
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
|
||||||
import { Settings } from "./types.js";
|
import { Settings } from "./types.js";
|
||||||
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
|
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
|
||||||
|
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
|
||||||
|
import { captureEvent } from "./posthog.js";
|
||||||
|
|
||||||
const LOG_TAG = 'connection-manager';
|
const LOG_TAG = 'connection-manager';
|
||||||
const logger = createLogger(LOG_TAG);
|
const logger = createLogger(LOG_TAG);
|
||||||
|
|
@ -136,6 +138,8 @@ export class ConnectionManager {
|
||||||
jobId: job.id,
|
jobId: job.id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return jobs.map(job => job.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async runJob(job: ReservedJob<JobPayload>): Promise<JobResult> {
|
private async runJob(job: ReservedJob<JobPayload>): Promise<JobResult> {
|
||||||
|
|
@ -297,10 +301,11 @@ export class ConnectionManager {
|
||||||
|
|
||||||
logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
|
logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
|
||||||
|
|
||||||
// captureEvent('backend_connection_sync_job_completed', {
|
const result = job.returnvalue as JobResult;
|
||||||
// connectionId: connectionId,
|
captureEvent('backend_connection_sync_job_completed', {
|
||||||
// repoCount: result.repoCount,
|
connectionId: connectionId,
|
||||||
// });
|
repoCount: result.repoCount,
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
private onJobFailed = async (job: Job<JobPayload>) =>
|
private onJobFailed = async (job: Job<JobPayload>) =>
|
||||||
|
|
@ -332,10 +337,10 @@ export class ConnectionManager {
|
||||||
logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
|
logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// captureEvent('backend_connection_sync_job_failed', {
|
captureEvent('backend_connection_sync_job_failed', {
|
||||||
// connectionId: connectionId,
|
connectionId: job.data.connectionId,
|
||||||
// error: err instanceof BackendException ? err.code : 'UNKNOWN',
|
error: job.failedReason,
|
||||||
// });
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
private onJobStalled = async (jobId: string) =>
|
private onJobStalled = async (jobId: string) =>
|
||||||
|
|
@ -354,6 +359,11 @@ export class ConnectionManager {
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`);
|
logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`);
|
||||||
|
|
||||||
|
captureEvent('backend_connection_sync_job_failed', {
|
||||||
|
connectionId: connection.id,
|
||||||
|
error: 'Job stalled',
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
private async onWorkerError(error: Error) {
|
private async onWorkerError(error: Error) {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
import { env } from "./env.js";
|
import { env } from "./env.js";
|
||||||
import path from "path";
|
import path from "path";
|
||||||
|
|
||||||
|
export const SINGLE_TENANT_ORG_ID = 1;
|
||||||
|
|
||||||
export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES = [
|
export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES = [
|
||||||
'github',
|
'github',
|
||||||
];
|
];
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
import micromatch from "micromatch";
|
import micromatch from "micromatch";
|
||||||
import { createLogger } from "@sourcebot/logger";
|
import { createLogger } from "@sourcebot/logger";
|
||||||
import { PrismaClient } from "@sourcebot/db";
|
import { PrismaClient } from "@sourcebot/db";
|
||||||
import { getPlan, hasEntitlement } from "../entitlements.js";
|
import { getPlan, hasEntitlement, SOURCEBOT_SUPPORT_EMAIL } from "@sourcebot/shared";
|
||||||
import { SOURCEBOT_SUPPORT_EMAIL } from "../constants.js";
|
|
||||||
import { SearchContext } from "@sourcebot/schemas/v3/index.type";
|
import { SearchContext } from "@sourcebot/schemas/v3/index.type";
|
||||||
|
|
||||||
const logger = createLogger('sync-search-contexts');
|
const logger = createLogger('sync-search-contexts');
|
||||||
|
|
@ -47,7 +47,7 @@ export const env = createEnv({
|
||||||
DEBUG_ENABLE_GROUPMQ_LOGGING: booleanSchema.default('false'),
|
DEBUG_ENABLE_GROUPMQ_LOGGING: booleanSchema.default('false'),
|
||||||
|
|
||||||
DATABASE_URL: z.string().url().default("postgresql://postgres:postgres@localhost:5432/postgres"),
|
DATABASE_URL: z.string().url().default("postgresql://postgres:postgres@localhost:5432/postgres"),
|
||||||
CONFIG_PATH: z.string().optional(),
|
CONFIG_PATH: z.string(),
|
||||||
|
|
||||||
CONNECTION_MANAGER_UPSERT_TIMEOUT_MS: numberSchema.default(300000),
|
CONNECTION_MANAGER_UPSERT_TIMEOUT_MS: numberSchema.default(300000),
|
||||||
REPO_SYNC_RETRY_BASE_SLEEP_SECONDS: numberSchema.default(60),
|
REPO_SYNC_RETRY_BASE_SLEEP_SECONDS: numberSchema.default(60),
|
||||||
|
|
@ -56,6 +56,8 @@ export const env = createEnv({
|
||||||
|
|
||||||
EXPERIMENT_EE_PERMISSION_SYNC_ENABLED: booleanSchema.default('false'),
|
EXPERIMENT_EE_PERMISSION_SYNC_ENABLED: booleanSchema.default('false'),
|
||||||
AUTH_EE_GITHUB_BASE_URL: z.string().optional(),
|
AUTH_EE_GITHUB_BASE_URL: z.string().optional(),
|
||||||
|
|
||||||
|
FORCE_ENABLE_ANONYMOUS_ACCESS: booleanSchema.default('false'),
|
||||||
},
|
},
|
||||||
runtimeEnv: process.env,
|
runtimeEnv: process.env,
|
||||||
emptyStringAsUndefined: true,
|
emptyStringAsUndefined: true,
|
||||||
|
|
|
||||||
|
|
@ -257,7 +257,7 @@ const getReposOwnedByUsers = async (users: string[], octokit: Octokit, signal: A
|
||||||
const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSignal, url?: string) => {
|
const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSignal, url?: string) => {
|
||||||
const results = await Promise.allSettled(orgs.map(async (org) => {
|
const results = await Promise.allSettled(orgs.map(async (org) => {
|
||||||
try {
|
try {
|
||||||
logger.info(`Fetching repository info for org ${org}...`);
|
logger.debug(`Fetching repository info for org ${org}...`);
|
||||||
|
|
||||||
const octokitToUse = await getOctokitWithGithubApp(octokit, org, url, `org ${org}`);
|
const octokitToUse = await getOctokitWithGithubApp(octokit, org, url, `org ${org}`);
|
||||||
const { durationMs, data } = await measure(async () => {
|
const { durationMs, data } = await measure(async () => {
|
||||||
|
|
@ -272,7 +272,7 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi
|
||||||
return fetchWithRetry(fetchFn, `org ${org}`, logger);
|
return fetchWithRetry(fetchFn, `org ${org}`, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.info(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
|
logger.debug(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
|
||||||
return {
|
return {
|
||||||
type: 'valid' as const,
|
type: 'valid' as const,
|
||||||
data
|
data
|
||||||
|
|
@ -306,7 +306,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
|
||||||
const results = await Promise.allSettled(repoList.map(async (repo) => {
|
const results = await Promise.allSettled(repoList.map(async (repo) => {
|
||||||
try {
|
try {
|
||||||
const [owner, repoName] = repo.split('/');
|
const [owner, repoName] = repo.split('/');
|
||||||
logger.info(`Fetching repository info for ${repo}...`);
|
logger.debug(`Fetching repository info for ${repo}...`);
|
||||||
|
|
||||||
const octokitToUse = await getOctokitWithGithubApp(octokit, owner, url, `repo ${repo}`);
|
const octokitToUse = await getOctokitWithGithubApp(octokit, owner, url, `repo ${repo}`);
|
||||||
const { durationMs, data: result } = await measure(async () => {
|
const { durationMs, data: result } = await measure(async () => {
|
||||||
|
|
@ -321,7 +321,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
|
||||||
return fetchWithRetry(fetchFn, repo, logger);
|
return fetchWithRetry(fetchFn, repo, logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.info(`Found info for repository ${repo} in ${durationMs}ms`);
|
logger.debug(`Found info for repository ${repo} in ${durationMs}ms`);
|
||||||
return {
|
return {
|
||||||
type: 'valid' as const,
|
type: 'valid' as const,
|
||||||
data: [result.data]
|
data: [result.data]
|
||||||
|
|
|
||||||
|
|
@ -6,14 +6,15 @@ import { getConfigSettings, hasEntitlement } from '@sourcebot/shared';
|
||||||
import { existsSync } from 'fs';
|
import { existsSync } from 'fs';
|
||||||
import { mkdir } from 'fs/promises';
|
import { mkdir } from 'fs/promises';
|
||||||
import { Redis } from 'ioredis';
|
import { Redis } from 'ioredis';
|
||||||
|
import { ConfigManager } from "./configManager.js";
|
||||||
import { ConnectionManager } from './connectionManager.js';
|
import { ConnectionManager } from './connectionManager.js';
|
||||||
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
|
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
|
||||||
|
import { GithubAppManager } from "./ee/githubAppManager.js";
|
||||||
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
|
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
|
||||||
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
|
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
|
||||||
import { GithubAppManager } from "./ee/githubAppManager.js";
|
|
||||||
import { env } from "./env.js";
|
import { env } from "./env.js";
|
||||||
import { RepoIndexManager } from "./repoIndexManager.js";
|
|
||||||
import { PromClient } from './promClient.js';
|
import { PromClient } from './promClient.js';
|
||||||
|
import { RepoIndexManager } from "./repoIndexManager.js";
|
||||||
|
|
||||||
|
|
||||||
const logger = createLogger('backend-entrypoint');
|
const logger = createLogger('backend-entrypoint');
|
||||||
|
|
@ -53,6 +54,7 @@ const connectionManager = new ConnectionManager(prisma, settings, redis);
|
||||||
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
|
||||||
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
|
||||||
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
|
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
|
||||||
|
const configManager = new ConfigManager(prisma, connectionManager, env.CONFIG_PATH);
|
||||||
|
|
||||||
connectionManager.startScheduler();
|
connectionManager.startScheduler();
|
||||||
repoIndexManager.startScheduler();
|
repoIndexManager.startScheduler();
|
||||||
|
|
@ -66,6 +68,8 @@ else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement(
|
||||||
userPermissionSyncer.startScheduler();
|
userPermissionSyncer.startScheduler();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info('Worker started.');
|
||||||
|
|
||||||
const cleanup = async (signal: string) => {
|
const cleanup = async (signal: string) => {
|
||||||
logger.info(`Received ${signal}, cleaning up...`);
|
logger.info(`Received ${signal}, cleaning up...`);
|
||||||
|
|
||||||
|
|
@ -79,6 +83,7 @@ const cleanup = async (signal: string) => {
|
||||||
repoPermissionSyncer.dispose(),
|
repoPermissionSyncer.dispose(),
|
||||||
userPermissionSyncer.dispose(),
|
userPermissionSyncer.dispose(),
|
||||||
promClient.dispose(),
|
promClient.dispose(),
|
||||||
|
configManager.dispose(),
|
||||||
]),
|
]),
|
||||||
new Promise((_, reject) =>
|
new Promise((_, reject) =>
|
||||||
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
|
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
Warnings:
|
||||||
|
|
||||||
|
- You are about to drop the column `syncStatus` on the `Connection` table. All the data in the column will be lost.
|
||||||
|
- You are about to drop the column `syncStatusMetadata` on the `Connection` table. All the data in the column will be lost.
|
||||||
|
|
||||||
|
*/
|
||||||
|
-- CreateEnum
|
||||||
|
CREATE TYPE "ConnectionSyncJobStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
|
||||||
|
|
||||||
|
-- AlterTable
|
||||||
|
ALTER TABLE "Connection" DROP COLUMN "syncStatus",
|
||||||
|
DROP COLUMN "syncStatusMetadata";
|
||||||
|
|
||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "ConnectionSyncJob" (
|
||||||
|
"id" TEXT NOT NULL,
|
||||||
|
"status" "ConnectionSyncJobStatus" NOT NULL DEFAULT 'PENDING',
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||||
|
"completedAt" TIMESTAMP(3),
|
||||||
|
"warningMessages" TEXT[],
|
||||||
|
"errorMessage" TEXT,
|
||||||
|
"connectionId" INTEGER NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "ConnectionSyncJob_pkey" PRIMARY KEY ("id")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "ConnectionSyncJob" ADD CONSTRAINT "ConnectionSyncJob_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "Connection"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
-- Ensure single tenant organization exists
|
||||||
|
INSERT INTO "Org" (id, name, domain, "inviteLinkId", "createdAt", "updatedAt")
|
||||||
|
VALUES (1, 'default', '~', gen_random_uuid(), NOW(), NOW())
|
||||||
|
ON CONFLICT (id) DO NOTHING;
|
||||||
|
|
||||||
|
-- Backfill inviteLinkId for any existing orgs that don't have one
|
||||||
|
UPDATE "Org"
|
||||||
|
SET "inviteLinkId" = gen_random_uuid()
|
||||||
|
WHERE "inviteLinkId" IS NULL;
|
||||||
|
|
@ -134,11 +134,6 @@ model Connection {
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
repos RepoToConnection[]
|
repos RepoToConnection[]
|
||||||
|
|
||||||
/// @deprecated
|
|
||||||
syncStatus ConnectionSyncStatus @default(SYNC_NEEDED)
|
|
||||||
/// @deprecated
|
|
||||||
syncStatusMetadata Json?
|
|
||||||
|
|
||||||
// The type of connection (e.g., github, gitlab, etc.)
|
// The type of connection (e.g., github, gitlab, etc.)
|
||||||
connectionType String
|
connectionType String
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,4 @@ export {
|
||||||
isRemotePath,
|
isRemotePath,
|
||||||
getConfigSettings,
|
getConfigSettings,
|
||||||
} from "./utils.js";
|
} from "./utils.js";
|
||||||
export {
|
|
||||||
syncSearchContexts,
|
|
||||||
} from "./ee/syncSearchContexts.js";
|
|
||||||
export * from "./constants.js";
|
export * from "./constants.js";
|
||||||
|
|
@ -113,7 +113,6 @@
|
||||||
"ai": "^5.0.45",
|
"ai": "^5.0.45",
|
||||||
"ajv": "^8.17.1",
|
"ajv": "^8.17.1",
|
||||||
"bcryptjs": "^3.0.2",
|
"bcryptjs": "^3.0.2",
|
||||||
"chokidar": "^4.0.3",
|
|
||||||
"class-variance-authority": "^0.7.0",
|
"class-variance-authority": "^0.7.0",
|
||||||
"client-only": "^0.0.1",
|
"client-only": "^0.0.1",
|
||||||
"clsx": "^2.1.1",
|
"clsx": "^2.1.1",
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ import { headers } from 'next/headers';
|
||||||
import { NextRequest } from 'next/server';
|
import { NextRequest } from 'next/server';
|
||||||
import Stripe from 'stripe';
|
import Stripe from 'stripe';
|
||||||
import { prisma } from '@/prisma';
|
import { prisma } from '@/prisma';
|
||||||
import { ConnectionSyncStatus, StripeSubscriptionStatus } from '@sourcebot/db';
|
import { StripeSubscriptionStatus } from '@sourcebot/db';
|
||||||
import { stripeClient } from '@/ee/features/billing/stripe';
|
import { stripeClient } from '@/ee/features/billing/stripe';
|
||||||
import { env } from '@/env.mjs';
|
import { env } from '@/env.mjs';
|
||||||
import { createLogger } from "@sourcebot/logger";
|
import { createLogger } from "@sourcebot/logger";
|
||||||
|
|
@ -85,16 +85,6 @@ export async function POST(req: NextRequest) {
|
||||||
});
|
});
|
||||||
logger.info(`Org ${org.id} subscription status updated to ACTIVE`);
|
logger.info(`Org ${org.id} subscription status updated to ACTIVE`);
|
||||||
|
|
||||||
// mark all of this org's connections for sync, since their repos may have been previously garbage collected
|
|
||||||
await prisma.connection.updateMany({
|
|
||||||
where: {
|
|
||||||
orgId: org.id
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
syncStatus: ConnectionSyncStatus.SYNC_NEEDED
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return new Response(JSON.stringify({ received: true }), {
|
return new Response(JSON.stringify({ received: true }), {
|
||||||
status: 200
|
status: 200
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -1,129 +1,17 @@
|
||||||
import { ConnectionSyncStatus, OrgRole, Prisma } from '@sourcebot/db';
|
|
||||||
import { env } from './env.mjs';
|
|
||||||
import { prisma } from "@/prisma";
|
|
||||||
import { SINGLE_TENANT_ORG_ID, SINGLE_TENANT_ORG_DOMAIN, SOURCEBOT_GUEST_USER_ID, SINGLE_TENANT_ORG_NAME } from './lib/constants';
|
|
||||||
import chokidar from 'chokidar';
|
|
||||||
import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
|
|
||||||
import { hasEntitlement, loadConfig, isRemotePath, syncSearchContexts } from '@sourcebot/shared';
|
|
||||||
import { isServiceError, getOrgMetadata } from './lib/utils';
|
|
||||||
import { ServiceErrorException } from './lib/serviceError';
|
|
||||||
import { SOURCEBOT_SUPPORT_EMAIL } from "@/lib/constants";
|
|
||||||
import { createLogger } from "@sourcebot/logger";
|
|
||||||
import { createGuestUser } from '@/lib/authUtils';
|
import { createGuestUser } from '@/lib/authUtils';
|
||||||
|
import { SOURCEBOT_SUPPORT_EMAIL } from "@/lib/constants";
|
||||||
|
import { prisma } from "@/prisma";
|
||||||
|
import { OrgRole } from '@sourcebot/db';
|
||||||
|
import { createLogger } from "@sourcebot/logger";
|
||||||
|
import { hasEntitlement, loadConfig } from '@sourcebot/shared';
|
||||||
import { getOrgFromDomain } from './data/org';
|
import { getOrgFromDomain } from './data/org';
|
||||||
|
import { env } from './env.mjs';
|
||||||
|
import { SINGLE_TENANT_ORG_DOMAIN, SINGLE_TENANT_ORG_ID, SOURCEBOT_GUEST_USER_ID } from './lib/constants';
|
||||||
|
import { ServiceErrorException } from './lib/serviceError';
|
||||||
|
import { getOrgMetadata, isServiceError } from './lib/utils';
|
||||||
|
|
||||||
const logger = createLogger('web-initialize');
|
const logger = createLogger('web-initialize');
|
||||||
|
|
||||||
const syncConnections = async (connections?: { [key: string]: ConnectionConfig }) => {
|
|
||||||
if (connections) {
|
|
||||||
for (const [key, newConnectionConfig] of Object.entries(connections)) {
|
|
||||||
const currentConnection = await prisma.connection.findUnique({
|
|
||||||
where: {
|
|
||||||
name_orgId: {
|
|
||||||
name: key,
|
|
||||||
orgId: SINGLE_TENANT_ORG_ID,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
include: {
|
|
||||||
repos: {
|
|
||||||
include: {
|
|
||||||
repo: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
const currentConnectionConfig = currentConnection ? currentConnection.config as unknown as ConnectionConfig : undefined;
|
|
||||||
const syncNeededOnUpdate =
|
|
||||||
(currentConnectionConfig && JSON.stringify(currentConnectionConfig) !== JSON.stringify(newConnectionConfig)) ||
|
|
||||||
(currentConnection?.syncStatus === ConnectionSyncStatus.FAILED);
|
|
||||||
|
|
||||||
const connectionDb = await prisma.connection.upsert({
|
|
||||||
where: {
|
|
||||||
name_orgId: {
|
|
||||||
name: key,
|
|
||||||
orgId: SINGLE_TENANT_ORG_ID,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
update: {
|
|
||||||
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
|
|
||||||
syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined,
|
|
||||||
isDeclarative: true,
|
|
||||||
},
|
|
||||||
create: {
|
|
||||||
name: key,
|
|
||||||
connectionType: newConnectionConfig.type,
|
|
||||||
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
|
|
||||||
isDeclarative: true,
|
|
||||||
org: {
|
|
||||||
connect: {
|
|
||||||
id: SINGLE_TENANT_ORG_ID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.info(`Upserted connection with name '${key}'. Connection ID: ${connectionDb.id}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete any connections that are no longer in the config.
|
|
||||||
const deletedConnections = await prisma.connection.findMany({
|
|
||||||
where: {
|
|
||||||
isDeclarative: true,
|
|
||||||
name: {
|
|
||||||
notIn: Object.keys(connections ?? {}),
|
|
||||||
},
|
|
||||||
orgId: SINGLE_TENANT_ORG_ID,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const connection of deletedConnections) {
|
|
||||||
logger.info(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
|
|
||||||
await prisma.connection.delete({
|
|
||||||
where: {
|
|
||||||
id: connection.id,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const syncDeclarativeConfig = async (configPath: string) => {
|
|
||||||
const config = await loadConfig(configPath);
|
|
||||||
|
|
||||||
const forceEnableAnonymousAccess = config.settings?.enablePublicAccess ?? env.FORCE_ENABLE_ANONYMOUS_ACCESS === 'true';
|
|
||||||
if (forceEnableAnonymousAccess) {
|
|
||||||
const hasAnonymousAccessEntitlement = hasEntitlement("anonymous-access");
|
|
||||||
if (!hasAnonymousAccessEntitlement) {
|
|
||||||
logger.warn(`FORCE_ENABLE_ANONYMOUS_ACCESS env var is set to true but anonymous access entitlement is not available. Setting will be ignored.`);
|
|
||||||
} else {
|
|
||||||
const org = await getOrgFromDomain(SINGLE_TENANT_ORG_DOMAIN);
|
|
||||||
if (org) {
|
|
||||||
const currentMetadata = getOrgMetadata(org);
|
|
||||||
const mergedMetadata = {
|
|
||||||
...(currentMetadata ?? {}),
|
|
||||||
anonymousAccessEnabled: true,
|
|
||||||
};
|
|
||||||
|
|
||||||
await prisma.org.update({
|
|
||||||
where: { id: org.id },
|
|
||||||
data: {
|
|
||||||
metadata: mergedMetadata,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
logger.info(`Anonymous access enabled via FORCE_ENABLE_ANONYMOUS_ACCESS environment variable`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await syncConnections(config.connections);
|
|
||||||
await syncSearchContexts({
|
|
||||||
contexts: config.contexts,
|
|
||||||
orgId: SINGLE_TENANT_ORG_ID,
|
|
||||||
db: prisma,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
const pruneOldGuestUser = async () => {
|
const pruneOldGuestUser = async () => {
|
||||||
// The old guest user doesn't have the GUEST role
|
// The old guest user doesn't have the GUEST role
|
||||||
const guestUser = await prisma.userToOrg.findUnique({
|
const guestUser = await prisma.userToOrg.findUnique({
|
||||||
|
|
@ -150,35 +38,6 @@ const pruneOldGuestUser = async () => {
|
||||||
}
|
}
|
||||||
|
|
||||||
const initSingleTenancy = async () => {
|
const initSingleTenancy = async () => {
|
||||||
// Back fill the inviteId if the org has already been created to prevent needing to wipe the db
|
|
||||||
await prisma.$transaction(async (tx) => {
|
|
||||||
const org = await tx.org.findUnique({
|
|
||||||
where: {
|
|
||||||
id: SINGLE_TENANT_ORG_ID,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!org) {
|
|
||||||
await tx.org.create({
|
|
||||||
data: {
|
|
||||||
id: SINGLE_TENANT_ORG_ID,
|
|
||||||
name: SINGLE_TENANT_ORG_NAME,
|
|
||||||
domain: SINGLE_TENANT_ORG_DOMAIN,
|
|
||||||
inviteLinkId: crypto.randomUUID(),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else if (!org.inviteLinkId) {
|
|
||||||
await tx.org.update({
|
|
||||||
where: {
|
|
||||||
id: SINGLE_TENANT_ORG_ID,
|
|
||||||
},
|
|
||||||
data: {
|
|
||||||
inviteLinkId: crypto.randomUUID(),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// This is needed because v4 introduces the GUEST org role as well as making authentication required.
|
// This is needed because v4 introduces the GUEST org role as well as making authentication required.
|
||||||
// To keep things simple, we'll just delete the old guest user if it exists in the DB
|
// To keep things simple, we'll just delete the old guest user if it exists in the DB
|
||||||
await pruneOldGuestUser();
|
await pruneOldGuestUser();
|
||||||
|
|
@ -205,30 +64,32 @@ const initSingleTenancy = async () => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load any connections defined declaratively in the config file.
|
// Sync anonymous access config from the config file
|
||||||
const configPath = env.CONFIG_PATH;
|
if (env.CONFIG_PATH) {
|
||||||
if (configPath) {
|
const config = await loadConfig(env.CONFIG_PATH);
|
||||||
await syncDeclarativeConfig(configPath);
|
const forceEnableAnonymousAccess = config.settings?.enablePublicAccess ?? env.FORCE_ENABLE_ANONYMOUS_ACCESS === 'true';
|
||||||
|
|
||||||
// watch for changes assuming it is a local file
|
if (forceEnableAnonymousAccess) {
|
||||||
if (!isRemotePath(configPath)) {
|
if (!hasAnonymousAccessEntitlement) {
|
||||||
const watcher = chokidar.watch(configPath, {
|
logger.warn(`FORCE_ENABLE_ANONYMOUS_ACCESS env var is set to true but anonymous access entitlement is not available. Setting will be ignored.`);
|
||||||
ignoreInitial: true, // Don't fire events for existing files
|
} else {
|
||||||
awaitWriteFinish: {
|
const org = await getOrgFromDomain(SINGLE_TENANT_ORG_DOMAIN);
|
||||||
stabilityThreshold: 100, // File size stable for 100ms
|
if (org) {
|
||||||
pollInterval: 100 // Check every 100ms
|
const currentMetadata = getOrgMetadata(org);
|
||||||
|
const mergedMetadata = {
|
||||||
|
...(currentMetadata ?? {}),
|
||||||
|
anonymousAccessEnabled: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
await prisma.org.update({
|
||||||
|
where: { id: org.id },
|
||||||
|
data: {
|
||||||
|
metadata: mergedMetadata,
|
||||||
},
|
},
|
||||||
atomic: true // Handle atomic writes (temp file + rename)
|
|
||||||
});
|
});
|
||||||
|
logger.info(`Anonymous access enabled via FORCE_ENABLE_ANONYMOUS_ACCESS environment variable`);
|
||||||
watcher.on('change', async () => {
|
}
|
||||||
logger.info(`Config file ${configPath} changed. Re-syncing...`);
|
|
||||||
try {
|
|
||||||
await syncDeclarativeConfig(configPath);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(`Failed to sync config: ${error}`);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7796,6 +7796,7 @@ __metadata:
|
||||||
argparse: "npm:^2.0.1"
|
argparse: "npm:^2.0.1"
|
||||||
azure-devops-node-api: "npm:^15.1.1"
|
azure-devops-node-api: "npm:^15.1.1"
|
||||||
bullmq: "npm:^5.34.10"
|
bullmq: "npm:^5.34.10"
|
||||||
|
chokidar: "npm:^4.0.3"
|
||||||
cross-env: "npm:^7.0.3"
|
cross-env: "npm:^7.0.3"
|
||||||
cross-fetch: "npm:^4.0.0"
|
cross-fetch: "npm:^4.0.0"
|
||||||
dotenv: "npm:^16.4.5"
|
dotenv: "npm:^16.4.5"
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue