Add back file watch for declarative config

This commit is contained in:
bkellam 2025-04-01 14:51:02 -07:00
parent 1080e6a5c8
commit f8adf5974d

View file

@ -3,6 +3,7 @@ import { env } from './env.mjs';
import { prisma } from "@/prisma";
import { SINGLE_TENANT_USER_ID, SINGLE_TENANT_ORG_ID, SINGLE_TENANT_ORG_DOMAIN, SINGLE_TENANT_ORG_NAME, SINGLE_TENANT_USER_EMAIL } from './lib/constants';
import { readFile } from 'fs/promises';
import { watch } from 'fs';
import stripJsonComments from 'strip-json-comments';
import { SourcebotConfig } from "@sourcebot/schemas/v3/index.type";
import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
@ -21,6 +22,114 @@ const isRemotePath = (path: string) => {
return path.startsWith('https://') || path.startsWith('http://');
}
const scheduleDeclarativeConfigSync = async (configPath: string) => {
const configContent = await (async () => {
if (isRemotePath(configPath)) {
const response = await fetch(configPath);
if (!response.ok) {
throw new Error(`Failed to fetch config file ${configPath}: ${response.statusText}`);
}
return response.text();
} else {
return readFile(configPath, {
encoding: 'utf-8',
});
}
})();
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfig;
const isValidConfig = ajv.validate(indexSchema, config);
if (!isValidConfig) {
throw new Error(`Config file '${configPath}' is invalid: ${ajv.errorsText(ajv.errors)}`);
}
if (config.connections) {
for (const [key, newConnectionConfig] of Object.entries(config.connections)) {
const currentConnection = await prisma.connection.findUnique({
where: {
name_orgId: {
name: key,
orgId: SINGLE_TENANT_ORG_ID,
}
},
include: {
repos: {
include: {
repo: true,
}
}
}
});
const currentConnectionConfig = currentConnection ? currentConnection.config as unknown as ConnectionConfig : undefined;
const syncNeededOnUpdate =
(currentConnectionConfig && JSON.stringify(currentConnectionConfig) !== JSON.stringify(newConnectionConfig)) ||
(currentConnection?.syncStatus === ConnectionSyncStatus.FAILED);
const connectionDb = await prisma.connection.upsert({
where: {
name_orgId: {
name: key,
orgId: SINGLE_TENANT_ORG_ID,
}
},
update: {
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined,
isDeclarative: true,
},
create: {
name: key,
connectionType: newConnectionConfig.type,
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
isDeclarative: true,
org: {
connect: {
id: SINGLE_TENANT_ORG_ID,
}
}
}
});
console.log(`Upserted connection with name '${key}'. Connection ID: ${connectionDb.id}`);
// Re-try any repos that failed to index.
const failedRepos = currentConnection?.repos.filter(repo => repo.repo.repoIndexingStatus === RepoIndexingStatus.FAILED).map(repo => repo.repo.id) ?? [];
if (failedRepos.length > 0) {
await prisma.repo.updateMany({
where: {
id: {
in: failedRepos,
}
},
data: {
repoIndexingStatus: RepoIndexingStatus.NEW,
}
})
}
}
const deletedConnections = await prisma.connection.findMany({
where: {
isDeclarative: true,
name: {
notIn: Object.keys(config.connections),
},
orgId: SINGLE_TENANT_ORG_ID,
}
});
for (const connection of deletedConnections) {
console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
await prisma.connection.delete({
where: {
id: connection.id,
}
})
}
}
}
const initSingleTenancy = async () => {
await prisma.org.upsert({
where: {
@ -77,110 +186,14 @@ const initSingleTenancy = async () => {
// Load any connections defined declaratively in the config file.
const configPath = env.CONFIG_PATH;
if (configPath) {
const configContent = await (async () => {
if (isRemotePath(configPath)) {
const response = await fetch(configPath);
if (!response.ok) {
throw new Error(`Failed to fetch config file ${configPath}: ${response.statusText}`);
}
return response.text();
} else {
return readFile(configPath, {
encoding: 'utf-8',
});
}
})();
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfig;
const isValidConfig = ajv.validate(indexSchema, config);
if (!isValidConfig) {
throw new Error(`Config file '${configPath}' is invalid: ${ajv.errorsText(ajv.errors)}`);
}
await scheduleDeclarativeConfigSync(configPath);
if (config.connections) {
for (const [key, newConnectionConfig] of Object.entries(config.connections)) {
const currentConnection = await prisma.connection.findUnique({
where: {
name_orgId: {
name: key,
orgId: SINGLE_TENANT_ORG_ID,
}
},
include: {
repos: {
include: {
repo: true,
}
}
}
});
const currentConnectionConfig = currentConnection ? currentConnection.config as unknown as ConnectionConfig : undefined;
const syncNeededOnUpdate =
(currentConnectionConfig && JSON.stringify(currentConnectionConfig) !== JSON.stringify(newConnectionConfig)) ||
(currentConnection?.syncStatus === ConnectionSyncStatus.FAILED);
const connectionDb = await prisma.connection.upsert({
where: {
name_orgId: {
name: key,
orgId: SINGLE_TENANT_ORG_ID,
}
},
update: {
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined,
isDeclarative: true,
},
create: {
name: key,
connectionType: newConnectionConfig.type,
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
isDeclarative: true,
org: {
connect: {
id: SINGLE_TENANT_ORG_ID,
}
}
}
});
console.log(`Upserted connection with name '${key}'. Connection ID: ${connectionDb.id}`);
// Re-try any repos that failed to index.
const failedRepos = currentConnection?.repos.filter(repo => repo.repo.repoIndexingStatus === RepoIndexingStatus.FAILED).map(repo => repo.repo.id) ?? [];
if (failedRepos.length > 0) {
await prisma.repo.updateMany({
where: {
id: {
in: failedRepos,
}
},
data: {
repoIndexingStatus: RepoIndexingStatus.NEW,
}
})
}
}
const deletedConnections = await prisma.connection.findMany({
where: {
isDeclarative: true,
name: {
notIn: Object.keys(config.connections),
},
orgId: SINGLE_TENANT_ORG_ID,
}
// watch for changes assuming it is a local file
if (!isRemotePath(configPath)) {
watch(configPath, () => {
console.log(`Config file ${configPath} changed. Re-syncing...`);
scheduleDeclarativeConfigSync(configPath);
});
for (const connection of deletedConnections) {
console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
await prisma.connection.delete({
where: {
id: connection.id,
}
})
}
}
}
}