Multi tenancy support in config syncer (#171)

* [wip] initial mt support in config syncer

* Move logout button & profile picture into settings dropdown (#172)

* update sync status properly and fix bug with multiple config in db case

* make config path required in single tenant mode

NOTE: deleting configs/repos is currently not supported in the multi-tenant case. Support for this will be added in a future PR

---------

Co-authored-by: Brendan Kellam <bshizzle1234@gmail.com>
This commit is contained in:
Michael Sukkarieh 2025-01-21 11:50:35 -08:00 committed by GitHub
parent a5091fb900
commit 7c6adf17aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 243 additions and 53 deletions

View file

@@ -6,16 +6,16 @@
"scripts": {
"build": "yarn workspaces run build",
"test": "yarn workspaces run test",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=single npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=multi npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
"dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
"dev:backend": "yarn workspace @sourcebot/backend dev:watch",
"dev:web": "yarn workspace @sourcebot/web dev",
"dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"
},
"devDependencies": {
"cross-env": "^7.0.3",
"npm-run-all": "^4.1.5"
}
}

View file

@@ -7,7 +7,7 @@ import { SourcebotConfigurationSchema } from "./schemas/v2.js";
import { AppContext } from "./types.js";
import { getTokenFromConfig, isRemotePath, marshalBool } from "./utils.js";
export const syncConfig = async (configPath: string, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
export const fetchConfigFromPath = async (configPath: string, signal: AbortSignal) => {
const configContent = await (async () => {
if (isRemotePath(configPath)) {
const response = await fetch(configPath, {
@@ -25,9 +25,11 @@ export const syncConfig = async (configPath: string, db: PrismaClient, signal: A
}
})();
// @todo: we should validate the configuration file's structure here.
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema;
return config;
}
export const syncConfig = async (config: SourcebotConfigurationSchema, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
for (const repoConfig of config.repos ?? []) {
switch (repoConfig.type) {
case 'github': {

View file

@@ -9,4 +9,5 @@ export const DEFAULT_SETTINGS: Settings = {
reindexIntervalMs: 1000 * 60,
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
indexConcurrencyMultiple: 3,
configSyncConcurrencyMultiple: 3,
}

View file

@@ -1,6 +1,10 @@
import dotenv from 'dotenv';
export const getEnv = (env: string | undefined, defaultValue?: string) => {
export const getEnv = (env: string | undefined, defaultValue?: string, required?: boolean) => {
if (required && !env && !defaultValue) {
throw new Error(`Missing required environment variable`);
}
return env ?? defaultValue;
}
@@ -15,6 +19,8 @@ dotenv.config({
path: './.env',
});
export const SOURCEBOT_TENANT_MODE = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true);
export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;
export const SOURCEBOT_TELEMETRY_DISABLED = getEnvBoolean(process.env.SOURCEBOT_TELEMETRY_DISABLED, false)!;
export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'unknown')!;

View file

@@ -6,6 +6,7 @@ import { isRemotePath } from "./utils.js";
import { AppContext } from "./types.js";
import { main } from "./main.js"
import { PrismaClient } from "@sourcebot/db";
import { SOURCEBOT_TENANT_MODE } from "./environment.js";
const parser = new ArgumentParser({
@@ -19,7 +20,7 @@ type Arguments = {
parser.add_argument("--configPath", {
help: "Path to config file",
required: true,
required: SOURCEBOT_TENANT_MODE === "single",
});
parser.add_argument("--cacheDir", {
@@ -28,8 +29,8 @@ parser.add_argument("--cacheDir", {
});
const args = parser.parse_args() as Arguments;
if (!isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist`);
if (SOURCEBOT_TENANT_MODE === "single" && !isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist, and is required in single tenant mode`);
process.exit(1);
}

View file

@@ -1,6 +1,6 @@
import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
import { ConfigSyncStatus, PrismaClient, Repo, Config, RepoIndexingStatus, Prisma } from '@sourcebot/db';
import { existsSync, watch } from 'fs';
import { syncConfig } from "./config.js";
import { fetchConfigFromPath, syncConfig } from "./config.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { createLogger } from "./logger.js";
import { captureEvent } from "./posthog.js";
@@ -11,6 +11,8 @@ import { DEFAULT_SETTINGS } from './constants.js';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';
import { SOURCEBOT_TENANT_MODE } from './environment.js';
import { SourcebotConfigurationSchema } from './schemas/v2.js';
const logger = createLogger('main');
@@ -56,6 +58,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
}
}
async function addConfigsToQueue(db: PrismaClient, queue: Queue, configs: Config[]) {
for (const config of configs) {
await db.$transaction(async (tx) => {
await tx.config.update({
where: { id: config.id },
data: { syncStatus: ConfigSyncStatus.IN_SYNC_QUEUE },
});
// Add the job to the queue
await queue.add('configSyncJob', config);
logger.info(`Added job to queue for config ${config.id}`);
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for config ${config.id}: ${err}`);
});
}
}
async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
for (const repo of repos) {
await db.$transaction(async (tx) => {
@@ -67,7 +86,7 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
// Add the job to the queue
await queue.add('indexJob', repo);
logger.info(`Added job to queue for repo ${repo.id}`);
}).catch((err) => {
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
});
}
@@ -76,47 +95,80 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
export const main = async (db: PrismaClient, context: AppContext) => {
let abortController = new AbortController();
let isSyncing = false;
const _syncConfig = async () => {
if (isSyncing) {
abortController.abort();
abortController = new AbortController();
const _syncConfig = async (dbConfig?: Config | undefined) => {
// Fetch config object and update syncing status
let config: SourcebotConfigurationSchema;
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
logger.info(`Syncing configuration file ${context.configPath} ...`);
if (isSyncing) {
abortController.abort();
abortController = new AbortController();
}
config = await fetchConfigFromPath(context.configPath, abortController.signal);
isSyncing = true;
break;
case 'multi':
if(!dbConfig) {
throw new Error('config object is required in multi tenant mode');
}
config = dbConfig.data as SourcebotConfigurationSchema
db.config.update({
where: {
id: dbConfig.id,
},
data: {
syncStatus: ConfigSyncStatus.SYNCING,
}
})
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}
logger.info(`Syncing configuration file ${context.configPath} ...`);
isSyncing = true;
// Attempt to sync the config, handle failure cases
try {
const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context))
logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
const { durationMs } = await measure(() => syncConfig(config, db, abortController.signal, context))
logger.info(`Synced configuration in ${durationMs / 1000}s`);
isSyncing = false;
} catch (err: any) {
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
console.log(err);
switch(SOURCEBOT_TENANT_MODE) {
case 'single':
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file with error:`);
console.log(err);
}
break;
case 'multi':
if (dbConfig) {
await db.config.update({
where: {
id: dbConfig.id,
},
data: {
syncStatus: ConfigSyncStatus.FAILED,
}
})
logger.error(`Failed to sync configuration ${dbConfig.id} with error: ${err}`);
} else {
logger.error(`DB config undefined. Failed to sync configuration with error: ${err}`);
}
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}
}
}
// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}
// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);
// Sync immediately on startup
await _syncConfig();
/////////////////////////////
// Init Redis
/////////////////////////////
const redis = new Redis({
host: 'localhost',
port: 6379,
@@ -124,18 +176,85 @@ export const main = async (db: PrismaClient, context: AppContext) => {
});
redis.ping().then(() => {
logger.info('Connected to redis');
}).catch((err) => {
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
console.error(err);
process.exit(1);
});
/////////////////////////////
// Setup config sync watchers
/////////////////////////////
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}
// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);
// Sync immediately on startup
await _syncConfig();
break;
case 'multi':
// Setup config sync queue and workers
const configSyncQueue = new Queue('configSyncQueue');
const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting config sync max concurrency to ${numWorkers}`);
const configSyncWorker = new Worker('configSyncQueue', async (job: Job) => {
const config = job.data as Config;
await _syncConfig(config);
}, { connection: redis, concurrency: numWorkers });
configSyncWorker.on('completed', async (job: Job) => {
logger.info(`Config sync job ${job.id} completed`);
const config = job.data as Config;
await db.config.update({
where: {
id: config.id,
},
data: {
syncStatus: ConfigSyncStatus.SYNCED,
}
})
});
configSyncWorker.on('failed', (job: Job | undefined, err: unknown) => {
logger.info(`Config sync job failed with error: ${err}`);
});
setInterval(async () => {
const configs = await db.config.findMany({
where: {
syncStatus: ConfigSyncStatus.SYNC_NEEDED,
}
});
logger.info(`Found ${configs.length} configs to sync...`);
addConfigsToQueue(db, configSyncQueue, configs);
}, 1000);
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}
/////////////////////////
// Setup repo indexing
/////////////////////////
const indexQueue = new Queue('indexQueue');
const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job) => {
logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job: Job) => {
const repo = job.data as Repo;
let indexDuration_s: number | undefined;
@@ -166,10 +285,10 @@ export const main = async (db: PrismaClient, context: AppContext) => {
});
}, { connection: redis, concurrency: numWorkers });
worker.on('completed', (job) => {
worker.on('completed', (job: Job) => {
logger.info(`Job ${job.id} completed`);
});
worker.on('failed', async (job: Job | undefined, err) => {
worker.on('failed', async (job: Job | undefined, err: unknown) => {
logger.info(`Job failed with error: ${err}`);
if (job) {
await db.repo.update({
@@ -183,6 +302,7 @@ export const main = async (db: PrismaClient, context: AppContext) => {
}
});
// Repo indexing loop
while (true) {
const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
const repos = await db.repo.findMany({

View file

@@ -78,6 +78,10 @@ export type Settings = {
* The multiple of the number of CPUs to use for indexing.
*/
indexConcurrencyMultiple: number;
/**
* The multiple of the number of CPUs to use for syncing the configuration.
*/
configSyncConcurrencyMultiple: number;
}
// @see : https://stackoverflow.com/a/61132308

View file

@@ -0,0 +1,11 @@
-- CreateTable
CREATE TABLE "Config" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"data" JSONB NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL,
"syncedAt" DATETIME,
"syncStatus" TEXT NOT NULL DEFAULT 'SYNC_NEEDED',
"orgId" INTEGER NOT NULL,
CONSTRAINT "Config_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);

View file

@@ -18,6 +18,14 @@ enum RepoIndexingStatus {
FAILED
}
enum ConfigSyncStatus {
SYNC_NEEDED
IN_SYNC_QUEUE
SYNCING
SYNCED
FAILED
}
model Repo {
id Int @id @default(autoincrement())
name String
@@ -42,12 +50,27 @@ model Repo {
@@unique([external_id, external_codeHostUrl])
}
model Config {
id Int @id @default(autoincrement())
data Json
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
syncedAt DateTime?
syncStatus ConfigSyncStatus @default(SYNC_NEEDED)
// The organization that owns this config
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
orgId Int
}
model Org {
id Int @id @default(autoincrement())
name String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
members UserToOrg[]
configs Config[]
}
model UserToOrg {

View file

@@ -2938,6 +2938,13 @@ cron-parser@^4.9.0:
dependencies:
luxon "^3.2.1"
cross-env@^7.0.3:
version "7.0.3"
resolved "https://registry.yarnpkg.com/cross-env/-/cross-env-7.0.3.tgz#865264b29677dc015ba8418918965dd232fc54cf"
integrity sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==
dependencies:
cross-spawn "^7.0.1"
cross-fetch@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-4.0.0.tgz#f037aef1580bb3a1a35164ea2a848ba81b445983"
@@ -2956,7 +2963,7 @@ cross-spawn@^6.0.5:
shebang-command "^1.2.0"
which "^1.2.9"
cross-spawn@^7.0.0, cross-spawn@^7.0.2, cross-spawn@^7.0.3:
cross-spawn@^7.0.0, cross-spawn@^7.0.1, cross-spawn@^7.0.2, cross-spawn@^7.0.3:
version "7.0.6"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f"
integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==
@@ -5850,8 +5857,16 @@ string-argv@^0.3.1:
resolved "https://registry.yarnpkg.com/string-argv/-/string-argv-0.3.2.tgz#2b6d0ef24b656274d957d54e0a4bbf6153dc02b6"
integrity sha512-aqD2Q0144Z+/RqG52NeHEkZauTAUWJO8c6yTftGJKO3Tja5tUgIfmIl6kExvhtxSDP7fXB6DvzkfMpCd/F3G+Q==
"string-width-cjs@npm:string-width@^4.2.0", string-width@^4.1.0:
name string-width-cjs
"string-width-cjs@npm:string-width@^4.2.0":
version "4.2.3"
resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010"
integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==
dependencies:
emoji-regex "^8.0.0"
is-fullwidth-code-point "^3.0.0"
strip-ansi "^6.0.1"
string-width@^4.1.0:
version "4.2.3"
resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010"
integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==
@@ -5948,7 +5963,14 @@ string_decoder@^1.1.1, string_decoder@^1.3.0:
dependencies:
safe-buffer "~5.2.0"
"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1:
"strip-ansi-cjs@npm:strip-ansi@^6.0.1":
version "6.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9"
integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==
dependencies:
ansi-regex "^5.0.1"
strip-ansi@^6.0.0, strip-ansi@^6.0.1:
version "6.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9"
integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==