mirror of
https://github.com/sourcebot-dev/sourcebot.git
synced 2025-12-12 20:35:24 +00:00
user syncing + represent sync job status in a seperate table
This commit is contained in:
parent
3638666248
commit
e5c8caadb8
11 changed files with 488 additions and 107 deletions
|
|
@ -14,6 +14,7 @@
|
||||||
"watch:mcp": "yarn workspace @sourcebot/mcp build:watch",
|
"watch:mcp": "yarn workspace @sourcebot/mcp build:watch",
|
||||||
"watch:schemas": "yarn workspace @sourcebot/schemas watch",
|
"watch:schemas": "yarn workspace @sourcebot/schemas watch",
|
||||||
"dev:prisma:migrate:dev": "yarn with-env yarn workspace @sourcebot/db prisma:migrate:dev",
|
"dev:prisma:migrate:dev": "yarn with-env yarn workspace @sourcebot/db prisma:migrate:dev",
|
||||||
|
"dev:prisma:generate": "yarn with-env yarn workspace @sourcebot/db prisma:generate",
|
||||||
"dev:prisma:studio": "yarn with-env yarn workspace @sourcebot/db prisma:studio",
|
"dev:prisma:studio": "yarn with-env yarn workspace @sourcebot/db prisma:studio",
|
||||||
"dev:prisma:migrate:reset": "yarn with-env yarn workspace @sourcebot/db prisma:migrate:reset",
|
"dev:prisma:migrate:reset": "yarn with-env yarn workspace @sourcebot/db prisma:migrate:reset",
|
||||||
"dev:prisma:db:push": "yarn with-env yarn workspace @sourcebot/db prisma:db:push",
|
"dev:prisma:db:push": "yarn with-env yarn workspace @sourcebot/db prisma:db:push",
|
||||||
|
|
|
||||||
|
|
@ -17,3 +17,7 @@ export const DEFAULT_SETTINGS: Settings = {
|
||||||
repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours
|
repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours
|
||||||
enablePublicAccess: false // deprected, use FORCE_ENABLE_ANONYMOUS_ACCESS instead
|
enablePublicAccess: false // deprected, use FORCE_ENABLE_ANONYMOUS_ACCESS instead
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES = [
|
||||||
|
'github',
|
||||||
|
];
|
||||||
|
|
@ -129,6 +129,17 @@ export const getUserIdsWithReadAccessToRepo = async (owner: string, repo: string
|
||||||
return collaborators.map(collaborator => collaborator.id.toString());
|
return collaborators.map(collaborator => collaborator.id.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const getReposThatAuthenticatedUserHasReadAccessTo = async (octokit: Octokit) => {
|
||||||
|
const fetchFn = () => octokit.paginate(octokit.repos.listForAuthenticatedUser, {
|
||||||
|
per_page: 100,
|
||||||
|
// @todo: do we need to set a visibility to private only?
|
||||||
|
// visibility: 'private'
|
||||||
|
});
|
||||||
|
|
||||||
|
const repos = await fetchWithRetry(fetchFn, `authenticated user`, logger);
|
||||||
|
return repos.map(repo => repo.id.toString());
|
||||||
|
}
|
||||||
|
|
||||||
export const createOctokitFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => {
|
export const createOctokitFromConfig = async (config: GithubConnectionConfig, orgId: number, db: PrismaClient): Promise<{ octokit: Octokit, isAuthenticated: boolean }> => {
|
||||||
const hostname = config.url ?
|
const hostname = config.url ?
|
||||||
new URL(config.url).hostname :
|
new URL(config.url).hostname :
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,11 @@ import path from 'path';
|
||||||
import { ConnectionManager } from './connectionManager.js';
|
import { ConnectionManager } from './connectionManager.js';
|
||||||
import { DEFAULT_SETTINGS } from './constants.js';
|
import { DEFAULT_SETTINGS } from './constants.js';
|
||||||
import { env } from "./env.js";
|
import { env } from "./env.js";
|
||||||
import { RepoPermissionSyncer } from './permissionSyncer.js';
|
import { RepoPermissionSyncer } from './repoPermissionSyncer.js';
|
||||||
import { PromClient } from './promClient.js';
|
import { PromClient } from './promClient.js';
|
||||||
import { RepoManager } from './repoManager.js';
|
import { RepoManager } from './repoManager.js';
|
||||||
import { AppContext } from "./types.js";
|
import { AppContext } from "./types.js";
|
||||||
|
import { UserPermissionSyncer } from "./userPermissionSyncer.js";
|
||||||
|
|
||||||
|
|
||||||
const logger = createLogger('backend-entrypoint');
|
const logger = createLogger('backend-entrypoint');
|
||||||
|
|
@ -68,20 +69,25 @@ const settings = await getSettings(env.CONFIG_PATH);
|
||||||
|
|
||||||
const connectionManager = new ConnectionManager(prisma, settings, redis);
|
const connectionManager = new ConnectionManager(prisma, settings, redis);
|
||||||
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
|
const repoManager = new RepoManager(prisma, settings, redis, promClient, context);
|
||||||
const permissionSyncer = new RepoPermissionSyncer(prisma, redis);
|
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, redis);
|
||||||
|
const userPermissionSyncer = new UserPermissionSyncer(prisma, redis);
|
||||||
|
|
||||||
await repoManager.validateIndexedReposHaveShards();
|
await repoManager.validateIndexedReposHaveShards();
|
||||||
|
|
||||||
const connectionManagerInterval = connectionManager.startScheduler();
|
const connectionManagerInterval = connectionManager.startScheduler();
|
||||||
const repoManagerInterval = repoManager.startScheduler();
|
const repoManagerInterval = repoManager.startScheduler();
|
||||||
const permissionSyncerInterval = env.EXPERIMENT_PERMISSION_SYNC_ENABLED === 'true' ? permissionSyncer.startScheduler() : null;
|
const repoPermissionSyncerInterval = env.EXPERIMENT_PERMISSION_SYNC_ENABLED === 'true' ? repoPermissionSyncer.startScheduler() : null;
|
||||||
|
const userPermissionSyncerInterval = env.EXPERIMENT_PERMISSION_SYNC_ENABLED === 'true' ? userPermissionSyncer.startScheduler() : null;
|
||||||
|
|
||||||
|
|
||||||
const cleanup = async (signal: string) => {
|
const cleanup = async (signal: string) => {
|
||||||
logger.info(`Recieved ${signal}, cleaning up...`);
|
logger.info(`Recieved ${signal}, cleaning up...`);
|
||||||
|
|
||||||
if (permissionSyncerInterval) {
|
if (userPermissionSyncerInterval) {
|
||||||
clearInterval(permissionSyncerInterval);
|
clearInterval(userPermissionSyncerInterval);
|
||||||
|
}
|
||||||
|
if (repoPermissionSyncerInterval) {
|
||||||
|
clearInterval(repoPermissionSyncerInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
clearInterval(connectionManagerInterval);
|
clearInterval(connectionManagerInterval);
|
||||||
|
|
@ -89,7 +95,8 @@ const cleanup = async (signal: string) => {
|
||||||
|
|
||||||
connectionManager.dispose();
|
connectionManager.dispose();
|
||||||
repoManager.dispose();
|
repoManager.dispose();
|
||||||
permissionSyncer.dispose();
|
repoPermissionSyncer.dispose();
|
||||||
|
userPermissionSyncer.dispose();
|
||||||
|
|
||||||
await prisma.$disconnect();
|
await prisma.$disconnect();
|
||||||
await redis.quit();
|
await redis.quit();
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
import * as Sentry from "@sentry/node";
|
import * as Sentry from "@sentry/node";
|
||||||
import { PrismaClient, Repo, RepoPermissionSyncStatus } from "@sourcebot/db";
|
import { PrismaClient, Repo, RepoPermissionSyncJobStatus } from "@sourcebot/db";
|
||||||
import { createLogger } from "@sourcebot/logger";
|
import { createLogger } from "@sourcebot/logger";
|
||||||
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type";
|
import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type";
|
||||||
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type";
|
import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type";
|
||||||
|
|
@ -10,16 +10,16 @@ import { Redis } from 'ioredis';
|
||||||
import { env } from "./env.js";
|
import { env } from "./env.js";
|
||||||
import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "./github.js";
|
import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "./github.js";
|
||||||
import { RepoWithConnections } from "./types.js";
|
import { RepoWithConnections } from "./types.js";
|
||||||
|
import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "./constants.js";
|
||||||
|
|
||||||
type RepoPermissionSyncJob = {
|
type RepoPermissionSyncJob = {
|
||||||
repoId: number;
|
jobId: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const QUEUE_NAME = 'repoPermissionSyncQueue';
|
const QUEUE_NAME = 'repoPermissionSyncQueue';
|
||||||
|
|
||||||
const logger = createLogger('permission-syncer');
|
const logger = createLogger('repo-permission-syncer');
|
||||||
|
|
||||||
const SUPPORTED_CODE_HOST_TYPES = ['github'];
|
|
||||||
|
|
||||||
export class RepoPermissionSyncer {
|
export class RepoPermissionSyncer {
|
||||||
private queue: Queue<RepoPermissionSyncJob>;
|
private queue: Queue<RepoPermissionSyncJob>;
|
||||||
|
|
@ -46,6 +46,7 @@ export class RepoPermissionSyncer {
|
||||||
return setInterval(async () => {
|
return setInterval(async () => {
|
||||||
// @todo: make this configurable
|
// @todo: make this configurable
|
||||||
const thresholdDate = new Date(Date.now() - 1000 * 60 * 60 * 24);
|
const thresholdDate = new Date(Date.now() - 1000 * 60 * 60 * 24);
|
||||||
|
|
||||||
const repos = await this.db.repo.findMany({
|
const repos = await this.db.repo.findMany({
|
||||||
// Repos need their permissions to be synced against the code host when...
|
// Repos need their permissions to be synced against the code host when...
|
||||||
where: {
|
where: {
|
||||||
|
|
@ -53,40 +54,47 @@ export class RepoPermissionSyncer {
|
||||||
AND: [
|
AND: [
|
||||||
{
|
{
|
||||||
external_codeHostType: {
|
external_codeHostType: {
|
||||||
in: SUPPORTED_CODE_HOST_TYPES,
|
in: PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// and, they either require a sync (SYNC_NEEDED) or have been in a completed state (SYNCED or FAILED)
|
|
||||||
// for > some duration (default 24 hours)
|
|
||||||
{
|
{
|
||||||
OR: [
|
OR: [
|
||||||
{
|
{ permissionSyncedAt: null },
|
||||||
permissionSyncStatus: RepoPermissionSyncStatus.SYNC_NEEDED
|
{ permissionSyncedAt: { lt: thresholdDate } },
|
||||||
},
|
],
|
||||||
{
|
},
|
||||||
AND: [
|
{
|
||||||
{
|
NOT: {
|
||||||
OR: [
|
permissionSyncJobs: {
|
||||||
{ permissionSyncStatus: RepoPermissionSyncStatus.SYNCED },
|
some: {
|
||||||
{ permissionSyncStatus: RepoPermissionSyncStatus.FAILED },
|
OR: [
|
||||||
]
|
// Don't schedule if there are active jobs
|
||||||
},
|
{
|
||||||
{
|
status: {
|
||||||
OR: [
|
in: [
|
||||||
{ permissionSyncJobLastCompletedAt: null },
|
RepoPermissionSyncJobStatus.PENDING,
|
||||||
{ permissionSyncJobLastCompletedAt: { lt: thresholdDate } }
|
RepoPermissionSyncJobStatus.IN_PROGRESS,
|
||||||
]
|
],
|
||||||
}
|
}
|
||||||
]
|
},
|
||||||
|
// Don't schedule if there are recent failed jobs (within the threshold date). Note `gt` is used here since this is a inverse condition.
|
||||||
|
{
|
||||||
|
AND: [
|
||||||
|
{ status: RepoPermissionSyncJobStatus.FAILED },
|
||||||
|
{ completedAt: { gt: thresholdDate } },
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.schedulePermissionSync(repos);
|
await this.schedulePermissionSync(repos);
|
||||||
}, 1000 * 30);
|
}, 1000 * 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
public dispose() {
|
public dispose() {
|
||||||
|
|
@ -96,15 +104,16 @@ export class RepoPermissionSyncer {
|
||||||
|
|
||||||
private async schedulePermissionSync(repos: Repo[]) {
|
private async schedulePermissionSync(repos: Repo[]) {
|
||||||
await this.db.$transaction(async (tx) => {
|
await this.db.$transaction(async (tx) => {
|
||||||
await tx.repo.updateMany({
|
const jobs = await tx.repoPermissionSyncJob.createManyAndReturn({
|
||||||
where: { id: { in: repos.map(repo => repo.id) } },
|
data: repos.map(repo => ({
|
||||||
data: { permissionSyncStatus: RepoPermissionSyncStatus.IN_SYNC_QUEUE },
|
repoId: repo.id,
|
||||||
|
})),
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.queue.addBulk(repos.map(repo => ({
|
await this.queue.addBulk(jobs.map((job) => ({
|
||||||
name: 'repoPermissionSyncJob',
|
name: 'repoPermissionSyncJob',
|
||||||
data: {
|
data: {
|
||||||
repoId: repo.id,
|
jobId: job.id,
|
||||||
},
|
},
|
||||||
opts: {
|
opts: {
|
||||||
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
|
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
|
||||||
|
|
@ -115,21 +124,25 @@ export class RepoPermissionSyncer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async runJob(job: Job<RepoPermissionSyncJob>) {
|
private async runJob(job: Job<RepoPermissionSyncJob>) {
|
||||||
const id = job.data.repoId;
|
const id = job.data.jobId;
|
||||||
const repo = await this.db.repo.update({
|
const { repo } = await this.db.repoPermissionSyncJob.update({
|
||||||
where: {
|
where: {
|
||||||
id
|
id,
|
||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
permissionSyncStatus: RepoPermissionSyncStatus.SYNCING,
|
status: RepoPermissionSyncJobStatus.IN_PROGRESS,
|
||||||
},
|
},
|
||||||
include: {
|
select: {
|
||||||
connections: {
|
repo: {
|
||||||
include: {
|
include: {
|
||||||
connection: true,
|
connections: {
|
||||||
},
|
include: {
|
||||||
},
|
connection: true,
|
||||||
},
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!repo) {
|
if (!repo) {
|
||||||
|
|
@ -171,34 +184,43 @@ export class RepoPermissionSyncer {
|
||||||
return [];
|
return [];
|
||||||
})();
|
})();
|
||||||
|
|
||||||
await this.db.repo.update({
|
await this.db.$transaction([
|
||||||
where: {
|
this.db.repo.update({
|
||||||
id: repo.id,
|
where: {
|
||||||
},
|
id: repo.id,
|
||||||
data: {
|
},
|
||||||
permittedUsers: {
|
data: {
|
||||||
deleteMany: {},
|
permittedUsers: {
|
||||||
|
deleteMany: {},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}),
|
||||||
});
|
this.db.userToRepoPermission.createMany({
|
||||||
|
data: userIds.map(userId => ({
|
||||||
await this.db.userToRepoPermission.createMany({
|
userId,
|
||||||
data: userIds.map(userId => ({
|
repoId: repo.id,
|
||||||
userId,
|
})),
|
||||||
repoId: repo.id,
|
})
|
||||||
})),
|
]);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async onJobCompleted(job: Job<RepoPermissionSyncJob>) {
|
private async onJobCompleted(job: Job<RepoPermissionSyncJob>) {
|
||||||
const repo = await this.db.repo.update({
|
const { repo } = await this.db.repoPermissionSyncJob.update({
|
||||||
where: {
|
where: {
|
||||||
id: job.data.repoId,
|
id: job.data.jobId,
|
||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
permissionSyncStatus: RepoPermissionSyncStatus.SYNCED,
|
status: RepoPermissionSyncJobStatus.COMPLETED,
|
||||||
permissionSyncJobLastCompletedAt: new Date(),
|
repo: {
|
||||||
|
update: {
|
||||||
|
permissionSyncedAt: new Date(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
completedAt: new Date(),
|
||||||
},
|
},
|
||||||
|
select: {
|
||||||
|
repo: true
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
logger.info(`Permissions synced for repo ${repo.displayName ?? repo.name}`);
|
logger.info(`Permissions synced for repo ${repo.displayName ?? repo.name}`);
|
||||||
|
|
@ -207,21 +229,25 @@ export class RepoPermissionSyncer {
|
||||||
private async onJobFailed(job: Job<RepoPermissionSyncJob> | undefined, err: Error) {
|
private async onJobFailed(job: Job<RepoPermissionSyncJob> | undefined, err: Error) {
|
||||||
Sentry.captureException(err, {
|
Sentry.captureException(err, {
|
||||||
tags: {
|
tags: {
|
||||||
repoId: job?.data.repoId,
|
jobId: job?.data.jobId,
|
||||||
queue: QUEUE_NAME,
|
queue: QUEUE_NAME,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const errorMessage = (repoName: string) => `Repo permission sync job failed for repo ${repoName}: ${err}`;
|
const errorMessage = (repoName: string) => `Repo permission sync job failed for repo ${repoName}: ${err.message}`;
|
||||||
|
|
||||||
if (job) {
|
if (job) {
|
||||||
const repo = await this.db.repo.update({
|
const { repo } = await this.db.repoPermissionSyncJob.update({
|
||||||
where: {
|
where: {
|
||||||
id: job?.data.repoId,
|
id: job.data.jobId,
|
||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
permissionSyncStatus: RepoPermissionSyncStatus.FAILED,
|
status: RepoPermissionSyncJobStatus.FAILED,
|
||||||
permissionSyncJobLastCompletedAt: new Date(),
|
completedAt: new Date(),
|
||||||
|
errorMessage: err.message,
|
||||||
|
},
|
||||||
|
select: {
|
||||||
|
repo: true
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
logger.error(errorMessage(repo.displayName ?? repo.name));
|
logger.error(errorMessage(repo.displayName ?? repo.name));
|
||||||
249
packages/backend/src/userPermissionSyncer.ts
Normal file
249
packages/backend/src/userPermissionSyncer.ts
Normal file
|
|
@ -0,0 +1,249 @@
|
||||||
|
import { Octokit } from "@octokit/rest";
|
||||||
|
import * as Sentry from "@sentry/node";
|
||||||
|
import { PrismaClient, User, UserPermissionSyncJobStatus } from "@sourcebot/db";
|
||||||
|
import { createLogger } from "@sourcebot/logger";
|
||||||
|
import { Job, Queue, Worker } from "bullmq";
|
||||||
|
import { Redis } from "ioredis";
|
||||||
|
import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "./constants.js";
|
||||||
|
import { env } from "./env.js";
|
||||||
|
import { getReposThatAuthenticatedUserHasReadAccessTo } from "./github.js";
|
||||||
|
|
||||||
|
const logger = createLogger('user-permission-syncer');
|
||||||
|
|
||||||
|
const QUEUE_NAME = 'userPermissionSyncQueue';
|
||||||
|
|
||||||
|
type UserPermissionSyncJob = {
|
||||||
|
jobId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export class UserPermissionSyncer {
|
||||||
|
private queue: Queue<UserPermissionSyncJob>;
|
||||||
|
private worker: Worker<UserPermissionSyncJob>;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private db: PrismaClient,
|
||||||
|
redis: Redis,
|
||||||
|
) {
|
||||||
|
this.queue = new Queue<UserPermissionSyncJob>(QUEUE_NAME, {
|
||||||
|
connection: redis,
|
||||||
|
});
|
||||||
|
this.worker = new Worker<UserPermissionSyncJob>(QUEUE_NAME, this.runJob.bind(this), {
|
||||||
|
connection: redis,
|
||||||
|
concurrency: 1,
|
||||||
|
});
|
||||||
|
this.worker.on('completed', this.onJobCompleted.bind(this));
|
||||||
|
this.worker.on('failed', this.onJobFailed.bind(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
public startScheduler() {
|
||||||
|
logger.debug('Starting scheduler');
|
||||||
|
|
||||||
|
return setInterval(async () => {
|
||||||
|
const thresholdDate = new Date(Date.now() - 1000 * 60 * 60 * 24);
|
||||||
|
|
||||||
|
const users = await this.db.user.findMany({
|
||||||
|
where: {
|
||||||
|
AND: [
|
||||||
|
{
|
||||||
|
accounts: {
|
||||||
|
some: {
|
||||||
|
provider: {
|
||||||
|
in: PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
OR: [
|
||||||
|
{ permissionSyncedAt: null },
|
||||||
|
{ permissionSyncedAt: { lt: thresholdDate } },
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
NOT: {
|
||||||
|
permissionSyncJobs: {
|
||||||
|
some: {
|
||||||
|
OR: [
|
||||||
|
// Don't schedule if there are active jobs
|
||||||
|
{
|
||||||
|
status: {
|
||||||
|
in: [
|
||||||
|
UserPermissionSyncJobStatus.PENDING,
|
||||||
|
UserPermissionSyncJobStatus.IN_PROGRESS,
|
||||||
|
],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Don't schedule if there are recent failed jobs (within the threshold date). Note `gt` is used here since this is a inverse condition.
|
||||||
|
{
|
||||||
|
AND: [
|
||||||
|
{ status: UserPermissionSyncJobStatus.FAILED },
|
||||||
|
{ completedAt: { gt: thresholdDate } },
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.schedulePermissionSync(users);
|
||||||
|
}, 1000 * 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
public dispose() {
|
||||||
|
this.worker.close();
|
||||||
|
this.queue.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async schedulePermissionSync(users: User[]) {
|
||||||
|
await this.db.$transaction(async (tx) => {
|
||||||
|
const jobs = await tx.userPermissionSyncJob.createManyAndReturn({
|
||||||
|
data: users.map(user => ({
|
||||||
|
userId: user.id,
|
||||||
|
})),
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.queue.addBulk(jobs.map((job) => ({
|
||||||
|
name: 'userPermissionSyncJob',
|
||||||
|
data: {
|
||||||
|
jobId: job.id,
|
||||||
|
},
|
||||||
|
opts: {
|
||||||
|
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
|
||||||
|
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
|
||||||
|
}
|
||||||
|
})))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async runJob(job: Job<UserPermissionSyncJob>) {
|
||||||
|
const id = job.data.jobId;
|
||||||
|
const { user } = await this.db.userPermissionSyncJob.update({
|
||||||
|
where: {
|
||||||
|
id,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: UserPermissionSyncJobStatus.IN_PROGRESS,
|
||||||
|
},
|
||||||
|
select: {
|
||||||
|
user: {
|
||||||
|
include: {
|
||||||
|
accounts: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!user) {
|
||||||
|
throw new Error(`User ${id} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Syncing permissions for user ${user.email}...`);
|
||||||
|
|
||||||
|
for (const account of user.accounts) {
|
||||||
|
const repoIds = await (async () => {
|
||||||
|
if (account.provider === 'github') {
|
||||||
|
// @todo: we will need to provide some mechanism for the user to provide a custom
|
||||||
|
// URL here. This will correspond to the host URL they are using for their GitHub
|
||||||
|
// instance.
|
||||||
|
const octokit = new Octokit({
|
||||||
|
auth: account.access_token,
|
||||||
|
// baseUrl: /* todo */
|
||||||
|
});
|
||||||
|
|
||||||
|
const repoIds = await getReposThatAuthenticatedUserHasReadAccessTo(octokit);
|
||||||
|
|
||||||
|
const repos = await this.db.repo.findMany({
|
||||||
|
where: {
|
||||||
|
external_codeHostType: 'github',
|
||||||
|
external_id: {
|
||||||
|
in: repoIds,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return repos.map(repo => repo.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return [];
|
||||||
|
})();
|
||||||
|
|
||||||
|
|
||||||
|
await this.db.$transaction([
|
||||||
|
this.db.user.update({
|
||||||
|
where: {
|
||||||
|
id: user.id,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
accessibleRepos: {
|
||||||
|
deleteMany: {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
this.db.userToRepoPermission.createMany({
|
||||||
|
data: repoIds.map(repoId => ({
|
||||||
|
userId: user.id,
|
||||||
|
repoId,
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async onJobCompleted(job: Job<UserPermissionSyncJob>) {
|
||||||
|
const { user } = await this.db.userPermissionSyncJob.update({
|
||||||
|
where: {
|
||||||
|
id: job.data.jobId,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: UserPermissionSyncJobStatus.COMPLETED,
|
||||||
|
user: {
|
||||||
|
update: {
|
||||||
|
permissionSyncedAt: new Date(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
completedAt: new Date(),
|
||||||
|
},
|
||||||
|
select: {
|
||||||
|
user: true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info(`Permissions synced for user ${user.email}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async onJobFailed(job: Job<UserPermissionSyncJob> | undefined, err: Error) {
|
||||||
|
Sentry.captureException(err, {
|
||||||
|
tags: {
|
||||||
|
jobId: job?.data.jobId,
|
||||||
|
queue: QUEUE_NAME,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const errorMessage = (email: string) => `User permission sync job failed for user ${email}: ${err.message}`;
|
||||||
|
|
||||||
|
if (job) {
|
||||||
|
const { user } = await this.db.userPermissionSyncJob.update({
|
||||||
|
where: {
|
||||||
|
id: job.data.jobId,
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
status: UserPermissionSyncJobStatus.FAILED,
|
||||||
|
completedAt: new Date(),
|
||||||
|
errorMessage: err.message,
|
||||||
|
},
|
||||||
|
select: {
|
||||||
|
user: true,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.error(errorMessage(user.email ?? user.id));
|
||||||
|
} else {
|
||||||
|
logger.error(errorMessage('unknown user (id not found)'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
-- CreateTable
|
|
||||||
CREATE TABLE "UserToRepoPermission" (
|
|
||||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
"repoId" INTEGER NOT NULL,
|
|
||||||
"userId" TEXT NOT NULL,
|
|
||||||
|
|
||||||
CONSTRAINT "UserToRepoPermission_pkey" PRIMARY KEY ("repoId","userId")
|
|
||||||
);
|
|
||||||
|
|
||||||
-- AddForeignKey
|
|
||||||
ALTER TABLE "UserToRepoPermission" ADD CONSTRAINT "UserToRepoPermission_repoId_fkey" FOREIGN KEY ("repoId") REFERENCES "Repo"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
|
||||||
|
|
||||||
-- AddForeignKey
|
|
||||||
ALTER TABLE "UserToRepoPermission" ADD CONSTRAINT "UserToRepoPermission_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
|
||||||
|
|
@ -1,6 +0,0 @@
|
||||||
-- CreateEnum
|
|
||||||
CREATE TYPE "RepoPermissionSyncStatus" AS ENUM ('SYNC_NEEDED', 'IN_SYNC_QUEUE', 'SYNCING', 'SYNCED', 'FAILED');
|
|
||||||
|
|
||||||
-- AlterTable
|
|
||||||
ALTER TABLE "Repo" ADD COLUMN "permissionSyncJobLastCompletedAt" TIMESTAMP(3),
|
|
||||||
ADD COLUMN "permissionSyncStatus" "RepoPermissionSyncStatus" NOT NULL DEFAULT 'SYNC_NEEDED';
|
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
-- CreateEnum
|
||||||
|
CREATE TYPE "RepoPermissionSyncJobStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
|
||||||
|
|
||||||
|
-- CreateEnum
|
||||||
|
CREATE TYPE "UserPermissionSyncJobStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
|
||||||
|
|
||||||
|
-- AlterTable
|
||||||
|
ALTER TABLE "Repo" ADD COLUMN "permissionSyncedAt" TIMESTAMP(3);
|
||||||
|
|
||||||
|
-- AlterTable
|
||||||
|
ALTER TABLE "User" ADD COLUMN "permissionSyncedAt" TIMESTAMP(3);
|
||||||
|
|
||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "RepoPermissionSyncJob" (
|
||||||
|
"id" TEXT NOT NULL,
|
||||||
|
"status" "RepoPermissionSyncJobStatus" NOT NULL DEFAULT 'PENDING',
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||||
|
"completedAt" TIMESTAMP(3),
|
||||||
|
"errorMessage" TEXT,
|
||||||
|
"repoId" INTEGER NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "RepoPermissionSyncJob_pkey" PRIMARY KEY ("id")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "UserPermissionSyncJob" (
|
||||||
|
"id" TEXT NOT NULL,
|
||||||
|
"status" "UserPermissionSyncJobStatus" NOT NULL DEFAULT 'PENDING',
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||||
|
"completedAt" TIMESTAMP(3),
|
||||||
|
"errorMessage" TEXT,
|
||||||
|
"userId" TEXT NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "UserPermissionSyncJob_pkey" PRIMARY KEY ("id")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- CreateTable
|
||||||
|
CREATE TABLE "UserToRepoPermission" (
|
||||||
|
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
"repoId" INTEGER NOT NULL,
|
||||||
|
"userId" TEXT NOT NULL,
|
||||||
|
|
||||||
|
CONSTRAINT "UserToRepoPermission_pkey" PRIMARY KEY ("repoId","userId")
|
||||||
|
);
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "RepoPermissionSyncJob" ADD CONSTRAINT "RepoPermissionSyncJob_repoId_fkey" FOREIGN KEY ("repoId") REFERENCES "Repo"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "UserPermissionSyncJob" ADD CONSTRAINT "UserPermissionSyncJob_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "UserToRepoPermission" ADD CONSTRAINT "UserToRepoPermission_repoId_fkey" FOREIGN KEY ("repoId") REFERENCES "Repo"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||||
|
|
||||||
|
-- AddForeignKey
|
||||||
|
ALTER TABLE "UserToRepoPermission" ADD CONSTRAINT "UserToRepoPermission_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||||
|
|
@ -30,14 +30,6 @@ enum ConnectionSyncStatus {
|
||||||
FAILED
|
FAILED
|
||||||
}
|
}
|
||||||
|
|
||||||
enum RepoPermissionSyncStatus {
|
|
||||||
SYNC_NEEDED
|
|
||||||
IN_SYNC_QUEUE
|
|
||||||
SYNCING
|
|
||||||
SYNCED
|
|
||||||
FAILED
|
|
||||||
}
|
|
||||||
|
|
||||||
enum StripeSubscriptionStatus {
|
enum StripeSubscriptionStatus {
|
||||||
ACTIVE
|
ACTIVE
|
||||||
INACTIVE
|
INACTIVE
|
||||||
|
|
@ -67,9 +59,9 @@ model Repo {
|
||||||
repoIndexingStatus RepoIndexingStatus @default(NEW)
|
repoIndexingStatus RepoIndexingStatus @default(NEW)
|
||||||
permittedUsers UserToRepoPermission[]
|
permittedUsers UserToRepoPermission[]
|
||||||
|
|
||||||
permissionSyncStatus RepoPermissionSyncStatus @default(SYNC_NEEDED)
|
permissionSyncJobs RepoPermissionSyncJob[]
|
||||||
/// When the repo permissions were last synced, either successfully or unsuccessfully.
|
/// When the permissions were last synced successfully.
|
||||||
permissionSyncJobLastCompletedAt DateTime?
|
permissionSyncedAt DateTime?
|
||||||
|
|
||||||
// The id of the repo in the external service
|
// The id of the repo in the external service
|
||||||
external_id String
|
external_id String
|
||||||
|
|
@ -87,6 +79,26 @@ model Repo {
|
||||||
@@index([orgId])
|
@@index([orgId])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum RepoPermissionSyncJobStatus {
|
||||||
|
PENDING
|
||||||
|
IN_PROGRESS
|
||||||
|
COMPLETED
|
||||||
|
FAILED
|
||||||
|
}
|
||||||
|
|
||||||
|
model RepoPermissionSyncJob {
|
||||||
|
id String @id @default(cuid())
|
||||||
|
status RepoPermissionSyncJobStatus @default(PENDING)
|
||||||
|
createdAt DateTime @default(now())
|
||||||
|
updatedAt DateTime @updatedAt
|
||||||
|
completedAt DateTime?
|
||||||
|
|
||||||
|
errorMessage String?
|
||||||
|
|
||||||
|
repo Repo @relation(fields: [repoId], references: [id], onDelete: Cascade)
|
||||||
|
repoId Int
|
||||||
|
}
|
||||||
|
|
||||||
model SearchContext {
|
model SearchContext {
|
||||||
id Int @id @default(autoincrement())
|
id Int @id @default(autoincrement())
|
||||||
|
|
||||||
|
|
@ -301,6 +313,29 @@ model User {
|
||||||
|
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
|
|
||||||
|
permissionSyncJobs UserPermissionSyncJob[]
|
||||||
|
permissionSyncedAt DateTime?
|
||||||
|
}
|
||||||
|
|
||||||
|
enum UserPermissionSyncJobStatus {
|
||||||
|
PENDING
|
||||||
|
IN_PROGRESS
|
||||||
|
COMPLETED
|
||||||
|
FAILED
|
||||||
|
}
|
||||||
|
|
||||||
|
model UserPermissionSyncJob {
|
||||||
|
id String @id @default(cuid())
|
||||||
|
status UserPermissionSyncJobStatus @default(PENDING)
|
||||||
|
createdAt DateTime @default(now())
|
||||||
|
updatedAt DateTime @updatedAt
|
||||||
|
completedAt DateTime?
|
||||||
|
|
||||||
|
errorMessage String?
|
||||||
|
|
||||||
|
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||||
|
userId String
|
||||||
}
|
}
|
||||||
|
|
||||||
model UserToRepoPermission {
|
model UserToRepoPermission {
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,17 @@ export const getSSOProviders = (): Provider[] => {
|
||||||
authorization: {
|
authorization: {
|
||||||
url: `${baseUrl}/login/oauth/authorize`,
|
url: `${baseUrl}/login/oauth/authorize`,
|
||||||
params: {
|
params: {
|
||||||
scope: "read:user user:email",
|
scope: [
|
||||||
|
'read:user',
|
||||||
|
'user:email',
|
||||||
|
// Permission syncing requires the `repo` in order to fetch repositories
|
||||||
|
// for the authenticated user.
|
||||||
|
// @see: https://docs.github.com/en/rest/repos/repos?apiVersion=2022-11-28#list-repositories-for-the-authenticated-user
|
||||||
|
...(env.EXPERIMENT_PERMISSION_SYNC_ENABLED === 'true' ?
|
||||||
|
['repo'] :
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
].join(' '),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
token: {
|
token: {
|
||||||
|
|
@ -103,7 +113,7 @@ export const getSSOProviders = (): Provider[] => {
|
||||||
}
|
}
|
||||||
|
|
||||||
const oauth2Client = new OAuth2Client();
|
const oauth2Client = new OAuth2Client();
|
||||||
|
|
||||||
const { pubkeys } = await oauth2Client.getIapPublicKeys();
|
const { pubkeys } = await oauth2Client.getIapPublicKeys();
|
||||||
const ticket = await oauth2Client.verifySignedJwtWithCertsAsync(
|
const ticket = await oauth2Client.verifySignedJwtWithCertsAsync(
|
||||||
iapAssertion,
|
iapAssertion,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue