This commit is contained in:
bkellam 2025-09-20 16:00:11 -07:00
parent 0209342a65
commit 46084c3a07
15 changed files with 405 additions and 372 deletions

View file

@ -1,4 +1,4 @@
<Warning>
This is an experimental feature. Certain functionality may be buggy or incomplete, and breaking changes may ship in non-major releases. Have feedback? Submit a [issue](https://github.com/sourcebot-dev/sourcebot/issues) on GitHub.
This is an experimental feature. Certain functionality may be incomplete, and breaking changes may ship in non-major releases. Have feedback? Submit an [issue](https://github.com/sourcebot-dev/sourcebot/issues) on GitHub.
</Warning>

View file

@ -28,6 +28,7 @@ export class ConnectionManager {
private worker: Worker;
private queue: Queue<JobPayload>;
private logger = createLogger('connection-manager');
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
@ -71,7 +72,7 @@ export class ConnectionManager {
public startScheduler() {
this.logger.debug('Starting scheduler');
return setInterval(async () => {
this.interval = setInterval(async () => {
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
const connections = await this.db.connection.findMany({
where: {
@ -364,6 +365,9 @@ export class ConnectionManager {
}
public dispose() {
if (this.interval) {
clearInterval(this.interval);
}
this.worker.close();
this.queue.close();
}

View file

@ -1,17 +1,14 @@
import * as Sentry from "@sentry/node";
import { PrismaClient, Repo, RepoPermissionSyncJobStatus } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type";
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type";
import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type";
import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type";
import { hasEntitlement } from "@sourcebot/shared";
import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { env } from "../env.js";
import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "../github.js";
import { RepoWithConnections, Settings } from "../types.js";
import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { hasEntitlement } from "@sourcebot/shared";
import { env } from "../env.js";
import { createOctokitFromToken, getRepoCollaborators } from "../github.js";
import { Settings } from "../types.js";
import { getAuthCredentialsForRepo } from "../utils.js";
type RepoPermissionSyncJob = {
jobId: string;
@ -25,6 +22,7 @@ const logger = createLogger('repo-permission-syncer');
export class RepoPermissionSyncer {
private queue: Queue<RepoPermissionSyncJob>;
private worker: Worker<RepoPermissionSyncJob>;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
@ -49,7 +47,7 @@ export class RepoPermissionSyncer {
logger.debug('Starting scheduler');
return setInterval(async () => {
this.interval = setInterval(async () => {
// @todo: make this configurable
const thresholdDate = new Date(Date.now() - this.settings.experiment_repoDrivenPermissionSyncIntervalMs);
@ -104,6 +102,9 @@ export class RepoPermissionSyncer {
}
public dispose() {
if (this.interval) {
clearInterval(this.interval);
}
this.worker.close();
this.queue.close();
}
@ -157,15 +158,17 @@ export class RepoPermissionSyncer {
logger.info(`Syncing permissions for repo ${repo.displayName}...`);
const connection = getFirstConnectionWithToken(repo);
if (!connection) {
throw new Error(`No connection with token found for repo ${id}`);
const credentials = await getAuthCredentialsForRepo(repo, this.db, logger);
if (!credentials) {
throw new Error(`No credentials found for repo ${id}`);
}
const userIds = await (async () => {
if (connection.connectionType === 'github') {
const config = connection.config as unknown as GithubConnectionConfig;
const { octokit } = await createOctokitFromConfig(config, repo.orgId, this.db);
if (repo.external_codeHostType === 'github') {
const { octokit } = await createOctokitFromToken({
token: credentials.token,
url: credentials.hostUrl,
});
// @note: this is a bit of a hack since the displayName _might_ not be set..
// however, this property was introduced many versions ago and _should_ be set
@ -176,7 +179,8 @@ export class RepoPermissionSyncer {
const [owner, repoName] = repo.displayName.split('/');
const githubUserIds = await getUserIdsWithReadAccessToRepo(owner, repoName, octokit);
const collaborators = await getRepoCollaborators(owner, repoName, octokit);
const githubUserIds = collaborators.map(collaborator => collaborator.id.toString());
const accounts = await this.db.account.findMany({
where: {
@ -268,34 +272,3 @@ export class RepoPermissionSyncer {
}
}
}
/**
 * Returns the first connection attached to `repo` whose config declares a
 * `token`, or `undefined` when none does.
 *
 * Only github, gitlab, gitea and bitbucket connections are considered; any
 * other connection type is ignored.
 */
const getFirstConnectionWithToken = (repo: RepoWithConnections) => {
    const firstMatch = repo.connections.find(({ connection }) => {
        switch (connection.connectionType) {
            case 'github':
                return !!(connection.config as unknown as GithubConnectionConfig).token;
            case 'gitlab':
                return !!(connection.config as unknown as GitlabConnectionConfig).token;
            case 'gitea':
                return !!(connection.config as unknown as GiteaConnectionConfig).token;
            case 'bitbucket':
                return !!(connection.config as unknown as BitbucketConnectionConfig).token;
            default:
                return false;
        }
    });

    return firstMatch?.connection;
}

View file

@ -1,4 +1,3 @@
import { Octokit } from "@octokit/rest";
import * as Sentry from "@sentry/node";
import { PrismaClient, User, UserPermissionSyncJobStatus } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
@ -6,7 +5,7 @@ import { Job, Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
import { env } from "../env.js";
import { createOctokitFromOAuthToken, getReposForAuthenticatedUser } from "../github.js";
import { createOctokitFromToken, getReposForAuthenticatedUser } from "../github.js";
import { hasEntitlement } from "@sourcebot/shared";
import { Settings } from "../types.js";
@ -22,6 +21,7 @@ type UserPermissionSyncJob = {
export class UserPermissionSyncer {
private queue: Queue<UserPermissionSyncJob>;
private worker: Worker<UserPermissionSyncJob>;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
@ -46,7 +46,7 @@ export class UserPermissionSyncer {
logger.debug('Starting scheduler');
return setInterval(async () => {
this.interval = setInterval(async () => {
const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs);
const users = await this.db.user.findMany({
@ -102,6 +102,9 @@ export class UserPermissionSyncer {
}
public dispose() {
if (this.interval) {
clearInterval(this.interval);
}
this.worker.close();
this.queue.close();
}
@ -151,50 +154,61 @@ export class UserPermissionSyncer {
logger.info(`Syncing permissions for user ${user.email}...`);
for (const account of user.accounts) {
const repoIds = await (async () => {
// Get a list of all repos that the user has access to from all connected accounts.
const repoIds = await (async () => {
const aggregatedRepoIds: Set<number> = new Set();
for (const account of user.accounts) {
if (account.provider === 'github') {
const octokit = await createOctokitFromOAuthToken(account.access_token);
if (!account.access_token) {
throw new Error(`User '${user.email}' does not have an GitHub OAuth access token associated with their GitHub account.`);
}
const { octokit } = await createOctokitFromToken({
token: account.access_token,
url: env.AUTH_EE_GITHUB_BASE_URL,
});
// @note: we only care about the private repos since we don't need to build a mapping
// for public repos.
// @see: packages/web/src/prisma.ts
const repoIds = await getReposForAuthenticatedUser(/* visibility = */ 'private', octokit);
const githubRepos = await getReposForAuthenticatedUser(/* visibility = */ 'private', octokit);
const gitHubRepoIds = githubRepos.map(repo => repo.id.toString());
const repos = await this.db.repo.findMany({
where: {
external_codeHostType: 'github',
external_id: {
in: repoIds,
in: gitHubRepoIds,
}
}
});
return repos.map(repo => repo.id);
repos.forEach(repo => aggregatedRepoIds.add(repo.id));
}
}
return [];
})();
return Array.from(aggregatedRepoIds);
})();
await this.db.$transaction([
this.db.user.update({
where: {
id: user.id,
},
data: {
accessibleRepos: {
deleteMany: {},
}
await this.db.$transaction([
this.db.user.update({
where: {
id: user.id,
},
data: {
accessibleRepos: {
deleteMany: {},
}
}),
this.db.userToRepoPermission.createMany({
data: repoIds.map(repoId => ({
userId: user.id,
repoId,
}))
})
]);
}
}
}),
this.db.userToRepoPermission.createMany({
data: repoIds.map(repoId => ({
userId: user.id,
repoId,
})),
skipDuplicates: true,
})
]);
}
private async onJobCompleted(job: Job<UserPermissionSyncJob>) {

View file

@ -5,9 +5,15 @@ import { env } from './env.js';
type onProgressFn = (event: SimpleGitProgressEvent) => void;
export const cloneRepository = async (
remoteUrl: URL,
path: string,
onProgress?: onProgressFn
{
cloneUrl,
path,
onProgress,
}: {
cloneUrl: string,
path: string,
onProgress?: onProgressFn
}
) => {
try {
await mkdir(path, { recursive: true });
@ -19,7 +25,7 @@ export const cloneRepository = async (
})
await git.clone(
remoteUrl.toString(),
cloneUrl,
path,
[
"--bare",
@ -42,9 +48,15 @@ export const cloneRepository = async (
};
export const fetchRepository = async (
remoteUrl: URL,
path: string,
onProgress?: onProgressFn
{
cloneUrl,
path,
onProgress,
}: {
cloneUrl: string,
path: string,
onProgress?: onProgressFn
}
) => {
try {
const git = simpleGit({
@ -54,7 +66,7 @@ export const fetchRepository = async (
})
await git.fetch([
remoteUrl.toString(),
cloneUrl,
"+refs/heads/*:refs/heads/*",
"--prune",
"--progress"

View file

@ -41,8 +41,35 @@ const isHttpError = (error: unknown, status: number): boolean => {
&& error.status === status;
}
/**
 * Creates an Octokit client for the given token and (optional) host URL.
 *
 * @param token - Token passed to Octokit as `auth`. May be omitted, in which
 * case the client is created without credentials.
 * @param url - Base URL of the GitHub host (e.g. `https://github.example.com`).
 * When provided, the client's `baseUrl` is set to `<url>/api/v3`. When
 * omitted, no `baseUrl` is passed and Octokit falls back to its own default.
 * @returns The client, plus `isAuthenticated`, which is `true` only when a
 * non-empty token was supplied.
 */
export const createOctokitFromToken = async ({ token, url }: { token?: string, url?: string }): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => {
    // Strip trailing slashes so that a host configured as `https://host/`
    // does not produce a malformed `https://host//api/v3` base URL.
    const normalizedUrl = url?.replace(/\/+$/, '');

    const octokit = new Octokit({
        auth: token,
        ...(normalizedUrl ? {
            baseUrl: `${normalizedUrl}/api/v3`
        } : {}),
    });

    return {
        octokit,
        isAuthenticated: !!token,
    };
}
export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient, signal: AbortSignal) => {
const { octokit, isAuthenticated } = await createOctokitFromConfig(config, orgId, db);
const hostname = config.url ?
new URL(config.url).hostname :
GITHUB_CLOUD_HOSTNAME;
const token = config.token ?
await getTokenFromConfig(config.token, orgId, db, logger) :
hostname === GITHUB_CLOUD_HOSTNAME ?
env.FALLBACK_GITHUB_CLOUD_TOKEN :
undefined;
const { octokit, isAuthenticated } = await createOctokitFromToken({
token,
url: config.url,
});
if (isAuthenticated) {
try {
@ -118,143 +145,37 @@ export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, o
};
}
export const getUserIdsWithReadAccessToRepo = async (owner: string, repo: string, octokit: Octokit) => {
const fetchFn = () => octokit.paginate(octokit.repos.listCollaborators, {
owner,
repo,
per_page: 100,
});
export const getRepoCollaborators = async (owner: string, repo: string, octokit: Octokit) => {
try {
const fetchFn = () => octokit.paginate(octokit.repos.listCollaborators, {
owner,
repo,
per_page: 100,
});
const collaborators = await fetchWithRetry(fetchFn, `repo ${owner}/${repo}`, logger);
return collaborators.map(collaborator => collaborator.id.toString());
const collaborators = await fetchWithRetry(fetchFn, `repo ${owner}/${repo}`, logger);
return collaborators;
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to fetch collaborators for repo ${owner}/${repo}.`, error);
throw error;
}
}
export const getReposForAuthenticatedUser = async (visibility: 'all' | 'private' | 'public' = 'all', octokit: Octokit) => {
const fetchFn = () => octokit.paginate(octokit.repos.listForAuthenticatedUser, {
per_page: 100,
visibility,
});
try {
const fetchFn = () => octokit.paginate(octokit.repos.listForAuthenticatedUser, {
per_page: 100,
visibility,
});
const repos = await fetchWithRetry(fetchFn, `authenticated user`, logger);
return repos.map(repo => repo.id.toString());
}
/**
 * Builds an Octokit client from a GitHub connection config.
 *
 * The token is taken from `config.token` when present. Otherwise, if the
 * config targets the GitHub cloud hostname, `env.FALLBACK_GITHUB_CLOUD_TOKEN`
 * is used. In every other case the client is created without a token.
 *
 * @returns The client, plus `isAuthenticated`, which reflects whether a token
 * was resolved.
 */
export const createOctokitFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => {
    const hostname = config.url ? new URL(config.url).hostname : GITHUB_CLOUD_HOSTNAME;

    let token: string | undefined;
    if (config.token) {
        token = await getTokenFromConfig(config.token, orgId, db, logger);
    } else if (hostname === GITHUB_CLOUD_HOSTNAME) {
        token = env.FALLBACK_GITHUB_CLOUD_TOKEN;
    } else {
        token = undefined;
    }

    // Only override the base URL when the config points at a specific host.
    const hostOptions = config.url ? { baseUrl: `${config.url}/api/v3` } : {};
    const octokit = new Octokit({
        auth: token,
        ...hostOptions,
    });

    const isAuthenticated = !!token;
    return { octokit, isAuthenticated };
}
/**
 * Builds an Octokit client authenticated with a user's OAuth token.
 *
 * The API base URL is `<AUTH_EE_GITHUB_BASE_URL>/api/v3` when that env var is
 * set, and `https://api.github.com` otherwise.
 */
export const createOctokitFromOAuthToken = async (token: string | null): Promise<Octokit> => {
    const enterpriseBaseUrl = env.AUTH_EE_GITHUB_BASE_URL;

    let baseUrl: string;
    if (enterpriseBaseUrl) {
        baseUrl = `${enterpriseBaseUrl}/api/v3`;
    } else {
        baseUrl = "https://api.github.com";
    }

    return new Octokit({ auth: token, baseUrl });
}
export const shouldExcludeRepo = ({
repo,
include,
exclude
}: {
repo: OctokitRepository,
include?: {
topics?: GithubConnectionConfig['topics']
},
exclude?: GithubConnectionConfig['exclude']
}) => {
let reason = '';
const repoName = repo.full_name;
const shouldExclude = (() => {
if (!repo.clone_url) {
reason = 'clone_url is undefined';
return true;
}
if (!!exclude?.forks && repo.fork) {
reason = `\`exclude.forks\` is true`;
return true;
}
if (!!exclude?.archived && !!repo.archived) {
reason = `\`exclude.archived\` is true`;
return true;
}
if (exclude?.repos) {
if (micromatch.isMatch(repoName, exclude.repos)) {
reason = `\`exclude.repos\` contains ${repoName}`;
return true;
}
}
if (exclude?.topics) {
const configTopics = exclude.topics.map(topic => topic.toLowerCase());
const repoTopics = repo.topics ?? [];
const matchingTopics = repoTopics.filter((topic) => micromatch.isMatch(topic, configTopics));
if (matchingTopics.length > 0) {
reason = `\`exclude.topics\` matches the following topics: ${matchingTopics.join(', ')}`;
return true;
}
}
if (include?.topics) {
const configTopics = include.topics.map(topic => topic.toLowerCase());
const repoTopics = repo.topics ?? [];
const matchingTopics = repoTopics.filter((topic) => micromatch.isMatch(topic, configTopics));
if (matchingTopics.length === 0) {
reason = `\`include.topics\` does not match any of the following topics: ${configTopics.join(', ')}`;
return true;
}
}
const repoSizeInBytes = repo.size ? repo.size * 1000 : undefined;
if (exclude?.size && repoSizeInBytes) {
const min = exclude.size.min;
const max = exclude.size.max;
if (min && repoSizeInBytes < min) {
reason = `repo is less than \`exclude.size.min\`=${min} bytes.`;
return true;
}
if (max && repoSizeInBytes > max) {
reason = `repo is greater than \`exclude.size.max\`=${max} bytes.`;
return true;
}
}
return false;
})();
if (shouldExclude) {
logger.debug(`Excluding repo ${repoName}. Reason: ${reason}`);
return true;
const repos = await fetchWithRetry(fetchFn, `authenticated user`, logger);
return repos;
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to fetch repositories for authenticated user.`, error);
throw error;
}
return false;
}
const getReposOwnedByUsers = async (users: string[], octokit: Octokit, signal: AbortSignal) => {
@ -409,3 +330,89 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
notFoundRepos,
};
}
/**
 * Decides whether a GitHub repository should be left out, based on the
 * connection's `include` / `exclude` settings.
 *
 * Checks run in this order and the first match wins: missing `clone_url`,
 * forks, archived, `exclude.repos` globs, `exclude.topics`, `include.topics`,
 * then `exclude.size` bounds. When a repo is excluded, the reason is logged
 * at debug level.
 *
 * @returns `true` when the repo should be excluded, `false` otherwise.
 */
export const shouldExcludeRepo = ({
    repo,
    include,
    exclude
}: {
    repo: OctokitRepository,
    include?: {
        topics?: GithubConnectionConfig['topics']
    },
    exclude?: GithubConnectionConfig['exclude']
}) => {
    const repoName = repo.full_name;

    // Returns a human readable reason when the repo must be excluded,
    // or undefined when it passes every filter.
    const findExclusionReason = (): string | undefined => {
        if (!repo.clone_url) {
            return 'clone_url is undefined';
        }

        if (exclude?.forks && repo.fork) {
            return `\`exclude.forks\` is true`;
        }

        if (exclude?.archived && repo.archived) {
            return `\`exclude.archived\` is true`;
        }

        if (exclude?.repos && micromatch.isMatch(repoName, exclude.repos)) {
            return `\`exclude.repos\` contains ${repoName}`;
        }

        if (exclude?.topics) {
            const excludedPatterns = exclude.topics.map(topic => topic.toLowerCase());
            const hits = (repo.topics ?? []).filter((topic) => micromatch.isMatch(topic, excludedPatterns));
            if (hits.length > 0) {
                return `\`exclude.topics\` matches the following topics: ${hits.join(', ')}`;
            }
        }

        if (include?.topics) {
            const includedPatterns = include.topics.map(topic => topic.toLowerCase());
            const hits = (repo.topics ?? []).filter((topic) => micromatch.isMatch(topic, includedPatterns));
            if (hits.length === 0) {
                return `\`include.topics\` does not match any of the following topics: ${includedPatterns.join(', ')}`;
            }
        }

        // The size bounds are compared against `repo.size * 1000`; a missing
        // or zero size skips the size check entirely.
        const sizeInBytes = repo.size ? repo.size * 1000 : undefined;
        if (exclude?.size && sizeInBytes) {
            const { min, max } = exclude.size;

            if (min && sizeInBytes < min) {
                return `repo is less than \`exclude.size.min\`=${min} bytes.`;
            }

            if (max && sizeInBytes > max) {
                return `repo is greater than \`exclude.size.max\`=${max} bytes.`;
            }
        }

        return undefined;
    };

    const reason = findExclusionReason();
    if (reason === undefined) {
        return false;
    }

    logger.debug(`Excluding repo ${repoName}. Reason: ${reason}`);
    return true;
}

View file

@ -74,34 +74,21 @@ const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
await repoManager.validateIndexedReposHaveShards();
const connectionManagerInterval = connectionManager.startScheduler();
const repoManagerInterval = repoManager.startScheduler();
let repoPermissionSyncerInterval: NodeJS.Timeout | null = null;
let userPermissionSyncerInterval: NodeJS.Timeout | null = null;
connectionManager.startScheduler();
repoManager.startScheduler();
if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && !hasEntitlement('permission-syncing')) {
logger.error('Permission syncing is not supported in current plan. Please contact support@sourcebot.dev for assistance.');
process.exit(1);
}
else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement('permission-syncing')) {
repoPermissionSyncerInterval = repoPermissionSyncer.startScheduler();
userPermissionSyncerInterval = userPermissionSyncer.startScheduler();
repoPermissionSyncer.startScheduler();
userPermissionSyncer.startScheduler();
}
const cleanup = async (signal: string) => {
logger.info(`Recieved ${signal}, cleaning up...`);
if (userPermissionSyncerInterval) {
clearInterval(userPermissionSyncerInterval);
}
if (repoPermissionSyncerInterval) {
clearInterval(repoPermissionSyncerInterval);
}
clearInterval(connectionManagerInterval);
clearInterval(repoManagerInterval);
connectionManager.dispose();
repoManager.dispose();
repoPermissionSyncer.dispose();

View file

@ -1,21 +1,19 @@
import { Job, Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { createLogger } from "@sourcebot/logger";
import { Connection, PrismaClient, Repo, RepoToConnection, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig, BitbucketConnectionConfig, AzureDevOpsConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { AppContext, Settings, repoMetadataSchema } from "./types.js";
import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./utils.js";
import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from "./git.js";
import { existsSync, readdirSync, promises } from 'fs';
import { indexGitRepository } from "./zoekt.js";
import { PromClient } from './promClient.js';
import * as Sentry from "@sentry/node";
import { PrismaClient, Repo, RepoIndexingStatus, StripeSubscriptionStatus } from "@sourcebot/db";
import { createLogger } from "@sourcebot/logger";
import { Job, Queue, Worker } from 'bullmq';
import { existsSync, promises, readdirSync } from 'fs';
import { Redis } from 'ioredis';
import { env } from './env.js';
import { cloneRepository, fetchRepository, unsetGitConfig, upsertGitConfig } from "./git.js";
import { PromClient } from './promClient.js';
import { AppContext, RepoWithConnections, Settings, repoMetadataSchema } from "./types.js";
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
const REPO_INDEXING_QUEUE = 'repoIndexingQueue';
const REPO_GC_QUEUE = 'repoGarbageCollectionQueue';
type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };
type RepoIndexingPayload = {
repo: RepoWithConnections,
}
@ -31,6 +29,7 @@ export class RepoManager {
private indexQueue: Queue<RepoIndexingPayload>;
private gcWorker: Worker;
private gcQueue: Queue<RepoGarbageCollectionPayload>;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
@ -64,7 +63,7 @@ export class RepoManager {
public startScheduler() {
logger.debug('Starting scheduler');
return setInterval(async () => {
this.interval = setInterval(async () => {
await this.fetchAndScheduleRepoIndexing();
await this.fetchAndScheduleRepoGarbageCollection();
await this.fetchAndScheduleRepoTimeouts();
@ -162,68 +161,6 @@ export class RepoManager {
}
}
// TODO: do this better? ex: try using the tokens from all the connections
// We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
// fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
// may have their own token. This method will just pick the first connection that has a token (if one exists) and uses that. This
// may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referencing.
private async getCloneCredentialsForRepo(repo: RepoWithConnections, db: PrismaClient): Promise<{ username?: string, password: string } | undefined> {
for (const { connection } of repo.connections) {
if (connection.connectionType === 'github') {
const config = connection.config as unknown as GithubConnectionConfig;
if (config.token) {
const token = await getTokenFromConfig(config.token, connection.orgId, db, logger);
return {
password: token,
}
}
} else if (connection.connectionType === 'gitlab') {
const config = connection.config as unknown as GitlabConnectionConfig;
if (config.token) {
const token = await getTokenFromConfig(config.token, connection.orgId, db, logger);
return {
username: 'oauth2',
password: token,
}
}
} else if (connection.connectionType === 'gitea') {
const config = connection.config as unknown as GiteaConnectionConfig;
if (config.token) {
const token = await getTokenFromConfig(config.token, connection.orgId, db, logger);
return {
password: token,
}
}
} else if (connection.connectionType === 'bitbucket') {
const config = connection.config as unknown as BitbucketConnectionConfig;
if (config.token) {
const token = await getTokenFromConfig(config.token, connection.orgId, db, logger);
const username = config.user ?? 'x-token-auth';
return {
username,
password: token,
}
}
} else if (connection.connectionType === 'azuredevops') {
const config = connection.config as unknown as AzureDevOpsConnectionConfig;
if (config.token) {
const token = await getTokenFromConfig(config.token, connection.orgId, db, logger);
return {
// @note: If we don't provide a username, the password will be set as the username. This seems to work
// for ADO cloud but not for ADO server. To fix this, we set a placeholder username to ensure the password
// is set correctly
username: 'user',
password: token,
}
}
}
}
return undefined;
}
private async syncGitRepository(repo: RepoWithConnections, repoAlreadyInIndexingState: boolean) {
const { path: repoPath, isReadOnly } = getRepoPath(repo, this.ctx);
@ -236,21 +173,8 @@ export class RepoManager {
await promises.rm(repoPath, { recursive: true, force: true });
}
const credentials = await this.getCloneCredentialsForRepo(repo, this.db);
const remoteUrl = new URL(repo.cloneUrl);
if (credentials) {
// @note: URL has a weird behavior where if you set the password but
// _not_ the username, the ":" delimiter will still be present in the
// URL (e.g., https://:password@example.com). To get around this, if
// we only have a password, we set the username to the password.
// @see: https://www.typescriptlang.org/play/?#code/MYewdgzgLgBArgJwDYwLwzAUwO4wKoBKAMgBQBEAFlFAA4QBcA9I5gB4CGAtjUpgHShOZADQBKANwAoREj412ECNhAIAJmhhl5i5WrJTQkELz5IQAcxIy+UEAGUoCAJZhLo0UA
if (!credentials.username) {
remoteUrl.username = credentials.password;
} else {
remoteUrl.username = credentials.username;
remoteUrl.password = credentials.password;
}
}
const credentials = await getAuthCredentialsForRepo(repo, this.db);
const cloneUrlMaybeWithToken = credentials?.cloneUrlWithToken ?? repo.cloneUrl;
if (existsSync(repoPath) && !isReadOnly) {
// @NOTE: in #483, we changed the cloning method s.t., we _no longer_
@ -262,13 +186,13 @@ export class RepoManager {
await unsetGitConfig(repoPath, ["remote.origin.url"]);
logger.info(`Fetching ${repo.displayName}...`);
const { durationMs } = await measure(() => fetchRepository(
remoteUrl,
repoPath,
({ method, stage, progress }) => {
const { durationMs } = await measure(() => fetchRepository({
cloneUrl: cloneUrlMaybeWithToken,
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
}
));
}));
const fetchDuration_s = durationMs / 1000;
process.stdout.write('\n');
@ -277,13 +201,13 @@ export class RepoManager {
} else if (!isReadOnly) {
logger.info(`Cloning ${repo.displayName}...`);
const { durationMs } = await measure(() => cloneRepository(
remoteUrl,
repoPath,
({ method, stage, progress }) => {
const { durationMs } = await measure(() => cloneRepository({
cloneUrl: cloneUrlMaybeWithToken,
path: repoPath,
onProgress: ({ method, stage, progress }) => {
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
}
));
}));
const cloneDuration_s = durationMs / 1000;
process.stdout.write('\n');
@ -628,6 +552,9 @@ export class RepoManager {
}
public async dispose() {
if (this.interval) {
clearInterval(this.interval);
}
this.indexWorker.close();
this.indexQueue.close();
this.gcQueue.close();

View file

@ -54,3 +54,10 @@ export type DeepPartial<T> = T extends object ? {
export type WithRequired<T, K extends keyof T> = T & { [P in K]-?: T[P] };
/**
 * A `Repo` extended with a `connections` array: each entry is a
 * `RepoToConnection` row that also carries the `Connection` it refers to.
 */
export type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection })[] };
/**
 * Credentials resolved for a repository from one of its connections
 * (the return shape of `getAuthCredentialsForRepo`).
 */
export type RepoAuthCredentials = {
/** The `url` of the connection config the token was taken from; optional. */
hostUrl?: string;
/** The token resolved from the connection config. */
token: string;
/** The repository's clone URL with the credentials embedded in it. */
cloneUrlWithToken: string;
}

View file

@ -1,10 +1,11 @@
import { Logger } from "winston";
import { AppContext } from "./types.js";
import { AppContext, RepoAuthCredentials, RepoWithConnections } from "./types.js";
import path from 'path';
import { PrismaClient, Repo } from "@sourcebot/db";
import { getTokenFromConfig as getTokenFromConfigBase } from "@sourcebot/crypto";
import { BackendException, BackendError } from "@sourcebot/error";
import * as Sentry from "@sentry/node";
import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig, BitbucketConnectionConfig, AzureDevOpsConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
export const measure = async <T>(cb: () => Promise<T>) => {
const start = Date.now();
@ -117,3 +118,114 @@ export const fetchWithRetry = async <T>(
}
}
}
/**
 * Resolves the credentials used to authenticate against the code host of `repo`.
 *
 * `repo.cloneUrl` does not contain the token for security reasons, so the token
 * has to be looked up through the repo's connections. Several connections may
 * reference the same repo, each with its own token; this function simply uses
 * the first connection that declares a token (if any). Syncing may therefore
 * fail if that particular token happens not to have access to the repo.
 *
 * @todo Do this better, e.g. try the tokens from all of the connections.
 *
 * @returns The host URL, token and token-bearing clone URL of the first
 * connection with a token, or `undefined` when no connection has one.
 */
export const getAuthCredentialsForRepo = async (repo: RepoWithConnections, db: PrismaClient, logger?: Logger): Promise<RepoAuthCredentials | undefined> => {
    for (const { connection } of repo.connections) {
        switch (connection.connectionType) {
            case 'github': {
                const githubConfig = connection.config as unknown as GithubConnectionConfig;
                if (!githubConfig.token) {
                    break;
                }

                const token = await getTokenFromConfig(githubConfig.token, connection.orgId, db, logger);
                const cloneUrlWithToken = createGitCloneUrlWithToken(repo.cloneUrl, {
                    password: token,
                });
                return { hostUrl: githubConfig.url, token, cloneUrlWithToken };
            }

            case 'gitlab': {
                const gitlabConfig = connection.config as unknown as GitlabConnectionConfig;
                if (!gitlabConfig.token) {
                    break;
                }

                const token = await getTokenFromConfig(gitlabConfig.token, connection.orgId, db, logger);
                const cloneUrlWithToken = createGitCloneUrlWithToken(repo.cloneUrl, {
                    username: 'oauth2',
                    password: token,
                });
                return { hostUrl: gitlabConfig.url, token, cloneUrlWithToken };
            }

            case 'gitea': {
                const giteaConfig = connection.config as unknown as GiteaConnectionConfig;
                if (!giteaConfig.token) {
                    break;
                }

                const token = await getTokenFromConfig(giteaConfig.token, connection.orgId, db, logger);
                const cloneUrlWithToken = createGitCloneUrlWithToken(repo.cloneUrl, {
                    password: token,
                });
                return { hostUrl: giteaConfig.url, token, cloneUrlWithToken };
            }

            case 'bitbucket': {
                const bitbucketConfig = connection.config as unknown as BitbucketConnectionConfig;
                if (!bitbucketConfig.token) {
                    break;
                }

                const token = await getTokenFromConfig(bitbucketConfig.token, connection.orgId, db, logger);
                const cloneUrlWithToken = createGitCloneUrlWithToken(repo.cloneUrl, {
                    username: bitbucketConfig.user ?? 'x-token-auth',
                    password: token,
                });
                return { hostUrl: bitbucketConfig.url, token, cloneUrlWithToken };
            }

            case 'azuredevops': {
                const adoConfig = connection.config as unknown as AzureDevOpsConnectionConfig;
                if (!adoConfig.token) {
                    break;
                }

                const token = await getTokenFromConfig(adoConfig.token, connection.orgId, db, logger);
                const cloneUrlWithToken = createGitCloneUrlWithToken(repo.cloneUrl, {
                    // @note: Without a username the password ends up being used as the
                    // username. That seems to work for ADO cloud but not for ADO server,
                    // so a placeholder username is set to make sure the password is
                    // placed correctly.
                    username: 'user',
                    password: token,
                });
                return { hostUrl: adoConfig.url, token, cloneUrlWithToken };
            }

            default:
                break;
        }
    }

    return undefined;
}
/**
 * Embeds the given credentials into a git clone URL.
 *
 * @param cloneUrl - The clone URL to add credentials to.
 * @param credentials - The `password` (token) and an optional `username`.
 * @returns The clone URL, serialized with the credentials in its userinfo part.
 */
const createGitCloneUrlWithToken = (cloneUrl: string, credentials: { username?: string, password: string }) => {
    const { username, password } = credentials;
    const authenticatedUrl = new URL(cloneUrl);

    if (username) {
        authenticatedUrl.username = username;
        authenticatedUrl.password = password;
    } else {
        // @note: Setting only the password on a URL still emits the ":" delimiter
        // (e.g., https://:password@example.com). To avoid that, when there is no
        // username the password is stored in the username slot instead.
        authenticatedUrl.username = password;
    }

    return authenticatedUrl.href;
}

View file

@ -40,7 +40,7 @@ import JoinRequestSubmittedEmail from "./emails/joinRequestSubmittedEmail";
import { AGENTIC_SEARCH_TUTORIAL_DISMISSED_COOKIE_NAME, MOBILE_UNSUPPORTED_SPLASH_SCREEN_DISMISSED_COOKIE_NAME, SEARCH_MODE_COOKIE_NAME, SINGLE_TENANT_ORG_DOMAIN, SOURCEBOT_GUEST_USER_ID, SOURCEBOT_SUPPORT_EMAIL } from "./lib/constants";
import { orgDomainSchema, orgNameSchema, repositoryQuerySchema } from "./lib/schemas";
import { ApiKeyPayload, TenancyMode } from "./lib/types";
import { withOptionalAuthV2 } from "./withAuthV2";
import { withAuthV2, withOptionalAuthV2 } from "./withAuthV2";
const ajv = new Ajv({
validateFormats: false,
@ -1017,31 +1017,22 @@ export const flagConnectionForSync = async (connectionId: number, domain: string
})
));
export const flagReposForIndex = async (repoIds: number[], domain: string) => sew(() =>
withAuth((userId) =>
withOrgMembership(userId, domain, async ({ org }) => {
await prisma.repo.updateMany({
where: {
id: { in: repoIds },
orgId: org.id,
...(env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' ? {
permittedUsers: {
some: {
userId: userId,
}
}
} : {})
},
data: {
repoIndexingStatus: RepoIndexingStatus.NEW,
}
});
return {
success: true,
export const flagReposForIndex = async (repoIds: number[]) => sew(() =>
withAuthV2(async ({ org, prisma }) => {
await prisma.repo.updateMany({
where: {
id: { in: repoIds },
orgId: org.id,
},
data: {
repoIndexingStatus: RepoIndexingStatus.NEW,
}
})
));
});
return {
success: true,
}
}));
export const deleteConnection = async (connectionId: number, domain: string): Promise<{ success: boolean } | ServiceError> => sew(() =>
withAuth((userId) =>

View file

@ -98,7 +98,7 @@ export const RepoList = ({ connectionId }: RepoListProps) => {
}
setIsRetryAllFailedReposLoading(true);
flagReposForIndex(failedRepos.map((repo) => repo.repoId), domain)
flagReposForIndex(failedRepos.map((repo) => repo.repoId))
.then((response) => {
if (isServiceError(response)) {
captureEvent('wa_connection_retry_all_failed_repos_fail', {});
@ -116,7 +116,7 @@ export const RepoList = ({ connectionId }: RepoListProps) => {
.finally(() => {
setIsRetryAllFailedReposLoading(false);
});
}, [captureEvent, domain, failedRepos, refetchRepos, toast]);
}, [captureEvent, failedRepos, refetchRepos, toast]);
const filteredRepos = useMemo(() => {
if (isServiceError(unfilteredRepos)) {

View file

@ -70,7 +70,7 @@ export const RepoListItem = ({
</div>
<div className="flex flex-row items-center gap-4">
{status === RepoIndexingStatus.FAILED && (
<RetryRepoIndexButton repoId={repoId} domain={domain} />
<RetryRepoIndexButton repoId={repoId} />
)}
<div className="flex flex-row items-center gap-0">
<StatusIcon

View file

@ -9,10 +9,9 @@ import useCaptureEvent from "@/hooks/useCaptureEvent";
interface RetryRepoIndexButtonProps {
repoId: number;
domain: string;
}
export const RetryRepoIndexButton = ({ repoId, domain }: RetryRepoIndexButtonProps) => {
export const RetryRepoIndexButton = ({ repoId }: RetryRepoIndexButtonProps) => {
const captureEvent = useCaptureEvent();
return (
@ -21,7 +20,7 @@ export const RetryRepoIndexButton = ({ repoId, domain }: RetryRepoIndexButtonPro
size="sm"
className="ml-2"
onClick={async () => {
const result = await flagReposForIndex([repoId], domain);
const result = await flagReposForIndex([repoId]);
if (isServiceError(result)) {
toast({
description: `❌ Failed to flag repository for indexing.`,

View file

@ -20,7 +20,7 @@ interface OptionalAuthContext {
interface RequiredAuthContext {
user: User;
org: Org;
role: Omit<OrgRole, 'GUEST'>;
role: Exclude<OrgRole, 'GUEST'>;
prisma: PrismaClient;
}