Declarative connection cleanup + improvements (#245)

Brendan Kellam, 2025-03-25 18:45:21 -07:00, committed by GitHub
parent 691c5937c1
commit 27a1066e1a
15 changed files with 111 additions and 60 deletions

@@ -115,8 +115,9 @@ ENV NEXT_TELEMETRY_DISABLED=1
 ENV DATA_DIR=/data
 ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
 ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
+ENV REDIS_DATA_DIR=$DATA_CACHE_DIR/redis
 ENV DB_NAME=sourcebot
-ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
+ENV DATABASE_URL="postgresql://postgres@localhost:5432/$DB_NAME"
 ENV REDIS_URL="redis://localhost:6379"
 ENV SRC_TENANT_ENFORCEMENT_MODE=strict
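Two things to note in this hunk: DATABASE_URL is now derived from $DB_NAME (Docker expands previously declared ENV values at build time), so the database name and the connection string can no longer drift apart; and REDIS_DATA_DIR gives Redis a persistent home under the shared data cache, created by the entrypoint and consumed by supervisord in the hunks below.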

@@ -36,6 +36,11 @@ if [ ! -d "$DB_DATA_DIR" ]; then
     su postgres -c "initdb -D $DB_DATA_DIR"
 fi
 
+# Create the redis data directory if it doesn't exist
+if [ ! -d "$REDIS_DATA_DIR" ]; then
+    mkdir -p $REDIS_DATA_DIR
+fi
+
 if [ -z "$SOURCEBOT_ENCRYPTION_KEY" ]; then
     echo -e "\e[33m[Warning] SOURCEBOT_ENCRYPTION_KEY is not set.\e[0m"

@@ -1,9 +1,8 @@
-import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, RepoIndexingStatus } from "@sourcebot/db";
+import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
 import { Settings } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "./logger.js";
-import os from 'os';
 import { Redis } from 'ioredis';
 import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
 import { BackendError, BackendException } from "@sourcebot/error";
@@ -42,10 +41,9 @@ export class ConnectionManager implements IConnectionManager {
         this.queue = new Queue<JobPayload>(QUEUE_NAME, {
             connection: redis,
         });
-        const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
-            concurrency: numCores * this.settings.configSyncConcurrencyMultiple,
+            concurrency: this.settings.maxConnectionSyncJobConcurrency,
         });
         this.worker.on('completed', this.onSyncJobCompleted.bind(this));
         this.worker.on('failed', this.onSyncJobFailed.bind(this));
@@ -262,11 +260,11 @@ export class ConnectionManager implements IConnectionManager {
         });
     }
 
-    private async onSyncJobFailed(job: Job | undefined, err: unknown) {
+    private async onSyncJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
         this.logger.info(`Connection sync job failed with error: ${err}`);
         Sentry.captureException(err, {
             tags: {
-                repoId: job?.data.repo.id,
+                connectionid: job?.data.connectionId,
                 jobId: job?.id,
                 queue: QUEUE_NAME,
             }
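This is the core behavioral change in the backend: worker concurrency is no longer derived from the host's core count but set to a fixed, directly configurable cap. A minimal sketch of the before/after arithmetic, using the default values from constants.ts below:

```typescript
import os from 'os';

// Before: concurrency scaled with the machine it happened to run on,
// e.g. 8 cores * configSyncConcurrencyMultiple (3) = 24 parallel sync jobs.
const oldConcurrency = os.cpus().length * 3;

// After: a fixed cap that behaves identically on every host.
const newConcurrency = 8; // maxConnectionSyncJobConcurrency defaults to 8
```

The same swap is applied to the indexing and garbage-collection workers in RepoManager further down. The failure handler also gains a typed `Job<JobPayload>` and tags Sentry events with the connection id rather than a repo id, which (likely a carry-over from the repo queue's handler) does not exist on this queue's payload.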

@@ -5,13 +5,13 @@ import { Settings } from "./types.js";
  */
 export const DEFAULT_SETTINGS: Settings = {
     maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
-    reindexIntervalMs: 1000 * 60 * 60, // 1 hour
-    resyncConnectionPollingIntervalMs: 1000,
-    reindexRepoPollingIntervalMs: 1000,
-    indexConcurrencyMultiple: 3,
-    configSyncConcurrencyMultiple: 3,
-    gcConcurrencyMultiple: 1,
-    gcGracePeriodMs: 10 * 1000, // 10 seconds
-    repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours
     maxTrigramCount: 20000,
+    reindexIntervalMs: 1000 * 60 * 60, // 1 hour
+    resyncConnectionPollingIntervalMs: 1000 * 1, // 1 second
+    reindexRepoPollingIntervalMs: 1000 * 1, // 1 second
+    maxConnectionSyncJobConcurrency: 8,
+    maxRepoIndexingJobConcurrency: 8,
+    maxRepoGarbageCollectionJobConcurrency: 8,
+    repoGarbageCollectionGracePeriodMs: 10 * 1000, // 10 seconds
+    repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours
 }
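Every field on Settings is optional (see the interface further down), so resolving effective settings amounts to a shallow spread of user overrides onto these defaults. A hypothetical helper illustrating the pattern; resolveSettings is not part of this commit:

```typescript
import { DEFAULT_SETTINGS } from "./constants.js";
import { Settings } from "./types.js";

// Hypothetical sketch: user-supplied values win, everything else
// falls back to the defaults defined above.
export const resolveSettings = (overrides: Settings = {}): Settings => ({
    ...DEFAULT_SETTINGS,
    ...overrides,
});
```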

@@ -257,6 +257,7 @@ const getReposOwnedByUsers = async (users: string[], isAuthenticated: boolean, o
             };
         } catch (error) {
             Sentry.captureException(error);
+            logger.error(`Failed to fetch repositories for user ${user}.`, error);
 
             if (isHttpError(error, 404)) {
                 logger.error(`User ${user} not found or no access`);
@@ -302,6 +303,7 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi
             };
         } catch (error) {
             Sentry.captureException(error);
+            logger.error(`Failed to fetch repositories for org ${org}.`, error);
 
             if (isHttpError(error, 404)) {
                 logger.error(`Organization ${org} not found or no access`);
@@ -349,6 +351,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
         } catch (error) {
             Sentry.captureException(error);
+            logger.error(`Failed to fetch repository ${repo}.`, error);
 
             if (isHttpError(error, 404)) {
                 logger.error(`Repository ${repo} not found or no access`);

@@ -82,6 +82,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
             };
         } catch (e: any) {
             Sentry.captureException(e);
+            logger.error(`Failed to fetch projects for group ${group}.`, e);
 
             const status = e?.cause?.response?.status;
             if (status === 404) {
@@ -118,6 +119,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
             };
         } catch (e: any) {
             Sentry.captureException(e);
+            logger.error(`Failed to fetch projects for user ${user}.`, e);
 
             const status = e?.cause?.response?.status;
             if (status === 404) {
@@ -152,6 +154,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
             };
         } catch (e: any) {
             Sentry.captureException(e);
+            logger.error(`Failed to fetch project ${project}.`, e);
 
             const status = e?.cause?.response?.status;

@@ -8,7 +8,6 @@ import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./util
 import { cloneRepository, fetchRepository } from "./git.js";
 import { existsSync, readdirSync, promises } from 'fs';
 import { indexGitRepository } from "./zoekt.js";
-import os from 'os';
 import { PromClient } from './promClient.js';
 import * as Sentry from "@sentry/node";
@@ -43,15 +42,13 @@ export class RepoManager implements IRepoManager {
         private promClient: PromClient,
         private ctx: AppContext,
     ) {
-        const numCores = os.cpus().length;
-
         // Repo indexing
         this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
             connection: redis,
         });
         this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
             connection: redis,
-            concurrency: numCores * this.settings.indexConcurrencyMultiple,
+            concurrency: this.settings.maxRepoIndexingJobConcurrency,
         });
         this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
         this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));
@@ -62,7 +59,7 @@ export class RepoManager implements IRepoManager {
         });
         this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
             connection: redis,
-            concurrency: numCores * this.settings.gcConcurrencyMultiple,
+            concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency,
         });
         this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
         this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
@@ -396,7 +393,7 @@ export class RepoManager implements IRepoManager {
         ////////////////////////////////////
 
-        const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs);
+        const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);
         const reposWithNoConnections = await this.db.repo.findMany({
             where: {
                 repoIndexingStatus: {

@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "Connection" ADD COLUMN "isDeclarative" BOOLEAN NOT NULL DEFAULT false;

@@ -67,6 +67,7 @@ model Connection {
     id              Int       @id @default(autoincrement())
     name            String
     config          Json
+    isDeclarative   Boolean   @default(false)
     createdAt       DateTime  @default(now())
     updatedAt       DateTime  @updatedAt
     syncedAt        DateTime?
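Because the migration declares the column NOT NULL DEFAULT false, existing rows are backfilled with false; only connections originating from the declarative config file carry the flag. A sketch of querying on it, mirroring the cleanup logic later in this commit:

```typescript
// Sketch: select only the connections that were created from the
// declarative config file.
const declarativeConnections = await prisma.connection.findMany({
    where: { isDeclarative: true },
});
```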

@@ -10,37 +10,45 @@ const schema = {
     "properties": {
         "maxFileSize": {
             "type": "number",
-            "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed."
+            "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.",
+            "minimum": 1
         },
         "maxTrigramCount": {
             "type": "number",
-            "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed."
+            "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000.",
+            "minimum": 1
         },
         "reindexIntervalMs": {
             "type": "number",
-            "description": "The interval (in milliseconds) at which the indexer should re-index all repositories."
+            "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.",
+            "minimum": 1
         },
         "resyncConnectionPollingIntervalMs": {
             "type": "number",
-            "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced."
+            "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 5 seconds.",
+            "minimum": 1
         },
         "reindexRepoPollingIntervalMs": {
             "type": "number",
-            "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed."
+            "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 5 seconds.",
+            "minimum": 1
         },
-        "indexConcurrencyMultiple": {
+        "maxConnectionSyncJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for indexing."
+            "description": "The number of connection sync jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "configSyncConcurrencyMultiple": {
+        "maxRepoIndexingJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for syncing the configuration."
+            "description": "The number of repo indexing jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "gcConcurrencyMultiple": {
+        "maxRepoGarbageCollectionJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for garbage collection."
+            "description": "The number of repo GC jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "gcGracePeriodMs": {
+        "repoGarbageCollectionGracePeriodMs": {
             "type": "number",
             "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded."
         },

@@ -28,41 +28,41 @@ export interface SourcebotConfig {
  */
 export interface Settings {
     /**
-     * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed.
+     * The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.
      */
     maxFileSize?: number;
     /**
-     * The maximum number of trigrams per document. Files that exceed this maximum will not be indexed.
+     * The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000.
      */
     maxTrigramCount?: number;
     /**
-     * The interval (in milliseconds) at which the indexer should re-index all repositories.
+     * The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.
      */
     reindexIntervalMs?: number;
     /**
-     * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced.
+     * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 5 seconds.
      */
     resyncConnectionPollingIntervalMs?: number;
     /**
-     * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed.
+     * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 5 seconds.
      */
     reindexRepoPollingIntervalMs?: number;
     /**
-     * The multiple of the number of CPUs to use for indexing.
+     * The number of connection sync jobs to run concurrently. Defaults to 8.
      */
-    indexConcurrencyMultiple?: number;
+    maxConnectionSyncJobConcurrency?: number;
     /**
-     * The multiple of the number of CPUs to use for syncing the configuration.
+     * The number of repo indexing jobs to run concurrently. Defaults to 8.
      */
-    configSyncConcurrencyMultiple?: number;
+    maxRepoIndexingJobConcurrency?: number;
     /**
-     * The multiple of the number of CPUs to use for garbage collection.
+     * The number of repo GC jobs to run concurrently. Defaults to 8.
      */
-    gcConcurrencyMultiple?: number;
+    maxRepoGarbageCollectionJobConcurrency?: number;
     /**
      * The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded.
      */
-    gcGracePeriodMs?: number;
+    repoGarbageCollectionGracePeriodMs?: number;
     /**
      * The timeout (in milliseconds) for a repo indexing to timeout.
      */
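For reference, a hypothetical settings block using the renamed keys; the values here are illustrative, not the defaults:

```typescript
const settings: Settings = {
    maxConnectionSyncJobConcurrency: 4,            // was configSyncConcurrencyMultiple
    maxRepoIndexingJobConcurrency: 4,              // was indexConcurrencyMultiple
    maxRepoGarbageCollectionJobConcurrency: 2,     // was gcConcurrencyMultiple
    repoGarbageCollectionGracePeriodMs: 30 * 1000, // was gcGracePeriodMs (30 seconds)
};
```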

@@ -13,6 +13,7 @@ import { SourcebotLogo } from './components/sourcebotLogo';
 export default function Error({ error, reset }: { error: Error & { digest?: string }, reset: () => void }) {
     useEffect(() => {
         Sentry.captureException(error);
+        console.error(error);
     }, [error]);
 
     const { message, errorCode, statusCode } = useMemo(() => {

@@ -130,11 +130,13 @@ const initSingleTenancy = async () => {
                 update: {
                     config: newConnectionConfig as unknown as Prisma.InputJsonValue,
                     syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined,
+                    isDeclarative: true,
                 },
                 create: {
                     name: key,
                     connectionType: newConnectionConfig.type,
                     config: newConnectionConfig as unknown as Prisma.InputJsonValue,
+                    isDeclarative: true,
                     org: {
                         connect: {
                             id: SINGLE_TENANT_ORG_ID,
@@ -160,6 +162,25 @@ const initSingleTenancy = async () => {
                 })
             }
         }
+
+        const deletedConnections = await prisma.connection.findMany({
+            where: {
+                isDeclarative: true,
+                name: {
+                    notIn: Object.keys(config.connections),
+                },
+                orgId: SINGLE_TENANT_ORG_ID,
+            }
+        });
+
+        for (const connection of deletedConnections) {
+            console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
+            await prisma.connection.delete({
+                where: {
+                    id: connection.id,
+                }
+            })
+        }
     }
 }
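This is the cleanup the commit title refers to: declarative connections now follow the config file in both directions. A sketch of the contract, with illustrative connection names:

```typescript
// Boot 1: the config declares two connections; both are upserted
// with isDeclarative: true.
// { connections: { "github-repos": { ... }, "gitlab-repos": { ... } } }

// Boot 2: "gitlab-repos" has been removed from the config.
// { connections: { "github-repos": { ... } } }

// On startup, the findMany above matches "gitlab-repos"
// (isDeclarative: true, name notIn ["github-repos"]) and deletes it.
```

Connections created by other means keep the default isDeclarative: false and are never touched by this sweep.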

@@ -9,43 +9,54 @@
     "properties": {
         "maxFileSize": {
             "type": "number",
-            "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed."
+            "description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.",
+            "minimum": 1
         },
         "maxTrigramCount": {
             "type": "number",
-            "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed."
+            "description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Default to 20000.",
+            "minimum": 1
         },
         "reindexIntervalMs": {
             "type": "number",
-            "description": "The interval (in milliseconds) at which the indexer should re-index all repositories."
+            "description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.",
+            "minimum": 1
         },
         "resyncConnectionPollingIntervalMs": {
             "type": "number",
-            "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced."
+            "description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 1 second.",
+            "minimum": 1
         },
         "reindexRepoPollingIntervalMs": {
             "type": "number",
-            "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed."
+            "description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 1 second.",
+            "minimum": 1
         },
-        "indexConcurrencyMultiple": {
+        "maxConnectionSyncJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for indexing."
+            "description": "The number of connection sync jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "configSyncConcurrencyMultiple": {
+        "maxRepoIndexingJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for syncing the configuration."
+            "description": "The number of repo indexing jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "gcConcurrencyMultiple": {
+        "maxRepoGarbageCollectionJobConcurrency": {
             "type": "number",
-            "description": "The multiple of the number of CPUs to use for garbage collection."
+            "description": "The number of repo GC jobs to run concurrently. Defaults to 8.",
+            "minimum": 1
         },
-        "gcGracePeriodMs": {
+        "repoGarbageCollectionGracePeriodMs": {
             "type": "number",
-            "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded."
+            "description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded. Defaults to 10 seconds.",
+            "minimum": 1
         },
         "repoIndexTimeoutMs": {
             "type": "number",
-            "description": "The timeout (in milliseconds) for a repo indexing to timeout."
+            "description": "The timeout (in milliseconds) for a repo indexing to timeout. Defaults to 2 hours.",
+            "minimum": 1
         }
     },
     "additionalProperties": false

@@ -31,7 +31,7 @@ stdout_logfile_maxbytes=0
 redirect_stderr=true
 
 [program:redis]
-command=redis-server
+command=redis-server --dir %(ENV_REDIS_DATA_DIR)s
 autostart=true
 autorestart=true
 startretries=3
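Note: %(ENV_REDIS_DATA_DIR)s is supervisord's syntax for interpolating an environment variable into a program's command line, and --dir is the standard redis-server directive for its working directory, so Redis now persists its data under the directory the entrypoint creates above.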