permission syncer

This commit is contained in:
bkellam 2025-09-16 22:02:04 -07:00
parent b9a91c20fe
commit 0a3a63c8f0
8 changed files with 321 additions and 141 deletions

View file

@ -11,12 +11,6 @@ import { env } from "./env.js";
import * as Sentry from "@sentry/node";
import { loadConfig, syncSearchContexts } from "@sourcebot/shared";
// Contract that ConnectionManager declares it implements.
interface IConnectionManager {
// Takes a Connection and resolves once done; presumably enqueues a sync job for it — confirm against the implementation.
scheduleConnectionSync: (connection: Connection) => Promise<void>;
// Starts the setInterval-based poll over connections.
registerPollingCallback: () => void;
// Releases whatever the manager holds (NOTE(review): presumably its worker and queue — confirm).
dispose: () => void;
}
const QUEUE_NAME = 'connectionSyncQueue';
type JobPayload = {
@ -30,7 +24,7 @@ type JobResult = {
repoCount: number,
}
export class ConnectionManager implements IConnectionManager {
export class ConnectionManager {
private worker: Worker;
private queue: Queue<JobPayload>;
private logger = createLogger('connection-manager');
@ -75,8 +69,9 @@ export class ConnectionManager implements IConnectionManager {
});
}
public async registerPollingCallback() {
setInterval(async () => {
public startScheduler() {
this.logger.debug('Starting scheduler');
return setInterval(async () => {
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
const connections = await this.db.connection.findMany({
where: {

View file

@ -52,6 +52,8 @@ export const env = createEnv({
REPO_SYNC_RETRY_BASE_SLEEP_SECONDS: numberSchema.default(60),
GITLAB_CLIENT_QUERY_TIMEOUT_SECONDS: numberSchema.default(60 * 10),
EXPERIMENT_PERMISSION_SYNC_ENABLED: booleanSchema.default("false"),
},
runtimeEnv: process.env,
emptyStringAsUndefined: true,

View file

@ -30,35 +30,21 @@ export type OctokitRepository = {
size?: number,
owner: {
avatar_url: string,
login: string,
}
}
/**
 * Tells whether a thrown value looks like an HTTP error with the given status.
 *
 * @param error - The caught value; may be anything, including `null` or a primitive.
 * @param status - The HTTP status code to match.
 * @returns `true` only when `error` is a non-null object that has a `status`
 *          property strictly equal to `status`.
 */
const isHttpError = (error: unknown, status: number): boolean => {
    // Every check must be part of ONE expression: a second `return` statement
    // would make the first one win and leave the remaining checks unreachable.
    return error !== null
        && typeof error === 'object'
        && 'status' in error
        && error.status === status;
}
export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient, signal: AbortSignal) => {
const hostname = config.url ?
new URL(config.url).hostname :
GITHUB_CLOUD_HOSTNAME;
const { octokit, isAuthenticated } = await createOctokitFromConfig(config, orgId, db);
const token = config.token ?
await getTokenFromConfig(config.token, orgId, db, logger) :
hostname === GITHUB_CLOUD_HOSTNAME ?
env.FALLBACK_GITHUB_CLOUD_TOKEN :
undefined;
const octokit = new Octokit({
auth: token,
...(config.url ? {
baseUrl: `${config.url}/api/v3`
} : {}),
});
if (token) {
if (isAuthenticated) {
try {
await octokit.rest.users.getAuthenticated();
} catch (error) {
@ -127,16 +113,51 @@ export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, o
logger.debug(`Found ${repos.length} total repositories.`);
return {
validRepos: repos,
validRepos: repos,
notFound,
};
}
/**
 * Collects the ids of every collaborator listed on `owner/repo`.
 *
 * The collaborator listing is paginated (100 per page) and wrapped in
 * `fetchWithRetry`.
 *
 * @param owner - Repository owner (user or organisation login).
 * @param repo - Repository name, without the owner prefix.
 * @param octokit - Client used to issue the requests.
 * @returns The collaborator ids, each converted to a string.
 */
export const getUserIdsWithReadAccessToRepo = async (owner: string, repo: string, octokit: Octokit) => {
    const collaborators = await fetchWithRetry(
        () => octokit.paginate(octokit.repos.listCollaborators, {
            owner,
            repo,
            per_page: 100,
        }),
        `repo ${owner}/${repo}`,
        logger,
    );

    const userIds: string[] = [];
    for (const { id } of collaborators) {
        userIds.push(id.toString());
    }
    return userIds;
}
/**
 * Builds an Octokit client for a GitHub connection config.
 *
 * Token resolution order:
 *   1. the token referenced by `config.token`, resolved via `getTokenFromConfig`;
 *   2. `env.FALLBACK_GITHUB_CLOUD_TOKEN`, but only when the target host is GitHub cloud;
 *   3. otherwise no token.
 *
 * When `config.url` is set, requests go to `${config.url}/api/v3`.
 *
 * @param config - The GitHub connection config.
 * @param orgId - Org id forwarded to `getTokenFromConfig`.
 * @param db - Prisma client forwarded to `getTokenFromConfig`.
 * @returns The client plus `isAuthenticated`, which is true iff a token was resolved.
 */
export const createOctokitFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => {
    // Parsed up front so that a malformed url fails before any token lookup.
    const hostname = config.url ? new URL(config.url).hostname : GITHUB_CLOUD_HOSTNAME;

    const resolveToken = async () => {
        if (config.token) {
            return getTokenFromConfig(config.token, orgId, db, logger);
        }
        if (hostname === GITHUB_CLOUD_HOSTNAME) {
            return env.FALLBACK_GITHUB_CLOUD_TOKEN;
        }
        return undefined;
    };
    const token = await resolveToken();

    const clientOptions = config.url
        ? { auth: token, baseUrl: `${config.url}/api/v3` }
        : { auth: token };

    return {
        octokit: new Octokit(clientOptions),
        isAuthenticated: Boolean(token),
    };
}
export const shouldExcludeRepo = ({
repo,
include,
exclude
} : {
}: {
repo: OctokitRepository,
include?: {
topics?: GithubConnectionConfig['topics']
@ -156,23 +177,23 @@ export const shouldExcludeRepo = ({
reason = `\`exclude.forks\` is true`;
return true;
}
if (!!exclude?.archived && !!repo.archived) {
reason = `\`exclude.archived\` is true`;
return true;
}
if (exclude?.repos) {
if (micromatch.isMatch(repoName, exclude.repos)) {
reason = `\`exclude.repos\` contains ${repoName}`;
return true;
}
}
if (exclude?.topics) {
const configTopics = exclude.topics.map(topic => topic.toLowerCase());
const repoTopics = repo.topics ?? [];
const matchingTopics = repoTopics.filter((topic) => micromatch.isMatch(topic, configTopics));
if (matchingTopics.length > 0) {
reason = `\`exclude.topics\` matches the following topics: ${matchingTopics.join(', ')}`;
@ -190,17 +211,17 @@ export const shouldExcludeRepo = ({
return true;
}
}
const repoSizeInBytes = repo.size ? repo.size * 1000 : undefined;
if (exclude?.size && repoSizeInBytes) {
const min = exclude.size.min;
const max = exclude.size.max;
if (min && repoSizeInBytes < min) {
reason = `repo is less than \`exclude.size.min\`=${min} bytes.`;
return true;
}
if (max && repoSizeInBytes > max) {
reason = `repo is greater than \`exclude.size.max\`=${max} bytes.`;
return true;

View file

@ -1,44 +1,36 @@
import "./instrument.js";
import * as Sentry from "@sentry/node";
import { PrismaClient } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
import { loadConfig } from '@sourcebot/shared';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Redis } from 'ioredis';
import path from 'path';
import { AppContext } from "./types.js";
import { main } from "./main.js"
import { PrismaClient } from "@sourcebot/db";
import { ConnectionManager } from './connectionManager.js';
import { DEFAULT_SETTINGS } from './constants.js';
import { env } from "./env.js";
import { createLogger } from "@sourcebot/logger";
import { RepoPermissionSyncer } from './permissionSyncer.js';
import { PromClient } from './promClient.js';
import { RepoManager } from './repoManager.js';
import { AppContext } from "./types.js";
const logger = createLogger('backend-entrypoint');
const getSettings = async (configPath?: string) => {
if (!configPath) {
return DEFAULT_SETTINGS;
}
// Register handler for normal exit
process.on('exit', (code) => {
logger.info(`Process is exiting with code: ${code}`);
});
const config = await loadConfig(configPath);
// Register handlers for abnormal terminations
process.on('SIGINT', () => {
logger.info('Process interrupted (SIGINT)');
process.exit(0);
});
return {
...DEFAULT_SETTINGS,
...config.settings,
}
}
process.on('SIGTERM', () => {
logger.info('Process terminated (SIGTERM)');
process.exit(0);
});
// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
process.exit(1);
});
process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
process.exit(1);
});
const cacheDir = env.DATA_CACHE_DIR;
const reposPath = path.join(cacheDir, 'repos');
@ -59,18 +51,60 @@ const context: AppContext = {
const prisma = new PrismaClient();
main(prisma, context)
.then(async () => {
await prisma.$disconnect();
})
.catch(async (e) => {
logger.error(e);
Sentry.captureException(e);
const redis = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null
});
redis.ping().then(() => {
logger.info('Connected to redis');
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
logger.error(err);
process.exit(1);
});
await prisma.$disconnect();
process.exit(1);
})
.finally(() => {
logger.info("Shutting down...");
});
const promClient = new PromClient();
const settings = await getSettings(env.CONFIG_PATH);
const connectionManager = new ConnectionManager(prisma, settings, redis);
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
const permissionSyncer = new RepoPermissionSyncer(prisma, redis);
await repoManager.validateIndexedReposHaveShards();
const connectionManagerInterval = connectionManager.startScheduler();
const repoManagerInterval = repoManager.startScheduler();
const permissionSyncerInterval = env.EXPERIMENT_PERMISSION_SYNC_ENABLED ? permissionSyncer.startScheduler() : null;
/**
 * Tears the backend down: stops the schedulers, disposes the managers and
 * closes the database and redis connections.
 *
 * @param signal - Label of whatever triggered the shutdown; only used for logging.
 */
const cleanup = async (signal: string) => {
    logger.info(`Received ${signal}, cleaning up...`);

    // Stop the schedulers first so that no new work is enqueued during teardown.
    // The permission syncer interval is null when the experiment flag is off.
    if (permissionSyncerInterval) {
        clearInterval(permissionSyncerInterval);
    }
    clearInterval(connectionManagerInterval);
    clearInterval(repoManagerInterval);

    connectionManager.dispose();
    repoManager.dispose();
    permissionSyncer.dispose();

    // Always attempt to close redis, even when disconnecting prisma fails.
    try {
        await prisma.$disconnect();
    } finally {
        await redis.quit();
    }
}
// Runs `cleanup` and terminates the process with `exitCode` whether or not cleanup succeeded.
const cleanupThenExit = (source: string, exitCode: number) =>
    cleanup(source).finally(() => process.exit(exitCode));

// Termination signals: clean up, then exit successfully.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.on(signal, () => cleanupThenExit(signal, 0));
}

// Uncaught exceptions and unhandled rejections: log, clean up, then exit with a failure code.
process.on('uncaughtException', (err) => {
    logger.error(`Uncaught exception: ${err.message}`);
    cleanupThenExit('uncaughtException', 1);
});

process.on('unhandledRejection', (reason, promise) => {
    logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
    cleanupThenExit('unhandledRejection', 1);
});

View file

@ -1,49 +0,0 @@
import { PrismaClient } from '@sourcebot/db';
import { createLogger } from "@sourcebot/logger";
import { AppContext } from "./types.js";
import { DEFAULT_SETTINGS } from './constants.js';
import { Redis } from 'ioredis';
import { ConnectionManager } from './connectionManager.js';
import { RepoManager } from './repoManager.js';
import { env } from './env.js';
import { PromClient } from './promClient.js';
import { loadConfig } from '@sourcebot/shared';
const logger = createLogger('backend-main');
/**
 * Resolves the effective settings.
 *
 * @param configPath - Optional path handed to `loadConfig`.
 * @returns `DEFAULT_SETTINGS` when no path is given; otherwise the defaults
 *          overlaid with the `settings` section of the loaded config.
 */
const getSettings = async (configPath?: string) => {
    if (configPath) {
        const { settings } = await loadConfig(configPath);
        return {
            ...DEFAULT_SETTINGS,
            ...settings,
        };
    }

    return DEFAULT_SETTINGS;
}
/**
 * Backend entry point: connects to redis, wires up the connection and repo
 * managers, then hands control to the repo manager's poll loop.
 *
 * @param db - Prisma client shared by both managers.
 * @param context - Application context handed to the repo manager.
 */
export const main = async (db: PrismaClient, context: AppContext) => {
    const redis = new Redis(env.REDIS_URL, { maxRetriesPerRequest: null });

    const onRedisReady = () => {
        logger.info('Connected to redis');
    };
    const onRedisUnreachable = (err: unknown) => {
        logger.error('Failed to connect to redis');
        logger.error(err);
        process.exit(1);
    };
    // Connectivity probe; deliberately not awaited so startup continues in parallel.
    redis.ping().then(onRedisReady).catch(onRedisUnreachable);

    const settings = await getSettings(env.CONFIG_PATH);
    const promClient = new PromClient();

    const connectionManager = new ConnectionManager(db, settings, redis);
    connectionManager.registerPollingCallback();

    const repoManager = new RepoManager(db, settings, redis, promClient, context);
    await repoManager.validateIndexedReposHaveShards();
    await repoManager.blockingPollLoop();
}

View file

@ -0,0 +1,181 @@
import { PrismaClient } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type";
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type";
import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type";
import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type";
import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "./github.js";
import { RepoWithConnections } from "./types.js";
// Payload of a job on the repo permission sync queue.
type RepoPermissionSyncJob = {
// Id of the repo whose permitted users should be re-synced.
repoId: number;
}
const QUEUE_NAME = 'repoPermissionSyncQueue';
const logger = createLogger('permission-syncer');
/**
 * Keeps the set of users permitted to read each repository in sync with the
 * code host.
 *
 * A scheduler periodically enqueues one job per GitHub repo onto a BullMQ
 * queue; a worker on the same queue processes each job by listing the repo's
 * collaborators, mapping them to local accounts, and replacing the repo's
 * permitted users.
 */
export class RepoPermissionSyncer {
    private queue: Queue<RepoPermissionSyncJob>;
    private worker: Worker<RepoPermissionSyncJob>;

    /**
     * @param db - Prisma client used to read repos/accounts and write permissions.
     * @param redis - Redis connection backing both the queue and the worker.
     */
    constructor(
        private db: PrismaClient,
        redis: Redis,
    ) {
        this.queue = new Queue<RepoPermissionSyncJob>(QUEUE_NAME, {
            connection: redis,
        });
        this.worker = new Worker<RepoPermissionSyncJob>(QUEUE_NAME, this.runJob.bind(this), {
            connection: redis,
        });
        this.worker.on('completed', this.onJobCompleted.bind(this));
        this.worker.on('failed', this.onJobFailed.bind(this));
        // Without an 'error' listener, errors emitted by the worker surface as
        // unhandled exceptions instead of being logged.
        this.worker.on('error', (err) => {
            logger.error(`Repo permission sync worker error: ${err}`);
        });
    }

    /**
     * Enqueues a permission sync job for a single repo.
     *
     * @param repoId - Id of the repo to sync.
     */
    public async scheduleJob(repoId: number) {
        await this.queue.add(QUEUE_NAME, {
            repoId,
        });
    }

    /**
     * Starts the periodic scheduler.
     *
     * @returns The interval handle; pass it to `clearInterval` to stop scheduling.
     */
    public startScheduler() {
        logger.debug('Starting scheduler');

        // @todo: we should only sync permissions for a repository if it has been at least ~24 hours since the last sync.
        return setInterval(async () => {
            // A failed tick (db or redis hiccup) must not escape as an
            // unhandled rejection; log it and let the next tick retry.
            try {
                const repos = await this.db.repo.findMany({
                    where: {
                        external_codeHostType: {
                            in: ['github'],
                        }
                    }
                });

                for (const repo of repos) {
                    await this.scheduleJob(repo.id);
                }
            } catch (err) {
                logger.error(`Failed to schedule repo permission sync jobs: ${err}`);
            }

            // @todo: make this configurable
        }, 1000 * 5);
    }

    /**
     * Closes the worker and the queue.
     *
     * Both closes are initiated immediately; the returned promise settles once
     * both have finished, so callers may await it or ignore it.
     */
    public async dispose() {
        await Promise.all([
            this.worker.close(),
            this.queue.close(),
        ]);
    }

    /**
     * Processes one sync job: resolves the users with read access to the repo
     * and replaces the repo's permitted users with that set.
     *
     * @throws Error when the repo does not exist, has no connection with a
     *         token, or (for GitHub) has no `owner/repo` style display name.
     */
    private async runJob(job: Job<RepoPermissionSyncJob>) {
        const id = job.data.repoId;
        const repo = await this.db.repo.findUnique({
            where: {
                id,
            },
            include: {
                connections: {
                    include: {
                        connection: true,
                    },
                },
            },
        });

        if (!repo) {
            throw new Error(`Repo ${id} not found`);
        }

        const connection = getFirstConnectionWithToken(repo);
        if (!connection) {
            throw new Error(`No connection with token found for repo ${id}`);
        }

        const userIds = await (async () => {
            if (connection.connectionType === 'github') {
                const config = connection.config as unknown as GithubConnectionConfig;
                const { octokit } = await createOctokitFromConfig(config, repo.orgId, this.db);

                // The display name is expected to look like `owner/repo`; fail
                // the job with a clear message when it is unset or malformed.
                const [owner, repoName] = repo.displayName?.split('/') ?? [];
                if (!owner || !repoName) {
                    throw new Error(`Repo ${id} does not have a display name of the form owner/repo`);
                }

                const githubUserIds = await getUserIdsWithReadAccessToRepo(owner, repoName, octokit);

                const accounts = await this.db.account.findMany({
                    where: {
                        provider: 'github',
                        providerAccountId: {
                            in: githubUserIds,
                        }
                    },
                    select: {
                        userId: true,
                    },
                });

                // De-duplicate in case several matched accounts belong to the
                // same user, so each (user, repo) pair is written at most once.
                return Array.from(new Set(accounts.map(account => account.userId)));
            }

            return [];
        })();

        logger.info(`User IDs with read access to repo ${id}: ${userIds}`);

        // Replace the permitted users atomically: if re-creating the rows
        // fails, the delete is rolled back and the old permissions survive.
        await this.db.$transaction([
            this.db.repo.update({
                where: {
                    id: repo.id,
                },
                data: {
                    permittedUsers: {
                        deleteMany: {},
                    }
                }
            }),
            this.db.userToRepoPermission.createMany({
                data: userIds.map(userId => ({
                    userId,
                    repoId: repo.id,
                })),
            }),
        ]);
    }

    /** Logs successful completion of a sync job. */
    private async onJobCompleted(job: Job<RepoPermissionSyncJob>) {
        logger.info(`Repo permission sync job completed for repo ${job.data.repoId}`);
    }

    /** Logs a failed sync job; `job` may be undefined. */
    private async onJobFailed(job: Job<RepoPermissionSyncJob> | undefined, err: Error) {
        logger.error(`Repo permission sync job failed for repo ${job?.data.repoId}: ${err}`);
    }
}
/**
 * Finds the first of the repo's connections that can authenticate.
 *
 * Only `github`, `gitlab`, `gitea` and `bitbucket` connections are
 * considered, and only when their config has a truthy `token`.
 *
 * @param repo - A repo together with its connections.
 * @returns The first matching connection, or `undefined` when none qualifies.
 */
const getFirstConnectionWithToken = (repo: RepoWithConnections) => {
    type TokenCapableConfig =
        | GithubConnectionConfig
        | GitlabConnectionConfig
        | GiteaConnectionConfig
        | BitbucketConnectionConfig;

    const tokenCapableTypes = ['github', 'gitlab', 'gitea', 'bitbucket'];

    const match = repo.connections.find(({ connection }) => {
        if (!tokenCapableTypes.includes(connection.connectionType)) {
            return false;
        }
        const config = connection.config as unknown as TokenCapableConfig;
        return Boolean(config.token);
    });

    return match?.connection;
}

View file

@ -12,12 +12,6 @@ import { PromClient } from './promClient.js';
import * as Sentry from "@sentry/node";
import { env } from './env.js';
// Contract that RepoManager declares it implements.
interface IRepoManager {
// NOTE(review): presumably verifies that repos marked as indexed still have their shards — confirm against the implementation.
validateIndexedReposHaveShards: () => Promise<void>;
// Declared as returning void here, although the implementing method is async and loops indefinitely.
blockingPollLoop: () => void;
// Releases whatever the manager holds (NOTE(review): presumably its workers and queues — confirm).
dispose: () => void;
}
const REPO_INDEXING_QUEUE = 'repoIndexingQueue';
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';
@ -32,7 +26,7 @@ type RepoGarbageCollectionPayload = {
const logger = createLogger('repo-manager');
export class RepoManager implements IRepoManager {
export class RepoManager {
private indexWorker: Worker;
private indexQueue: Queue<RepoIndexingPayload>;
private gcWorker: Worker;
@ -68,14 +62,13 @@ export class RepoManager implements IRepoManager {
this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
}
public async blockingPollLoop() {
while (true) {
public startScheduler() {
logger.debug('Starting scheduler');
return setInterval(async () => {
await this.fetchAndScheduleRepoIndexing();
await this.fetchAndScheduleRepoGarbageCollection();
await this.fetchAndScheduleRepoTimeouts();
await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
}
}, this.settings.reindexRepoPollingIntervalMs);
}
///////////////////////////

View file

@ -1,3 +1,4 @@
import { Connection, Repo, RepoToConnection } from "@sourcebot/db";
import { Settings as SettingsSchema } from "@sourcebot/schemas/v3/index.type";
import { z } from "zod";
@ -50,4 +51,6 @@ export type DeepPartial<T> = T extends object ? {
} : T;
/**
 * Makes the keys `K` of `T` required (and non-optional) while leaving every
 * other property untouched.
 *
 * @see: https://stackoverflow.com/a/69328045
 */
export type WithRequired<T, K extends keyof T> = T & { [P in K]-?: T[P] };
// A Repo together with its RepoToConnection join rows, each carrying the joined Connection.
export type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };