From 0a3a63c8f08f1aa920b9566c5a13e5989739aacd Mon Sep 17 00:00:00 2001 From: bkellam Date: Tue, 16 Sep 2025 22:02:04 -0700 Subject: [PATCH] permission syncer --- packages/backend/src/connectionManager.ts | 13 +- packages/backend/src/env.ts | 2 + packages/backend/src/github.ts | 77 +++++---- packages/backend/src/index.ts | 118 +++++++++----- packages/backend/src/main.ts | 49 ------ packages/backend/src/permissionSyncer.ts | 181 ++++++++++++++++++++++ packages/backend/src/repoManager.ts | 17 +- packages/backend/src/types.ts | 5 +- 8 files changed, 321 insertions(+), 141 deletions(-) delete mode 100644 packages/backend/src/main.ts create mode 100644 packages/backend/src/permissionSyncer.ts diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts index f025bdf7..499393f7 100644 --- a/packages/backend/src/connectionManager.ts +++ b/packages/backend/src/connectionManager.ts @@ -11,12 +11,6 @@ import { env } from "./env.js"; import * as Sentry from "@sentry/node"; import { loadConfig, syncSearchContexts } from "@sourcebot/shared"; -interface IConnectionManager { - scheduleConnectionSync: (connection: Connection) => Promise; - registerPollingCallback: () => void; - dispose: () => void; -} - const QUEUE_NAME = 'connectionSyncQueue'; type JobPayload = { @@ -30,7 +24,7 @@ type JobResult = { repoCount: number, } -export class ConnectionManager implements IConnectionManager { +export class ConnectionManager { private worker: Worker; private queue: Queue; private logger = createLogger('connection-manager'); @@ -75,8 +69,9 @@ export class ConnectionManager implements IConnectionManager { }); } - public async registerPollingCallback() { - setInterval(async () => { + public startScheduler() { + this.logger.debug('Starting scheduler'); + return setInterval(async () => { const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs); const connections = await this.db.connection.findMany({ where: { diff --git a/packages/backend/src/env.ts b/packages/backend/src/env.ts index 0a533db0..5d056284 100644 --- a/packages/backend/src/env.ts +++ b/packages/backend/src/env.ts @@ -52,6 +52,8 @@ export const env = createEnv({ REPO_SYNC_RETRY_BASE_SLEEP_SECONDS: numberSchema.default(60), GITLAB_CLIENT_QUERY_TIMEOUT_SECONDS: numberSchema.default(60 * 10), + + EXPERIMENT_PERMISSION_SYNC_ENABLED: booleanSchema.default("false"), }, runtimeEnv: process.env, emptyStringAsUndefined: true, diff --git a/packages/backend/src/github.ts b/packages/backend/src/github.ts index 7c53bf37..a019a879 100644 --- a/packages/backend/src/github.ts +++ b/packages/backend/src/github.ts @@ -30,35 +30,21 @@ export type OctokitRepository = { size?: number, owner: { avatar_url: string, + login: string, } } const isHttpError = (error: unknown, status: number): boolean => { - return error !== null + return error !== null && typeof error === 'object' - && 'status' in error + && 'status' in error && error.status === status; } export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient, signal: AbortSignal) => { - const hostname = config.url ? - new URL(config.url).hostname : - GITHUB_CLOUD_HOSTNAME; + const { octokit, isAuthenticated } = await createOctokitFromConfig(config, orgId, db); - const token = config.token ? - await getTokenFromConfig(config.token, orgId, db, logger) : - hostname === GITHUB_CLOUD_HOSTNAME ? - env.FALLBACK_GITHUB_CLOUD_TOKEN : - undefined; - - const octokit = new Octokit({ - auth: token, - ...(config.url ? { - baseUrl: `${config.url}/api/v3` - } : {}), - }); - - if (token) { + if (isAuthenticated) { try { await octokit.rest.users.getAuthenticated(); } catch (error) { @@ -127,16 +113,51 @@ export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, o logger.debug(`Found ${repos.length} total repositories.`); return { - validRepos: repos, + validRepos: repos, notFound, }; } +export const getUserIdsWithReadAccessToRepo = async (owner: string, repo: string, octokit: Octokit) => { + const fetchFn = () => octokit.paginate(octokit.repos.listCollaborators, { + owner, + repo, + per_page: 100, + }); + + const collaborators = await fetchWithRetry(fetchFn, `repo ${owner}/${repo}`, logger); + return collaborators.map(collaborator => collaborator.id.toString()); +} + +export const createOctokitFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => { + const hostname = config.url ? + new URL(config.url).hostname : + GITHUB_CLOUD_HOSTNAME; + + const token = config.token ? + await getTokenFromConfig(config.token, orgId, db, logger) : + hostname === GITHUB_CLOUD_HOSTNAME ? + env.FALLBACK_GITHUB_CLOUD_TOKEN : + undefined; + + const octokit = new Octokit({ + auth: token, + ...(config.url ? { + baseUrl: `${config.url}/api/v3` + } : {}), + }); + + return { + octokit, + isAuthenticated: !!token, + }; +} + export const shouldExcludeRepo = ({ repo, include, exclude -} : { +}: { repo: OctokitRepository, include?: { topics?: GithubConnectionConfig['topics'] @@ -156,23 +177,23 @@ export const shouldExcludeRepo = ({ reason = `\`exclude.forks\` is true`; return true; } - + if (!!exclude?.archived && !!repo.archived) { reason = `\`exclude.archived\` is true`; return true; } - + if (exclude?.repos) { if (micromatch.isMatch(repoName, exclude.repos)) { reason = `\`exclude.repos\` contains ${repoName}`; return true; } } - + if (exclude?.topics) { const configTopics = exclude.topics.map(topic => topic.toLowerCase()); const repoTopics = repo.topics ?? []; - + const matchingTopics = repoTopics.filter((topic) => micromatch.isMatch(topic, configTopics)); if (matchingTopics.length > 0) { reason = `\`exclude.topics\` matches the following topics: ${matchingTopics.join(', ')}`; @@ -190,17 +211,17 @@ export const shouldExcludeRepo = ({ return true; } } - + const repoSizeInBytes = repo.size ? repo.size * 1000 : undefined; if (exclude?.size && repoSizeInBytes) { const min = exclude.size.min; const max = exclude.size.max; - + if (min && repoSizeInBytes < min) { reason = `repo is less than \`exclude.size.min\`=${min} bytes.`; return true; } - + if (max && repoSizeInBytes > max) { reason = `repo is greater than \`exclude.size.max\`=${max} bytes.`; return true; diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index c93622d6..0f0aa14b 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -1,44 +1,36 @@ import "./instrument.js"; -import * as Sentry from "@sentry/node"; +import { PrismaClient } from "@sourcebot/db"; +import { createLogger } from "@sourcebot/logger"; +import { loadConfig } from '@sourcebot/shared'; import { existsSync } from 'fs'; import { mkdir } from 'fs/promises'; +import { Redis } from 'ioredis'; import path from 'path'; -import { AppContext } from "./types.js"; -import { main } from "./main.js" -import { PrismaClient } from "@sourcebot/db"; +import { ConnectionManager } from './connectionManager.js'; +import { DEFAULT_SETTINGS } from './constants.js'; import { env } from "./env.js"; -import { createLogger } from "@sourcebot/logger"; +import { RepoPermissionSyncer } from './permissionSyncer.js'; +import { PromClient } from './promClient.js'; +import { RepoManager } from './repoManager.js'; +import { AppContext } from "./types.js"; + const logger = createLogger('backend-entrypoint'); +const getSettings = async (configPath?: string) => { + if (!configPath) { + return DEFAULT_SETTINGS; + } -// Register handler for normal exit -process.on('exit', (code) => { - logger.info(`Process is exiting with code: ${code}`); -}); + const config = await loadConfig(configPath); -// Register handlers for abnormal terminations -process.on('SIGINT', () => { - logger.info('Process interrupted (SIGINT)'); - process.exit(0); -}); + return { + ...DEFAULT_SETTINGS, + ...config.settings, + } +} -process.on('SIGTERM', () => { - logger.info('Process terminated (SIGTERM)'); - process.exit(0); -}); - -// Register handlers for uncaught exceptions and unhandled rejections -process.on('uncaughtException', (err) => { - logger.error(`Uncaught exception: ${err.message}`); - process.exit(1); -}); - -process.on('unhandledRejection', (reason, promise) => { - logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`); - process.exit(1); -}); const cacheDir = env.DATA_CACHE_DIR; const reposPath = path.join(cacheDir, 'repos'); @@ -59,18 +51,60 @@ const context: AppContext = { const prisma = new PrismaClient(); -main(prisma, context) - .then(async () => { - await prisma.$disconnect(); - }) - .catch(async (e) => { - logger.error(e); - Sentry.captureException(e); +const redis = new Redis(env.REDIS_URL, { + maxRetriesPerRequest: null +}); +redis.ping().then(() => { + logger.info('Connected to redis'); +}).catch((err: unknown) => { + logger.error('Failed to connect to redis'); + logger.error(err); + process.exit(1); +}); - await prisma.$disconnect(); - process.exit(1); - }) - .finally(() => { - logger.info("Shutting down..."); - }); +const promClient = new PromClient(); +const settings = await getSettings(env.CONFIG_PATH); + +const connectionManager = new ConnectionManager(prisma, settings, redis); +const repoManager = new RepoManager(prisma, settings, redis, promClient, context); +const permissionSyncer = new RepoPermissionSyncer(prisma, redis); + +await repoManager.validateIndexedReposHaveShards(); + +const connectionManagerInterval = connectionManager.startScheduler(); +const repoManagerInterval = repoManager.startScheduler(); +const permissionSyncerInterval = env.EXPERIMENT_PERMISSION_SYNC_ENABLED ? permissionSyncer.startScheduler() : null; + + +const cleanup = async (signal: string) => { + logger.info(`Recieved ${signal}, cleaning up...`); + + if (permissionSyncerInterval) { + clearInterval(permissionSyncerInterval); + } + + clearInterval(connectionManagerInterval); + clearInterval(repoManagerInterval); + + connectionManager.dispose(); + repoManager.dispose(); + permissionSyncer.dispose(); + + await prisma.$disconnect(); + await redis.quit(); +} + +process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0))); +process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0))); + +// Register handlers for uncaught exceptions and unhandled rejections +process.on('uncaughtException', (err) => { + logger.error(`Uncaught exception: ${err.message}`); + cleanup('uncaughtException').finally(() => process.exit(1)); +}); + +process.on('unhandledRejection', (reason, promise) => { + logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`); + cleanup('unhandledRejection').finally(() => process.exit(1)); +}); diff --git a/packages/backend/src/main.ts b/packages/backend/src/main.ts deleted file mode 100644 index f3cf0050..00000000 --- a/packages/backend/src/main.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { PrismaClient } from '@sourcebot/db'; -import { createLogger } from "@sourcebot/logger"; -import { AppContext } from "./types.js"; -import { DEFAULT_SETTINGS } from './constants.js'; -import { Redis } from 'ioredis'; -import { ConnectionManager } from './connectionManager.js'; -import { RepoManager } from './repoManager.js'; -import { env } from './env.js'; -import { PromClient } from './promClient.js'; -import { loadConfig } from '@sourcebot/shared'; - -const logger = createLogger('backend-main'); - -const getSettings = async (configPath?: string) => { - if (!configPath) { - return DEFAULT_SETTINGS; - } - - const config = await loadConfig(configPath); - - return { - ...DEFAULT_SETTINGS, - ...config.settings, - } -} - -export const main = async (db: PrismaClient, context: AppContext) => { - const redis = new Redis(env.REDIS_URL, { - maxRetriesPerRequest: null - }); - redis.ping().then(() => { - logger.info('Connected to redis'); - }).catch((err: unknown) => { - logger.error('Failed to connect to redis'); - logger.error(err); - process.exit(1); - }); - - const settings = await getSettings(env.CONFIG_PATH); - - const promClient = new PromClient(); - - const connectionManager = new ConnectionManager(db, settings, redis); - connectionManager.registerPollingCallback(); - - const repoManager = new RepoManager(db, settings, redis, promClient, context); - await repoManager.validateIndexedReposHaveShards(); - await repoManager.blockingPollLoop(); -} diff --git a/packages/backend/src/permissionSyncer.ts b/packages/backend/src/permissionSyncer.ts new file mode 100644 index 00000000..d2059267 --- /dev/null +++ b/packages/backend/src/permissionSyncer.ts @@ -0,0 +1,181 @@ +import { PrismaClient } from "@sourcebot/db"; +import { createLogger } from "@sourcebot/logger"; +import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type"; +import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type"; +import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type"; +import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type"; +import { Job, Queue, Worker } from 'bullmq'; +import { Redis } from 'ioredis'; +import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "./github.js"; +import { RepoWithConnections } from "./types.js"; + +type RepoPermissionSyncJob = { + repoId: number; +} + +const QUEUE_NAME = 'repoPermissionSyncQueue'; + +const logger = createLogger('permission-syncer'); + +export class RepoPermissionSyncer { + private queue: Queue; + private worker: Worker; + + constructor( + private db: PrismaClient, + redis: Redis, + ) { + this.queue = new Queue(QUEUE_NAME, { + connection: redis, + }); + this.worker = new Worker(QUEUE_NAME, this.runJob.bind(this), { + connection: redis, + }); + this.worker.on('completed', this.onJobCompleted.bind(this)); + this.worker.on('failed', this.onJobFailed.bind(this)); + } + + public async scheduleJob(repoId: number) { + await this.queue.add(QUEUE_NAME, { + repoId, + }); + } + + public startScheduler() { + logger.debug('Starting scheduler'); + + // @todo: we should only sync permissions for a repository if it has been at least ~24 hours since the last sync. + return setInterval(async () => { + const repos = await this.db.repo.findMany({ + where: { + external_codeHostType: { + in: ['github'], + } + } + }); + + for (const repo of repos) { + await this.scheduleJob(repo.id); + } + + // @todo: make this configurable + }, 1000 * 5); + } + + public dispose() { + this.worker.close(); + this.queue.close(); + } + + private async runJob(job: Job) { + const id = job.data.repoId; + const repo = await this.db.repo.findUnique({ + where: { + id, + }, + include: { + connections: { + include: { + connection: true, + }, + }, + }, + }); + + if (!repo) { + throw new Error(`Repo ${id} not found`); + } + + const connection = getFirstConnectionWithToken(repo); + if (!connection) { + throw new Error(`No connection with token found for repo ${id}`); + } + + const userIds = await (async () => { + if (connection.connectionType === 'github') { + const config = connection.config as unknown as GithubConnectionConfig; + const { octokit } = await createOctokitFromConfig(config, repo.orgId, this.db); + + // @nocheckin - need to handle when repo displayName is not set. + const [owner, repoName] = repo.displayName!.split('/'); + + const githubUserIds = await getUserIdsWithReadAccessToRepo(owner, repoName, octokit); + + const accounts = await this.db.account.findMany({ + where: { + provider: 'github', + providerAccountId: { + in: githubUserIds, + } + }, + select: { + userId: true, + }, + }); + + return accounts.map(account => account.userId); + } + + return []; + })(); + + logger.info(`User IDs with read access to repo ${id}: ${userIds}`); + + await this.db.repo.update({ + where: { + id: repo.id, + }, + data: { + permittedUsers: { + deleteMany: {}, + } + } + }); + + await this.db.userToRepoPermission.createMany({ + data: userIds.map(userId => ({ + userId, + repoId: repo.id, + })), + }); + } + + private async onJobCompleted(job: Job) { + logger.info(`Repo permission sync job completed for repo ${job.data.repoId}`); + } + + private async onJobFailed(job: Job | undefined, err: Error) { + logger.error(`Repo permission sync job failed for repo ${job?.data.repoId}: ${err}`); + } +} + +const getFirstConnectionWithToken = (repo: RepoWithConnections) => { + for (const { connection } of repo.connections) { + if (connection.connectionType === 'github') { + const config = connection.config as unknown as GithubConnectionConfig; + if (config.token) { + return connection; + } + } + if (connection.connectionType === 'gitlab') { + const config = connection.config as unknown as GitlabConnectionConfig; + if (config.token) { + return connection; + } + } + if (connection.connectionType === 'gitea') { + const config = connection.config as unknown as GiteaConnectionConfig; + if (config.token) { + return connection; + } + } + if (connection.connectionType === 'bitbucket') { + const config = connection.config as unknown as BitbucketConnectionConfig; + if (config.token) { + return connection; + } + } + } + + return undefined; +} \ No newline at end of file diff --git a/packages/backend/src/repoManager.ts b/packages/backend/src/repoManager.ts index d8e091ec..cc99a278 100644 --- a/packages/backend/src/repoManager.ts +++ b/packages/backend/src/repoManager.ts @@ -12,12 +12,6 @@ import { PromClient } from './promClient.js'; import * as Sentry from "@sentry/node"; import { env } from './env.js'; -interface IRepoManager { - validateIndexedReposHaveShards: () => Promise; - blockingPollLoop: () => void; - dispose: () => void; -} - const REPO_INDEXING_QUEUE = 'repoIndexingQueue'; const REPO_GC_QUEUE = 'repoGarbageCollectionQueue'; @@ -32,7 +26,7 @@ type RepoGarbageCollectionPayload = { const logger = createLogger('repo-manager'); -export class RepoManager implements IRepoManager { +export class RepoManager { private indexWorker: Worker; private indexQueue: Queue; private gcWorker: Worker; @@ -68,14 +62,13 @@ export class RepoManager implements IRepoManager { this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this)); } - public async blockingPollLoop() { - while (true) { + public startScheduler() { + logger.debug('Starting scheduler'); + return setInterval(async () => { await this.fetchAndScheduleRepoIndexing(); await this.fetchAndScheduleRepoGarbageCollection(); await this.fetchAndScheduleRepoTimeouts(); - - await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs)); - } + }, this.settings.reindexRepoPollingIntervalMs); } /////////////////////////// diff --git a/packages/backend/src/types.ts b/packages/backend/src/types.ts index 58674f49..737720b4 100644 --- a/packages/backend/src/types.ts +++ b/packages/backend/src/types.ts @@ -1,3 +1,4 @@ +import { Connection, Repo, RepoToConnection } from "@sourcebot/db"; import { Settings as SettingsSchema } from "@sourcebot/schemas/v3/index.type"; import { z } from "zod"; @@ -50,4 +51,6 @@ export type DeepPartial = T extends object ? { } : T; // @see: https://stackoverflow.com/a/69328045 -export type WithRequired = T & { [P in K]-?: T[P] }; \ No newline at end of file +export type WithRequired = T & { [P in K]-?: T[P] }; + +export type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };