add retries for 429 issues (#196)

* add connection compile retry and hard repo limit

* add more retry checks

* cleanup unused change

* address feedback
Michael Sukkarieh 2025-02-15 16:37:50 -08:00 committed by GitHub
parent 331a41888e
commit 54d14ea98e
14 changed files with 256 additions and 105 deletions
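In short: connector API calls are wrapped in a retry helper that backs off on rate-limit responses, connectors fall back to instance-wide tokens when a connection has none, the repo sync job retries itself, and connections without a token are capped at a configurable repo count. A minimal sketch of the wait policy applied to 403/429 responses (illustrative only; the clamp to zero is an assumption here, and the real helper is the fetchWithRetry added to the backend utils near the end of this diff):

// Illustrative sketch of the rate-limit wait introduced by this commit: if the
// provider sends an `x-ratelimit-reset` header (epoch seconds), wait until that
// time; otherwise back off exponentially from a 3s base.
const rateLimitWaitMs = (attempt: number, resetEpochSeconds?: number): number =>
    resetEpochSeconds !== undefined
        ? Math.max(0, resetEpochSeconds * 1000 - Date.now())
        : 3000 * Math.pow(2, attempt - 1);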

View file

@@ -28,4 +28,9 @@ clean:
 	packages/crypto/dist \
 	.sourcebot

+soft-reset:
+	rm -rf .sourcebot
+	redis-cli FLUSHALL
+
 .PHONY: bin

View file

@@ -18,7 +18,7 @@
     },
     "repos": [
         "torvalds/linux",
         "pytorch/pytorch",
         "commaai/openpilot",
         "ggerganov/whisper.cpp",
         "ggerganov/llama.cpp",
@@ -42,7 +42,6 @@
         "TheAlgorithms/Python",
         "trekhleb/javascript-algorithms",
         "tensorflow/tensorflow",
-        "torvalds/linux",
         "getify/You-Dont-Know-JS",
         "CyC2018/CS-Notes",
         "ohmyzsh/ohmyzsh",
@@ -106,10 +105,8 @@
         "Hack-with-Github/Awesome-Hacking",
         "nvbn/thefuck",
         "mtdvio/every-programmer-should-know",
-        "pytorch/pytorch",
         "storybookjs/storybook",
         "neovim/neovim",
-        "tailwindlabs/tailwindcss",
         "microsoft/Web-Dev-For-Beginners",
         "django/django",
         "florinpop17/app-ideas",
@@ -153,7 +150,6 @@
         "fighting41love/funNLP",
         "vitejs/vite",
         "thedaviddias/Front-End-Checklist",
-        "ggerganov/llama.cpp",
         "coder/code-server",
         "moby/moby",
         "CompVis/stable-diffusion",

View file

@@ -6,7 +6,7 @@ import { Settings } from "./types.js";

 export const DEFAULT_SETTINGS: Settings = {
     maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
     autoDeleteStaleRepos: true,
-    reindexIntervalMs: 1000 * 60,
+    reindexIntervalMs: 1000 * 60 * 60, // 1 hour
     resyncConnectionPollingIntervalMs: 1000,
     reindexRepoPollingIntervalMs: 1000,
     indexConcurrencyMultiple: 3,

View file

@@ -18,6 +18,10 @@ export const getEnvBoolean = (env: string | undefined, defaultValue: boolean) =>

 dotenv.config({
     path: './.env',
 });
+dotenv.config({
+    path: './.env.local',
+    override: true
+});

 export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;
@@ -26,3 +30,7 @@ export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'un
 export const SOURCEBOT_VERSION = getEnv(process.env.SOURCEBOT_VERSION, 'unknown')!;
 export const POSTHOG_PAPIK = getEnv(process.env.POSTHOG_PAPIK);
 export const POSTHOG_HOST = getEnv(process.env.POSTHOG_HOST);
+export const FALLBACK_GITHUB_TOKEN = getEnv(process.env.FALLBACK_GITHUB_TOKEN);
+export const FALLBACK_GITLAB_TOKEN = getEnv(process.env.FALLBACK_GITLAB_TOKEN);
+export const FALLBACK_GITEA_TOKEN = getEnv(process.env.FALLBACK_GITEA_TOKEN);

View file

@@ -2,7 +2,7 @@ import fetch from 'cross-fetch';
 import { GerritConfig } from "@sourcebot/schemas/v2/index.type"
 import { createLogger } from './logger.js';
 import micromatch from "micromatch";
-import { measure, marshalBool, excludeReposByName, includeReposByName } from './utils.js';
+import { measure, marshalBool, excludeReposByName, includeReposByName, fetchWithRetry } from './utils.js';

 // https://gerrit-review.googlesource.com/Documentation/rest-api.html
 interface GerritProjects {
@@ -30,13 +30,13 @@ interface GerritWebLink {
 const logger = createLogger('Gerrit');

 export const getGerritReposFromConfig = async (config: GerritConfig): Promise<GerritProject[]> => {
     const url = config.url.endsWith('/') ? config.url : `${config.url}/`;
     const hostname = new URL(config.url).hostname;

     let { durationMs, data: projects } = await measure(async () => {
         try {
-            return fetchAllProjects(url)
+            const fetchFn = () => fetchAllProjects(url);
+            return fetchWithRetry(fetchFn, `projects from ${url}`, logger);
         } catch (err) {
             logger.error(`Failed to fetch projects from ${url}`, err);
             return null;
@@ -44,7 +44,7 @@ export const getGerritReposFromConfig = async (config: GerritConfig): Promise<Ge
     });

     if (!projects) {
-        return [];
+        throw new Error(`Failed to fetch projects from ${url}`);
     }

     // exclude "All-Projects" and "All-Users" projects

View file

@@ -99,7 +99,7 @@ export const getGitRepoFromConfig = async (config: GitConfig, ctx: AppContext) =
             .filter(Boolean)
             .map(branch => branch.replace('refs/heads/', ''));

         repo.branches = branches.filter(branch =>
             branchGlobs.some(glob => new RegExp(glob).test(branch))
         );
     }
@@ -114,7 +114,7 @@ export const getGitRepoFromConfig = async (config: GitConfig, ctx: AppContext) =
             .filter(Boolean)
             .map(tag => tag.replace('refs/tags/', ''));

         repo.tags = tags.filter(tag =>
             tagGlobs.some(glob => new RegExp(glob).test(tag))
         );
     }

View file

@@ -1,36 +1,47 @@
 import { Api, giteaApi, HttpResponse, Repository as GiteaRepository } from 'gitea-js';
 import { GiteaConnectionConfig } from '@sourcebot/schemas/v3/gitea.type';
-import { getTokenFromConfig, measure } from './utils.js';
+import { getTokenFromConfig, measure, fetchWithRetry } from './utils.js';
 import fetch from 'cross-fetch';
 import { createLogger } from './logger.js';
 import micromatch from 'micromatch';
 import { PrismaClient } from '@sourcebot/db';
+import { FALLBACK_GITEA_TOKEN } from './environment.js';

 const logger = createLogger('Gitea');

 export const getGiteaReposFromConfig = async (config: GiteaConnectionConfig, orgId: number, db: PrismaClient) => {
-    // TODO: pass in DB here to fetch secret properly
     const token = config.token ? await getTokenFromConfig(config.token, orgId, db) : undefined;

     const api = giteaApi(config.url ?? 'https://gitea.com', {
-        token,
+        token: token ?? FALLBACK_GITEA_TOKEN,
         customFetch: fetch,
     });

     let allRepos: GiteaRepository[] = [];

     if (config.orgs) {
-        const _repos = await getReposForOrgs(config.orgs, api);
+        const _repos = await fetchWithRetry(
+            () => getReposForOrgs(config.orgs!, api),
+            `orgs ${config.orgs.join(', ')}`,
+            logger
+        );
         allRepos = allRepos.concat(_repos);
     }

     if (config.repos) {
-        const _repos = await getRepos(config.repos, api);
+        const _repos = await fetchWithRetry(
+            () => getRepos(config.repos!, api),
+            `repos ${config.repos.join(', ')}`,
+            logger
+        );
         allRepos = allRepos.concat(_repos);
     }

     if (config.users) {
-        const _repos = await getReposOwnedByUsers(config.users, api);
+        const _repos = await fetchWithRetry(
+            () => getReposOwnedByUsers(config.users!, api),
+            `users ${config.users.join(', ')}`,
+            logger
+        );
         allRepos = allRepos.concat(_repos);
     }

@@ -50,7 +61,11 @@ export const getGiteaReposFromConfig = async (config: GiteaConnectionConfig, org
         allRepos = await Promise.all(
             allRepos.map(async (repo) => {
                 const [owner, name] = repo.full_name!.split('/');
-                let branches = (await getBranchesForRepo(owner, name, api)).map(branch => branch.name!);
+                let branches = (await fetchWithRetry(
+                    () => getBranchesForRepo(owner, name, api),
+                    `branches for ${owner}/${name}`,
+                    logger
+                )).map(branch => branch.name!);
                 branches = micromatch.match(branches, branchGlobs);

                 return {
@@ -66,7 +81,11 @@ export const getGiteaReposFromConfig = async (config: GiteaConnectionConfig, org
         allRepos = await Promise.all(
             allRepos.map(async (allRepos) => {
                 const [owner, name] = allRepos.name!.split('/');
-                let tags = (await getTagsForRepo(owner, name, api)).map(tag => tag.name!);
+                let tags = (await fetchWithRetry(
+                    () => getTagsForRepo(owner, name, api),
+                    `tags for ${owner}/${name}`,
+                    logger
+                )).map(tag => tag.name!);
                 tags = micromatch.match(tags, tagGlobs);

                 return {
@@ -146,7 +165,7 @@ const getTagsForRepo = async <T>(owner: string, repo: string, api: Api<T>) => {
         return tags;
     } catch (e) {
         logger.error(`Failed to fetch tags for repo ${owner}/${repo}.`, e);
-        return [];
+        throw e;
     }
 }

@@ -162,7 +181,7 @@ const getBranchesForRepo = async <T>(owner: string, repo: string, api: Api<T>) =
         return branches;
     } catch (e) {
         logger.error(`Failed to fetch branches for repo ${owner}/${repo}.`, e);
-        return [];
+        throw e;
     }
 }

@@ -181,7 +200,7 @@ const getReposOwnedByUsers = async <T>(users: string[], api: Api<T>) => {
             return data;
         } catch (e) {
             logger.error(`Failed to fetch repos for user ${user}.`, e);
-            return [];
+            throw e;
         }
     }))).flat();

@@ -204,7 +223,7 @@ const getReposForOrgs = async <T>(orgs: string[], api: Api<T>) => {
             return data;
         } catch (e) {
             logger.error(`Failed to fetch repos for org ${org}.`, e);
-            return [];
+            throw e;
         }
     }))).flat();
 }

@@ -224,7 +243,7 @@ const getRepos = async <T>(repos: string[], api: Api<T>) => {
             return [response.data];
         } catch (e) {
             logger.error(`Failed to fetch repository info for ${repo}.`, e);
-            return [];
+            throw e;
         }
     }))).flat();
 }

View file

@@ -1,11 +1,10 @@
 import { Octokit } from "@octokit/rest";
 import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type";
 import { createLogger } from "./logger.js";
-import { AppContext } from "./types.js";
-import { getTokenFromConfig, measure } from "./utils.js";
+import { getTokenFromConfig, measure, fetchWithRetry } from "./utils.js";
 import micromatch from "micromatch";
 import { PrismaClient } from "@sourcebot/db";
+import { FALLBACK_GITHUB_TOKEN } from "./environment.js";

 const logger = createLogger("GitHub");

 export type OctokitRepository = {
@@ -33,7 +32,7 @@ export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, o
     const token = config.token ? await getTokenFromConfig(config.token, orgId, db) : undefined;

     const octokit = new Octokit({
-        auth: token,
+        auth: token ?? FALLBACK_GITHUB_TOKEN,
         ...(config.url ? {
             baseUrl: `${config.url}/api/v3`
         } : {}),
@@ -78,7 +77,7 @@ export const getGitHubReposFromConfig = async (config: GithubConnectionConfig, o
 export const getGitHubRepoFromId = async (id: string, hostURL: string, token?: string) => {
     const octokit = new Octokit({
-        auth: token,
+        auth: token ?? FALLBACK_GITHUB_TOKEN,
         ...(hostURL !== 'https://github.com' ? {
             baseUrl: `${hostURL}/api/v3`
         } : {})
@@ -182,31 +181,34 @@ const getReposOwnedByUsers = async (users: string[], isAuthenticated: boolean, o
             logger.debug(`Fetching repository info for user ${user}...`);

             const { durationMs, data } = await measure(async () => {
-                if (isAuthenticated) {
-                    return octokit.paginate(octokit.repos.listForAuthenticatedUser, {
-                        username: user,
-                        visibility: 'all',
-                        affiliation: 'owner',
-                        per_page: 100,
-                        request: {
-                            signal,
-                        },
-                    });
-                } else {
-                    return octokit.paginate(octokit.repos.listForUser, {
-                        username: user,
-                        per_page: 100,
-                        request: {
-                            signal,
-                        },
-                    });
-                }
+                const fetchFn = async () => {
+                    if (isAuthenticated) {
+                        return octokit.paginate(octokit.repos.listForAuthenticatedUser, {
+                            username: user,
+                            visibility: 'all',
+                            affiliation: 'owner',
+                            per_page: 100,
+                            request: {
+                                signal,
+                            },
+                        });
+                    } else {
+                        return octokit.paginate(octokit.repos.listForUser, {
+                            username: user,
+                            per_page: 100,
+                            request: {
+                                signal,
+                            },
+                        });
+                    }
+                };
+                return fetchWithRetry(fetchFn, `user ${user}`, logger);
             });

             logger.debug(`Found ${data.length} owned by user ${user} in ${durationMs}ms.`);
             return data;
         } catch (e) {
-            // @todo: handle rate limiting errors
             logger.error(`Failed to fetch repository info for user ${user}.`, e);
             throw e;
         }
@@ -218,20 +220,23 @@ const getReposOwnedByUsers = async (users: string[], isAuthenticated: boolean, o
 const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSignal) => {
     const repos = (await Promise.all(orgs.map(async (org) => {
         try {
-            logger.debug(`Fetching repository info for org ${org}...`);
+            logger.info(`Fetching repository info for org ${org}...`);

-            const { durationMs, data } = await measure(() => octokit.paginate(octokit.repos.listForOrg, {
-                org: org,
-                per_page: 100,
-                request: {
-                    signal
-                }
-            }));
+            const { durationMs, data } = await measure(async () => {
+                const fetchFn = () => octokit.paginate(octokit.repos.listForOrg, {
+                    org: org,
+                    per_page: 100,
+                    request: {
+                        signal
+                    }
+                });
+                return fetchWithRetry(fetchFn, `org ${org}`, logger);
+            });

-            logger.debug(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
+            logger.info(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
             return data;
         } catch (e) {
-            // @todo: handle rate limiting errors
             logger.error(`Failed to fetch repository info for org ${org}.`, e);
             throw e;
         }
@@ -243,22 +248,25 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi
 const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSignal) => {
     const repos = (await Promise.all(repoList.map(async (repo) => {
         try {
-            logger.debug(`Fetching repository info for ${repo}...`);
             const [owner, repoName] = repo.split('/');
+            logger.info(`Fetching repository info for ${repo}...`);

-            const { durationMs, data: result } = await measure(() => octokit.repos.get({
-                owner,
-                repo: repoName,
-                request: {
-                    signal
-                }
-            }));
+            const { durationMs, data: result } = await measure(async () => {
+                const fetchFn = () => octokit.repos.get({
+                    owner,
+                    repo: repoName,
+                    request: {
+                        signal
+                    }
+                });
+                return fetchWithRetry(fetchFn, repo, logger);
+            });

-            logger.debug(`Found info for repository ${repo} in ${durationMs}ms`);
+            logger.info(`Found info for repository ${repo} in ${durationMs}ms`);
             return [result.data];
         } catch (e) {
-            // @todo: handle rate limiting errors
             logger.error(`Failed to fetch repository info for ${repo}.`, e);
             throw e;
         }

View file

@@ -2,38 +2,43 @@ import { Gitlab, ProjectSchema } from "@gitbeaker/rest";
 import micromatch from "micromatch";
 import { createLogger } from "./logger.js";
 import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type"
-import { getTokenFromConfig, measure } from "./utils.js";
+import { getTokenFromConfig, measure, fetchWithRetry } from "./utils.js";
 import { PrismaClient } from "@sourcebot/db";
+import { FALLBACK_GITLAB_TOKEN } from "./environment.js";

 const logger = createLogger("GitLab");
 export const GITLAB_CLOUD_HOSTNAME = "gitlab.com";

 export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, orgId: number, db: PrismaClient) => {
     const token = config.token ? await getTokenFromConfig(config.token, orgId, db) : undefined;

     const api = new Gitlab({
-        ...(config.token ? {
+        ...(token ? {
             token,
-        } : {}),
+        } : {
+            token: FALLBACK_GITLAB_TOKEN,
+        }),
         ...(config.url ? {
             host: config.url,
         } : {}),
     });

     const hostname = config.url ? new URL(config.url).hostname : GITLAB_CLOUD_HOSTNAME;

     let allProjects: ProjectSchema[] = [];

     if (config.all === true) {
         if (hostname !== GITLAB_CLOUD_HOSTNAME) {
             try {
                 logger.debug(`Fetching all projects visible in ${config.url}...`);
-                const { durationMs, data: _projects } = await measure(() => api.Projects.all({
-                    perPage: 100,
-                }));
+                const { durationMs, data: _projects } = await measure(async () => {
+                    const fetchFn = () => api.Projects.all({
+                        perPage: 100,
+                    });
+                    return fetchWithRetry(fetchFn, `all projects in ${config.url}`, logger);
+                });
                 logger.debug(`Found ${_projects.length} projects in ${durationMs}ms.`);
                 allProjects = allProjects.concat(_projects);
             } catch (e) {
                 logger.error(`Failed to fetch all projects visible in ${config.url}.`, e);
+                throw e;
             }
         } else {
             logger.warn(`Ignoring option all:true in config : host is ${GITLAB_CLOUD_HOSTNAME}`);
@@ -44,15 +49,18 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
         const _projects = (await Promise.all(config.groups.map(async (group) => {
             try {
                 logger.debug(`Fetching project info for group ${group}...`);
-                const { durationMs, data } = await measure(() => api.Groups.allProjects(group, {
-                    perPage: 100,
-                    includeSubgroups: true
-                }));
+                const { durationMs, data } = await measure(async () => {
+                    const fetchFn = () => api.Groups.allProjects(group, {
+                        perPage: 100,
+                        includeSubgroups: true
+                    });
+                    return fetchWithRetry(fetchFn, `group ${group}`, logger);
+                });
                 logger.debug(`Found ${data.length} projects in group ${group} in ${durationMs}ms.`);
                 return data;
             } catch (e) {
                 logger.error(`Failed to fetch project info for group ${group}.`, e);
-                return [];
+                throw e;
             }
         }))).flat();
@@ -63,14 +71,17 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
         const _projects = (await Promise.all(config.users.map(async (user) => {
             try {
                 logger.debug(`Fetching project info for user ${user}...`);
-                const { durationMs, data } = await measure(() => api.Users.allProjects(user, {
-                    perPage: 100,
-                }));
+                const { durationMs, data } = await measure(async () => {
+                    const fetchFn = () => api.Users.allProjects(user, {
+                        perPage: 100,
+                    });
+                    return fetchWithRetry(fetchFn, `user ${user}`, logger);
+                });
                 logger.debug(`Found ${data.length} projects owned by user ${user} in ${durationMs}ms.`);
                 return data;
             } catch (e) {
                 logger.error(`Failed to fetch project info for user ${user}.`, e);
-                return [];
+                throw e;
             }
         }))).flat();
@@ -81,12 +92,15 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
         const _projects = (await Promise.all(config.projects.map(async (project) => {
             try {
                 logger.debug(`Fetching project info for project ${project}...`);
-                const { durationMs, data } = await measure(() => api.Projects.show(project));
+                const { durationMs, data } = await measure(async () => {
+                    const fetchFn = () => api.Projects.show(project);
+                    return fetchWithRetry(fetchFn, `project ${project}`, logger);
+                });
                 logger.debug(`Found project ${project} in ${durationMs}ms.`);
                 return [data];
             } catch (e) {
                 logger.error(`Failed to fetch project info for project ${project}.`, e);
-                return [];
+                throw e;
             }
         }))).flat();

View file

@@ -203,7 +203,7 @@ export class RepoManager implements IRepoManager {
             this.logger.info(`Fetching ${repo.id}...`);

             const { durationMs } = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
-                this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
+                //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
             }));
             fetchDuration_s = durationMs / 1000;
@@ -222,7 +222,7 @@ export class RepoManager implements IRepoManager {
             }

             const { durationMs } = await measure(() => cloneRepository(cloneUrl, repoPath, metadata, ({ method, stage, progress }) => {
-                this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
+                //this.logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
             }));
             cloneDuration_s = durationMs / 1000;
@@ -243,6 +243,7 @@ export class RepoManager implements IRepoManager {
     }

     private async runIndexJob(job: Job<JobPayload>) {
+        this.logger.info(`Running index job (id: ${job.id}) for repo ${job.data.repo.id}`);
         const repo = job.data.repo as RepoWithConnections;

         await this.db.repo.update({
             where: {
@@ -257,10 +258,30 @@ export class RepoManager implements IRepoManager {
         let fetchDuration_s: number | undefined;
         let cloneDuration_s: number | undefined;

-        const stats = await this.syncGitRepository(repo);
-        indexDuration_s = stats.indexDuration_s;
-        fetchDuration_s = stats.fetchDuration_s;
-        cloneDuration_s = stats.cloneDuration_s;
+        let stats;
+        let attempts = 0;
+        const maxAttempts = 3;
+
+        while (attempts < maxAttempts) {
+            try {
+                stats = await this.syncGitRepository(repo);
+                break;
+            } catch (error) {
+                attempts++;
+                if (attempts === maxAttempts) {
+                    this.logger.error(`Failed to sync repository ${repo.id} after ${maxAttempts} attempts. Error: ${error}`);
+                    throw error;
+                }
+
+                const sleepDuration = 5000 * Math.pow(2, attempts - 1);
+                this.logger.error(`Failed to sync repository ${repo.id}, attempt ${attempts}/${maxAttempts}. Sleeping for ${sleepDuration / 1000}s... Error: ${error}`);
+                await new Promise(resolve => setTimeout(resolve, sleepDuration));
+            }
+        }
+
+        indexDuration_s = stats!.indexDuration_s;
+        fetchDuration_s = stats!.fetchDuration_s;
+        cloneDuration_s = stats!.cloneDuration_s;

         captureEvent('repo_synced', {
             vcs: 'git',
@@ -286,7 +307,7 @@ export class RepoManager implements IRepoManager {
     }

     private async onIndexJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
-        this.logger.info(`Repo index job failed with error: ${err}`);
+        this.logger.info(`Repo index job failed (id: ${job?.id ?? 'unknown'}) with error: ${err}`);
         if (job) {
             await this.db.repo.update({
                 where: {
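For reference, the job-level backoff in runIndexJob above works out as follows (a sketch using the constants from the change; the helper name is hypothetical and not part of the diff):

// Sketch of the per-job sync backoff: 5s base delay, doubled after each failed
// attempt, with at most 3 attempts before the error is rethrown.
const syncBackoffMs = (attempt: number): number => 5000 * Math.pow(2, attempt - 1);
// attempt 1 fails -> wait 5s; attempt 2 fails -> wait 10s; attempt 3 fails -> rethrow.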

View file

@@ -163,4 +163,31 @@ export const getRepoPath = (repo: Repo, ctx: AppContext) => {
 export const getShardPrefix = (orgId: number, repoId: number) => {
     return `${orgId}_${repoId}`;
 }
+
+export const fetchWithRetry = async <T>(
+    fetchFn: () => Promise<T>,
+    identifier: string,
+    logger: Logger,
+    maxAttempts: number = 3
+): Promise<T> => {
+    let attempts = 0;
+
+    while (true) {
+        try {
+            return await fetchFn();
+        } catch (e: any) {
+            attempts++;
+            if ((e.status === 403 || e.status === 429 || e.status === 443) && attempts < maxAttempts) {
+                const computedWaitTime = 3000 * Math.pow(2, attempts - 1);
+                const resetTime = e.response?.headers?.['x-ratelimit-reset'] ? parseInt(e.response.headers['x-ratelimit-reset']) * 1000 : Date.now() + computedWaitTime;
+                const waitTime = resetTime - Date.now();
+                logger.warn(`Rate limit exceeded for ${identifier}. Waiting ${waitTime}ms before retry ${attempts}/${maxAttempts}...`);
+
+                await new Promise(resolve => setTimeout(resolve, waitTime));
+                continue;
+            }
+            throw e;
+        }
+    }
+}
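A minimal, hedged example of a call site for the helper above (it mirrors the GitHub connector changes earlier in this diff; the org name is a placeholder and the import paths are assumptions):

import { Octokit } from "@octokit/rest";
import { createLogger } from "./logger.js";
import { fetchWithRetry } from "./utils.js";

const octokit = new Octokit();
const logger = createLogger("example");

// Retries 403/429 (and 443) responses: waits until `x-ratelimit-reset` when the
// header is present, otherwise backs off 3s then 6s; other errors are rethrown.
const repos = await fetchWithRetry(
    () => octokit.paginate(octokit.repos.listForOrg, { org: "example-org", per_page: 100 }),
    "org example-org",
    logger,
);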

View file

@@ -11,7 +11,7 @@ import { githubSchema } from "@sourcebot/schemas/v3/github.schema";
 import { gitlabSchema } from "@sourcebot/schemas/v3/gitlab.schema";
 import { giteaSchema } from "@sourcebot/schemas/v3/gitea.schema";
 import { gerritSchema } from "@sourcebot/schemas/v3/gerrit.schema";
-import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
+import { GithubConnectionConfig, GitlabConnectionConfig, GiteaConnectionConfig, GerritConnectionConfig, ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { encrypt } from "@sourcebot/crypto"
 import { getConnection, getLinkedRepos } from "./data/connection";
 import { ConnectionSyncStatus, Prisma, Invite, OrgRole, Connection, Repo, Org } from "@sourcebot/db";
@@ -19,7 +19,7 @@ import { headers } from "next/headers"
 import { getStripe } from "@/lib/stripe"
 import { getUser } from "@/data/user";
 import { Session } from "next-auth";
-import { STRIPE_PRODUCT_ID } from "@/lib/environment";
+import { STRIPE_PRODUCT_ID, CONFIG_MAX_REPOS_NO_TOKEN } from "@/lib/environment";
 import { StripeSubscriptionStatus } from "@sourcebot/db";
 import Stripe from "stripe";

 const ajv = new Ajv({
@@ -81,7 +81,7 @@ export const withOwner = async <T>(session: Session, domain: string, fn: (orgId:
         },
     });

     if (!userRole || userRole.role !== OrgRole.OWNER) {
         return {
             statusCode: StatusCodes.FORBIDDEN,
             errorCode: ErrorCode.MEMBER_NOT_OWNER,
@@ -350,7 +350,7 @@ export const flagConnectionForSync = async (connectionId: number, domain: string
     }

     await prisma.connection.update({
         where: {
             id: connection.id,
         },
         data: {
@@ -400,7 +400,7 @@ export const getCurrentUserRole = async (domain: string): Promise<OrgRole | Serv
         }

         return userRole.role;
     })
 );

 export const createInvite = async (email: string, userId: string, domain: string): Promise<{ success: boolean } | ServiceError> =>
@@ -520,7 +520,7 @@ export const makeOwner = async (newOwnerId: string, domain: string): Promise<{ s
     }

     const newOwner = await prisma.userToOrg.findUnique({
         where: {
             orgId_userId: {
                 userId: newOwnerId,
                 orgId,
@@ -600,6 +600,48 @@ const parseConnectionConfig = (connectionType: string, config: string) => {
         } satisfies ServiceError;
     }

+    const { numRepos, hasToken } = (() => {
+        switch (connectionType) {
+            case "github":
+                const githubConfig = parsedConfig as GithubConnectionConfig;
+                return {
+                    numRepos: githubConfig.repos?.length,
+                    hasToken: !!githubConfig.token,
+                }
+            case "gitlab":
+                const gitlabConfig = parsedConfig as GitlabConnectionConfig;
+                return {
+                    numRepos: gitlabConfig.projects?.length,
+                    hasToken: !!gitlabConfig.token,
+                }
+            case "gitea":
+                const giteaConfig = parsedConfig as GiteaConnectionConfig;
+                return {
+                    numRepos: giteaConfig.repos?.length,
+                    hasToken: !!giteaConfig.token,
+                }
+            case "gerrit":
+                const gerritConfig = parsedConfig as GerritConnectionConfig;
+                return {
+                    numRepos: gerritConfig.projects?.length,
+                    hasToken: true, // gerrit doesn't use a token atm
+                }
+            default:
+                return {
+                    numRepos: undefined,
+                    hasToken: true
+                }
+        }
+    })();
+
+    if (!hasToken && numRepos && numRepos > CONFIG_MAX_REPOS_NO_TOKEN) {
+        return {
+            statusCode: StatusCodes.BAD_REQUEST,
+            errorCode: ErrorCode.INVALID_REQUEST_BODY,
+            message: `You must provide a token to sync more than ${CONFIG_MAX_REPOS_NO_TOKEN} repositories.`,
+        } satisfies ServiceError;
+    }
+
     const isValidConfig = ajv.validate(schema, parsedConfig);

     if (!isValidConfig) {
         return {
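As a hedged illustration of the new guard (field names follow the GithubConnectionConfig usage above; the repo names are placeholders):

// Illustrative only: with no `token` set, a GitHub connection listing more than
// CONFIG_MAX_REPOS_NO_TOKEN repos (default 500) is rejected with INVALID_REQUEST_BODY.
const tooManyRepos = {
    repos: Array.from({ length: 501 }, (_, i) => `example-org/repo-${i}`),
    // token omitted -> hasToken === false, numRepos === 501 > 500
};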

View file

@@ -17,3 +17,5 @@ export const AUTH_URL = getEnv(process.env.AUTH_URL)!;
 export const STRIPE_SECRET_KEY = getEnv(process.env.STRIPE_SECRET_KEY);
 export const STRIPE_PRODUCT_ID = getEnv(process.env.STRIPE_PRODUCT_ID);
 export const STRIPE_WEBHOOK_SECRET = getEnv(process.env.STRIPE_WEBHOOK_SECRET);
+
+export const CONFIG_MAX_REPOS_NO_TOKEN = getEnvNumber(process.env.CONFIG_MAX_REPOS_NO_TOKEN, 500);

View file

@@ -139,7 +139,16 @@ export const getEnv = (env: string | undefined, defaultValue?: string) => {
 }

 export const getEnvNumber = (env: string | undefined, defaultValue: number = 0) => {
-    return Number(env) ?? defaultValue;
+    if (!env) {
+        return defaultValue;
+    }
+
+    const num = Number(env);
+    if (isNaN(num)) {
+        return defaultValue;
+    }
+
+    return num;
 }

 export const getEnvBoolean = (env: string | undefined, defaultValue: boolean) => {
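The previous implementation returned NaN for unset or non-numeric values, since NaN is not nullish and `??` never fell back to the default; the hardened version behaves as follows (examples assume the function above):

getEnvNumber(undefined, 500); // -> 500 (unset: fall back to the default)
getEnvNumber("abc", 500);     // -> 500 (non-numeric: fall back to the default)
getEnvNumber("250", 500);     // -> 250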