From f8adf5974d6a98f30f817a9331d4a378b511698e Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 1 Apr 2025 14:51:02 -0700 Subject: [PATCH] Add back file watch for declarative config --- packages/web/src/initialize.ts | 217 +++++++++++++++++---------------- 1 file changed, 115 insertions(+), 102 deletions(-) diff --git a/packages/web/src/initialize.ts b/packages/web/src/initialize.ts index 5ca164c9..0d97404e 100644 --- a/packages/web/src/initialize.ts +++ b/packages/web/src/initialize.ts @@ -3,6 +3,7 @@ import { env } from './env.mjs'; import { prisma } from "@/prisma"; import { SINGLE_TENANT_USER_ID, SINGLE_TENANT_ORG_ID, SINGLE_TENANT_ORG_DOMAIN, SINGLE_TENANT_ORG_NAME, SINGLE_TENANT_USER_EMAIL } from './lib/constants'; import { readFile } from 'fs/promises'; +import { watch } from 'fs'; import stripJsonComments from 'strip-json-comments'; import { SourcebotConfig } from "@sourcebot/schemas/v3/index.type"; import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type'; @@ -21,6 +22,114 @@ const isRemotePath = (path: string) => { return path.startsWith('https://') || path.startsWith('http://'); } +const scheduleDeclarativeConfigSync = async (configPath: string) => { + const configContent = await (async () => { + if (isRemotePath(configPath)) { + const response = await fetch(configPath); + if (!response.ok) { + throw new Error(`Failed to fetch config file ${configPath}: ${response.statusText}`); + } + return response.text(); + } else { + return readFile(configPath, { + encoding: 'utf-8', + }); + } + })(); + + const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfig; + const isValidConfig = ajv.validate(indexSchema, config); + if (!isValidConfig) { + throw new Error(`Config file '${configPath}' is invalid: ${ajv.errorsText(ajv.errors)}`); + } + + if (config.connections) { + for (const [key, newConnectionConfig] of Object.entries(config.connections)) { + const currentConnection = await prisma.connection.findUnique({ + where: { + name_orgId: { + name: key, + orgId: SINGLE_TENANT_ORG_ID, + } + }, + include: { + repos: { + include: { + repo: true, + } + } + } + }); + + const currentConnectionConfig = currentConnection ? currentConnection.config as unknown as ConnectionConfig : undefined; + const syncNeededOnUpdate = + (currentConnectionConfig && JSON.stringify(currentConnectionConfig) !== JSON.stringify(newConnectionConfig)) || + (currentConnection?.syncStatus === ConnectionSyncStatus.FAILED); + + const connectionDb = await prisma.connection.upsert({ + where: { + name_orgId: { + name: key, + orgId: SINGLE_TENANT_ORG_ID, + } + }, + update: { + config: newConnectionConfig as unknown as Prisma.InputJsonValue, + syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined, + isDeclarative: true, + }, + create: { + name: key, + connectionType: newConnectionConfig.type, + config: newConnectionConfig as unknown as Prisma.InputJsonValue, + isDeclarative: true, + org: { + connect: { + id: SINGLE_TENANT_ORG_ID, + } + } + } + }); + + console.log(`Upserted connection with name '${key}'. Connection ID: ${connectionDb.id}`); + + // Re-try any repos that failed to index. + const failedRepos = currentConnection?.repos.filter(repo => repo.repo.repoIndexingStatus === RepoIndexingStatus.FAILED).map(repo => repo.repo.id) ?? []; + if (failedRepos.length > 0) { + await prisma.repo.updateMany({ + where: { + id: { + in: failedRepos, + } + }, + data: { + repoIndexingStatus: RepoIndexingStatus.NEW, + } + }) + } + } + + const deletedConnections = await prisma.connection.findMany({ + where: { + isDeclarative: true, + name: { + notIn: Object.keys(config.connections), + }, + orgId: SINGLE_TENANT_ORG_ID, + } + }); + + for (const connection of deletedConnections) { + console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`); + await prisma.connection.delete({ + where: { + id: connection.id, + } + }) + } + } +} + const initSingleTenancy = async () => { await prisma.org.upsert({ where: { @@ -77,110 +186,14 @@ const initSingleTenancy = async () => { // Load any connections defined declaratively in the config file. const configPath = env.CONFIG_PATH; if (configPath) { - const configContent = await (async () => { - if (isRemotePath(configPath)) { - const response = await fetch(configPath); - if (!response.ok) { - throw new Error(`Failed to fetch config file ${configPath}: ${response.statusText}`); - } - return response.text(); - } else { - return readFile(configPath, { - encoding: 'utf-8', - }); - } - })(); - - const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfig; - const isValidConfig = ajv.validate(indexSchema, config); - if (!isValidConfig) { - throw new Error(`Config file '${configPath}' is invalid: ${ajv.errorsText(ajv.errors)}`); - } + await scheduleDeclarativeConfigSync(configPath); - if (config.connections) { - for (const [key, newConnectionConfig] of Object.entries(config.connections)) { - const currentConnection = await prisma.connection.findUnique({ - where: { - name_orgId: { - name: key, - orgId: SINGLE_TENANT_ORG_ID, - } - }, - include: { - repos: { - include: { - repo: true, - } - } - } - }); - - const currentConnectionConfig = currentConnection ? currentConnection.config as unknown as ConnectionConfig : undefined; - const syncNeededOnUpdate = - (currentConnectionConfig && JSON.stringify(currentConnectionConfig) !== JSON.stringify(newConnectionConfig)) || - (currentConnection?.syncStatus === ConnectionSyncStatus.FAILED); - - const connectionDb = await prisma.connection.upsert({ - where: { - name_orgId: { - name: key, - orgId: SINGLE_TENANT_ORG_ID, - } - }, - update: { - config: newConnectionConfig as unknown as Prisma.InputJsonValue, - syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined, - isDeclarative: true, - }, - create: { - name: key, - connectionType: newConnectionConfig.type, - config: newConnectionConfig as unknown as Prisma.InputJsonValue, - isDeclarative: true, - org: { - connect: { - id: SINGLE_TENANT_ORG_ID, - } - } - } - }); - - console.log(`Upserted connection with name '${key}'. Connection ID: ${connectionDb.id}`); - - // Re-try any repos that failed to index. - const failedRepos = currentConnection?.repos.filter(repo => repo.repo.repoIndexingStatus === RepoIndexingStatus.FAILED).map(repo => repo.repo.id) ?? []; - if (failedRepos.length > 0) { - await prisma.repo.updateMany({ - where: { - id: { - in: failedRepos, - } - }, - data: { - repoIndexingStatus: RepoIndexingStatus.NEW, - } - }) - } - } - - const deletedConnections = await prisma.connection.findMany({ - where: { - isDeclarative: true, - name: { - notIn: Object.keys(config.connections), - }, - orgId: SINGLE_TENANT_ORG_ID, - } + // watch for changes assuming it is a local file + if (!isRemotePath(configPath)) { + watch(configPath, () => { + console.log(`Config file ${configPath} changed. Re-syncing...`); + scheduleDeclarativeConfigSync(configPath); }); - - for (const connection of deletedConnections) { - console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`); - await prisma.connection.delete({ - where: { - id: connection.id, - } - }) - } } } }