From 27a1066e1afb5cb86dc565d45d24b76d6a239dbb Mon Sep 17 00:00:00 2001 From: Brendan Kellam Date: Tue, 25 Mar 2025 18:45:21 -0700 Subject: [PATCH] Declarative connection cleanup + improvements (#245) --- Dockerfile | 3 +- entrypoint.sh | 5 +++ packages/backend/src/connectionManager.ts | 10 ++--- packages/backend/src/constants.ts | 16 ++++---- packages/backend/src/github.ts | 3 ++ packages/backend/src/gitlab.ts | 3 ++ packages/backend/src/repoManager.ts | 9 ++--- .../migration.sql | 2 + packages/db/prisma/schema.prisma | 1 + packages/schemas/src/v3/index.schema.ts | 32 +++++++++------ packages/schemas/src/v3/index.type.ts | 24 ++++++------ packages/web/src/app/error.tsx | 1 + packages/web/src/initialize.ts | 21 ++++++++++ schemas/v3/index.json | 39 ++++++++++++------- supervisord.conf | 2 +- 15 files changed, 111 insertions(+), 60 deletions(-) create mode 100644 packages/db/prisma/migrations/20250325161848_add_is_declarative_flag_to_connection/migration.sql diff --git a/Dockerfile b/Dockerfile index bc98f82d..67f27bb9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -115,8 +115,9 @@ ENV NEXT_TELEMETRY_DISABLED=1 ENV DATA_DIR=/data ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot ENV DB_DATA_DIR=$DATA_CACHE_DIR/db +ENV REDIS_DATA_DIR=$DATA_CACHE_DIR/redis ENV DB_NAME=sourcebot -ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot" +ENV DATABASE_URL="postgresql://postgres@localhost:5432/$DB_NAME" ENV REDIS_URL="redis://localhost:6379" ENV SRC_TENANT_ENFORCEMENT_MODE=strict diff --git a/entrypoint.sh b/entrypoint.sh index ca9833b3..1d39f0f3 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -36,6 +36,11 @@ if [ ! -d "$DB_DATA_DIR" ]; then su postgres -c "initdb -D $DB_DATA_DIR" fi +# Create the redis data directory if it doesn't exist +if [ ! -d "$REDIS_DATA_DIR" ]; then + mkdir -p $REDIS_DATA_DIR +fi + if [ -z "$SOURCEBOT_ENCRYPTION_KEY" ]; then echo -e "\e[33m[Warning] SOURCEBOT_ENCRYPTION_KEY is not set.\e[0m" diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index 815de3cd..81d184c4 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -1,9 +1,8 @@ -import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, RepoIndexingStatus } from "@sourcebot/db"; +import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db"; import { Job, Queue, Worker } from 'bullmq'; import { Settings } from "./types.js"; import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type"; import { createLogger } from "./logger.js"; -import os from 'os'; import { Redis } from 'ioredis'; import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js"; import { BackendError, BackendException } from "@sourcebot/error"; @@ -42,10 +41,9 @@ export class ConnectionManager implements IConnectionManager { this.queue = new Queue(QUEUE_NAME, { connection: redis, }); - const numCores = os.cpus().length; this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), { connection: redis, - concurrency: numCores * this.settings.configSyncConcurrencyMultiple, + concurrency: this.settings.maxConnectionSyncJobConcurrency, }); this.worker.on('completed', this.onSyncJobCompleted.bind(this)); this.worker.on('failed', this.onSyncJobFailed.bind(this)); @@ -262,11 +260,11 @@ export class ConnectionManager implements IConnectionManager { }); } - private async onSyncJobFailed(job: Job | undefined, err: unknown) { + private async onSyncJobFailed(job: Job | undefined, err: unknown) { this.logger.info(`Connection sync job failed with error: ${err}`); Sentry.captureException(err, { tags: { - repoId: job?.data.repo.id, + connectionid: job?.data.connectionId, jobId: job?.id, queue: QUEUE_NAME, } diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts index 3b719122..2973f95a 100644 --- a/packages/backend/src/constants.ts +++ b/packages/backend/src/constants.ts @@ -5,13 +5,13 @@ import { Settings } from "./types.js"; */ export const DEFAULT_SETTINGS: Settings = { maxFileSize: 2 * 1024 * 1024, // 2MB in bytes - reindexIntervalMs: 1000 * 60 * 60, // 1 hour - resyncConnectionPollingIntervalMs: 1000, - reindexRepoPollingIntervalMs: 1000, - indexConcurrencyMultiple: 3, - configSyncConcurrencyMultiple: 3, - gcConcurrencyMultiple: 1, - gcGracePeriodMs: 10 * 1000, // 10 seconds - repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours maxTrigramCount: 20000, + reindexIntervalMs: 1000 * 60 * 60, // 1 hour + resyncConnectionPollingIntervalMs: 1000 * 1, // 1 second + reindexRepoPollingIntervalMs: 1000 * 1, // 1 second + maxConnectionSyncJobConcurrency: 8, + maxRepoIndexingJobConcurrency: 8, + maxRepoGarbageCollectionJobConcurrency: 8, + repoGarbageCollectionGracePeriodMs: 10 * 1000, // 10 seconds + repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours } \ No newline at end of file diff --git a/packages/backend/src/github.ts b/packages/backend/src/github.ts index f6a41db9..d976fb8b 100644 --- a/packages/backend/src/github.ts +++ b/packages/backend/src/github.ts @@ -257,6 +257,7 @@ const getReposOwnedByUsers = async (users: string[], isAuthenticated: boolean, o }; } catch (error) { Sentry.captureException(error); + logger.error(`Failed to fetch repositories for user ${user}.`, error); if (isHttpError(error, 404)) { logger.error(`User ${user} not found or no access`); @@ -302,6 +303,7 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi }; } catch (error) { Sentry.captureException(error); + logger.error(`Failed to fetch repositories for org ${org}.`, error); if (isHttpError(error, 404)) { logger.error(`Organization ${org} not found or no access`); @@ -349,6 +351,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna } catch (error) { Sentry.captureException(error); + logger.error(`Failed to fetch repository ${repo}.`, error); if (isHttpError(error, 404)) { logger.error(`Repository ${repo} not found or no access`); diff --git a/packages/backend/src/gitlab.ts b/packages/backend/src/gitlab.ts index 86ad2768..2981e50e 100644 --- a/packages/backend/src/gitlab.ts +++ b/packages/backend/src/gitlab.ts @@ -82,6 +82,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o }; } catch (e: any) { Sentry.captureException(e); + logger.error(`Failed to fetch projects for group ${group}.`, e); const status = e?.cause?.response?.status; if (status === 404) { @@ -118,6 +119,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o }; } catch (e: any) { Sentry.captureException(e); + logger.error(`Failed to fetch projects for user ${user}.`, e); const status = e?.cause?.response?.status; if (status === 404) { @@ -152,6 +154,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o }; } catch (e: any) { Sentry.captureException(e); + logger.error(`Failed to fetch project ${project}.`, e); const status = e?.cause?.response?.status; diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index ff8cd585..d88f513a 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -8,7 +8,6 @@ import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./util import { cloneRepository, fetchRepository } from "./git.js"; import { existsSync, readdirSync, promises } from 'fs'; import { indexGitRepository } from "./zoekt.js"; -import os from 'os'; import { PromClient } from './promClient.js'; import * as Sentry from "@sentry/node"; @@ -43,15 +42,13 @@ export class RepoManager implements IRepoManager { private promClient: PromClient, private ctx: AppContext, ) { - const numCores = os.cpus().length; - // Repo indexing this.indexQueue = new Queue(REPO_INDEXING_QUEUE, { connection: redis, }); this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), { connection: redis, - concurrency: numCores * this.settings.indexConcurrencyMultiple, + concurrency: this.settings.maxRepoIndexingJobConcurrency, }); this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this)); this.indexWorker.on('failed', this.onIndexJobFailed.bind(this)); @@ -62,7 +59,7 @@ export class RepoManager implements IRepoManager { }); this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), { connection: redis, - concurrency: numCores * this.settings.gcConcurrencyMultiple, + concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency, }); this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this)); this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this)); @@ -396,7 +393,7 @@ export class RepoManager implements IRepoManager { //////////////////////////////////// - const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs); + const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs); const reposWithNoConnections = await this.db.repo.findMany({ where: { repoIndexingStatus: { diff --git a/packages/db/prisma/migrations/20250325161848_add_is_declarative_flag_to_connection/migration.sql b/packages/db/prisma/migrations/20250325161848_add_is_declarative_flag_to_connection/migration.sql new file mode 100644 index 00000000..fb0b0ef3 --- /dev/null +++ b/packages/db/prisma/migrations/20250325161848_add_is_declarative_flag_to_connection/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Connection" ADD COLUMN "isDeclarative" BOOLEAN NOT NULL DEFAULT false; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index ce6edc3c..26b3a98c 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -67,6 +67,7 @@ model Connection { id Int @id @default(autoincrement()) name String config Json + isDeclarative Boolean @default(false) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt syncedAt DateTime? diff --git a/packages/schemas/src/v3/index.schema.ts b/packages/schemas/src/v3/index.schema.ts index 24890a4b..258635d5 100644 --- a/packages/schemas/src/v3/index.schema.ts +++ b/packages/schemas/src/v3/index.schema.ts @@ -10,37 +10,45 @@ const schema = { "properties": { "maxFileSize": { "type": "number", - "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed." + "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.", + "minimum": 1 }, "maxTrigramCount": { "type": "number", - "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed." + "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000.", + "minimum": 1 }, "reindexIntervalMs": { "type": "number", - "description": "The interval (in milliseconds) at which the indexer should re-index all repositories." + "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.", + "minimum": 1 }, "resyncConnectionPollingIntervalMs": { "type": "number", - "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced." + "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 5 seconds.", + "minimum": 1 }, "reindexRepoPollingIntervalMs": { "type": "number", - "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed." + "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 5 seconds.", + "minimum": 1 }, - "indexConcurrencyMultiple": { + "maxConnectionSyncJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for indexing." + "description": "The number of connection sync jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "configSyncConcurrencyMultiple": { + "maxRepoIndexingJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for syncing the configuration." + "description": "The number of repo indexing jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "gcConcurrencyMultiple": { + "maxRepoGarbageCollectionJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for garbage collection." + "description": "The number of repo GC jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "gcGracePeriodMs": { + "repoGarbageCollectionGracePeriodMs": { "type": "number", "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded." }, diff --git a/packages/schemas/src/v3/index.type.ts b/packages/schemas/src/v3/index.type.ts index 7692d609..45e5d7ea 100644 --- a/packages/schemas/src/v3/index.type.ts +++ b/packages/schemas/src/v3/index.type.ts @@ -28,41 +28,41 @@ export interface SourcebotConfig { */ export interface Settings { /** - * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. + * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB. */ maxFileSize?: number; /** - * The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. + * The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000. */ maxTrigramCount?: number; /** - * The interval (in milliseconds) at which the indexer should re-index all repositories. + * The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour. */ reindexIntervalMs?: number; /** - * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. + * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 5 seconds. */ resyncConnectionPollingIntervalMs?: number; /** - * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. + * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 5 seconds. */ reindexRepoPollingIntervalMs?: number; /** - * The multiple of the number of CPUs to use for indexing. + * The number of connection sync jobs to run concurrently. Defaults to 8. */ - indexConcurrencyMultiple?: number; + maxConnectionSyncJobConcurrency?: number; /** - * The multiple of the number of CPUs to use for syncing the configuration. + * The number of repo indexing jobs to run concurrently. Defaults to 8. */ - configSyncConcurrencyMultiple?: number; + maxRepoIndexingJobConcurrency?: number; /** - * The multiple of the number of CPUs to use for garbage collection. + * The number of repo GC jobs to run concurrently. Defaults to 8. */ - gcConcurrencyMultiple?: number; + maxRepoGarbageCollectionJobConcurrency?: number; /** * The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded. */ - gcGracePeriodMs?: number; + repoGarbageCollectionGracePeriodMs?: number; /** * The timeout (in milliseconds) for a repo indexing to timeout. */ diff --git a/packages/web/src/app/error.tsx b/packages/web/src/app/error.tsx index 379c7302..69f503e8 100644 --- a/packages/web/src/app/error.tsx +++ b/packages/web/src/app/error.tsx @@ -13,6 +13,7 @@ import { SourcebotLogo } from './components/sourcebotLogo'; export default function Error({ error, reset }: { error: Error & { digest?: string }, reset: () => void }) { useEffect(() => { Sentry.captureException(error); + console.error(error); }, [error]); const { message, errorCode, statusCode } = useMemo(() => { diff --git a/packages/web/src/initialize.ts b/packages/web/src/initialize.ts index 42fac9d6..5ca164c9 100644 --- a/packages/web/src/initialize.ts +++ b/packages/web/src/initialize.ts @@ -130,11 +130,13 @@ const initSingleTenancy = async () => { update: { config: newConnectionConfig as unknown as Prisma.InputJsonValue, syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined, + isDeclarative: true, }, create: { name: key, connectionType: newConnectionConfig.type, config: newConnectionConfig as unknown as Prisma.InputJsonValue, + isDeclarative: true, org: { connect: { id: SINGLE_TENANT_ORG_ID, @@ -160,6 +162,25 @@ const initSingleTenancy = async () => { }) } } + + const deletedConnections = await prisma.connection.findMany({ + where: { + isDeclarative: true, + name: { + notIn: Object.keys(config.connections), + }, + orgId: SINGLE_TENANT_ORG_ID, + } + }); + + for (const connection of deletedConnections) { + console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`); + await prisma.connection.delete({ + where: { + id: connection.id, + } + }) + } } } } diff --git a/schemas/v3/index.json b/schemas/v3/index.json index 21baac5e..8119ad54 100644 --- a/schemas/v3/index.json +++ b/schemas/v3/index.json @@ -9,43 +9,54 @@ "properties": { "maxFileSize": { "type": "number", - "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed." + "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.", + "minimum": 1 + }, "maxTrigramCount": { "type": "number", - "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed." + "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000.", + "minimum": 1 }, "reindexIntervalMs": { "type": "number", - "description": "The interval (in milliseconds) at which the indexer should re-index all repositories." + "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.", + "minimum": 1 }, "resyncConnectionPollingIntervalMs": { "type": "number", - "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced." + "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 1 second.", + "minimum": 1 }, "reindexRepoPollingIntervalMs": { "type": "number", - "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed." + "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 1 second.", + "minimum": 1 }, - "indexConcurrencyMultiple": { + "maxConnectionSyncJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for indexing." + "description": "The number of connection sync jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "configSyncConcurrencyMultiple": { + "maxRepoIndexingJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for syncing the configuration." + "description": "The number of repo indexing jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "gcConcurrencyMultiple": { + "maxRepoGarbageCollectionJobConcurrency": { "type": "number", - "description": "The multiple of the number of CPUs to use for garbage collection." + "description": "The number of repo GC jobs to run concurrently. Defaults to 8.", + "minimum": 1 }, - "gcGracePeriodMs": { + "repoGarbageCollectionGracePeriodMs": { "type": "number", - "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded." + "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded. Defaults to 10 seconds.", + "minimum": 1 }, "repoIndexTimeoutMs": { "type": "number", - "description": "The timeout (in milliseconds) for a repo indexing to timeout." + "description": "The timeout (in milliseconds) for a repo indexing to timeout. Defaults to 2 hours.", + "minimum": 1 } }, "additionalProperties": false diff --git a/supervisord.conf b/supervisord.conf index 6ec1fbba..bc96a487 100644 --- a/supervisord.conf +++ b/supervisord.conf @@ -31,7 +31,7 @@ stdout_logfile_maxbytes=0 redirect_stderr=true [program:redis] -command=redis-server +command=redis-server --dir %(ENV_REDIS_DATA_DIR)s autostart=true autorestart=true startretries=3