From 36386662487ac8445a1db005632974ad00d8d5c9 Mon Sep 17 00:00:00 2001 From: bkellam Date: Wed, 17 Sep 2025 20:34:10 -0700 Subject: [PATCH] wip --- packages/backend/src/permissionSyncer.ts | 128 ++++++++++++++---- .../migration.sql | 6 + packages/db/prisma/schema.prisma | 12 ++ .../[...path]/components/codePreviewPanel.tsx | 5 +- .../app/[domain]/browse/[...path]/page.tsx | 3 - .../components/codePreviewPanel/index.tsx | 6 +- .../web/src/app/api/(server)/search/route.ts | 14 +- .../web/src/app/api/(server)/source/route.ts | 14 +- .../review-agent/nodes/fetchFileContent.ts | 3 +- packages/web/src/features/chat/agent.ts | 3 +- packages/web/src/features/chat/tools.ts | 4 +- packages/web/src/features/codeNav/actions.ts | 4 +- .../web/src/features/search/fileSourceApi.ts | 88 ++++++------ 13 files changed, 178 insertions(+), 112 deletions(-) create mode 100644 packages/db/prisma/migrations/20250918030650_add_permission_sync_tracking_to_repo_table/migration.sql diff --git a/packages/backend/src/permissionSyncer.ts b/packages/backend/src/permissionSyncer.ts index bb93d2b0..baf85f05 100644 --- a/packages/backend/src/permissionSyncer.ts +++ b/packages/backend/src/permissionSyncer.ts @@ -1,4 +1,5 @@ -import { PrismaClient } from "@sourcebot/db"; +import * as Sentry from "@sentry/node"; +import { PrismaClient, Repo, RepoPermissionSyncStatus } from "@sourcebot/db"; import { createLogger } from "@sourcebot/logger"; import { BitbucketConnectionConfig } from "@sourcebot/schemas/v3/bitbucket.type"; import { GiteaConnectionConfig } from "@sourcebot/schemas/v3/gitea.type"; @@ -6,6 +7,7 @@ import { GithubConnectionConfig } from "@sourcebot/schemas/v3/github.type"; import { GitlabConnectionConfig } from "@sourcebot/schemas/v3/gitlab.type"; import { Job, Queue, Worker } from 'bullmq'; import { Redis } from 'ioredis'; +import { env } from "./env.js"; import { createOctokitFromConfig, getUserIdsWithReadAccessToRepo } from "./github.js"; import { RepoWithConnections } from "./types.js"; @@ -17,6 +19,8 @@ const QUEUE_NAME = 'repoPermissionSyncQueue'; const logger = createLogger('permission-syncer'); +const SUPPORTED_CODE_HOST_TYPES = ['github']; + export class RepoPermissionSyncer { private queue: Queue; private worker: Worker; @@ -30,36 +34,59 @@ export class RepoPermissionSyncer { }); this.worker = new Worker(QUEUE_NAME, this.runJob.bind(this), { connection: redis, + concurrency: 1, }); this.worker.on('completed', this.onJobCompleted.bind(this)); this.worker.on('failed', this.onJobFailed.bind(this)); } - public async scheduleJob(repoId: number) { - await this.queue.add(QUEUE_NAME, { - repoId, - }); - } - public startScheduler() { logger.debug('Starting scheduler'); - // @todo: we should only sync permissions for a repository if it has been at least ~24 hours since the last sync. return setInterval(async () => { + // @todo: make this configurable + const thresholdDate = new Date(Date.now() - 1000 * 60 * 60 * 24); const repos = await this.db.repo.findMany({ + // Repos need their permissions to be synced against the code host when... where: { - external_codeHostType: { - in: ['github'], - } + // They belong to a code host that supports permissions syncing + AND: [ + { + external_codeHostType: { + in: SUPPORTED_CODE_HOST_TYPES, + } + }, + // and, they either require a sync (SYNC_NEEDED) or have been in a completed state (SYNCED or FAILED) + // for > some duration (default 24 hours) + { + OR: [ + { + permissionSyncStatus: RepoPermissionSyncStatus.SYNC_NEEDED + }, + { + AND: [ + { + OR: [ + { permissionSyncStatus: RepoPermissionSyncStatus.SYNCED }, + { permissionSyncStatus: RepoPermissionSyncStatus.FAILED }, + ] + }, + { + OR: [ + { permissionSyncJobLastCompletedAt: null }, + { permissionSyncJobLastCompletedAt: { lt: thresholdDate } } + ] + } + ] + } + ] + }, + ] } }); - for (const repo of repos) { - await this.scheduleJob(repo.id); - } - - // @todo: make this configurable - }, 1000 * 60); + await this.schedulePermissionSync(repos); + }, 1000 * 30); } public dispose() { @@ -67,11 +94,34 @@ export class RepoPermissionSyncer { this.queue.close(); } + private async schedulePermissionSync(repos: Repo[]) { + await this.db.$transaction(async (tx) => { + await tx.repo.updateMany({ + where: { id: { in: repos.map(repo => repo.id) } }, + data: { permissionSyncStatus: RepoPermissionSyncStatus.IN_SYNC_QUEUE }, + }); + + await this.queue.addBulk(repos.map(repo => ({ + name: 'repoPermissionSyncJob', + data: { + repoId: repo.id, + }, + opts: { + removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE, + removeOnFail: env.REDIS_REMOVE_ON_FAIL, + } + }))) + }); + } + private async runJob(job: Job) { const id = job.data.repoId; - const repo = await this.db.repo.findUnique({ + const repo = await this.db.repo.update({ where: { - id, + id + }, + data: { + permissionSyncStatus: RepoPermissionSyncStatus.SYNCING, }, include: { connections: { @@ -86,6 +136,8 @@ export class RepoPermissionSyncer { throw new Error(`Repo ${id} not found`); } + logger.info(`Syncing permissions for repo ${repo.displayName}...`); + const connection = getFirstConnectionWithToken(repo); if (!connection) { throw new Error(`No connection with token found for repo ${id}`); @@ -119,8 +171,6 @@ export class RepoPermissionSyncer { return []; })(); - logger.info(`User IDs with read access to repo ${id}: ${userIds}`); - await this.db.repo.update({ where: { id: repo.id, @@ -141,11 +191,43 @@ export class RepoPermissionSyncer { } private async onJobCompleted(job: Job) { - logger.info(`Repo permission sync job completed for repo ${job.data.repoId}`); + const repo = await this.db.repo.update({ + where: { + id: job.data.repoId, + }, + data: { + permissionSyncStatus: RepoPermissionSyncStatus.SYNCED, + permissionSyncJobLastCompletedAt: new Date(), + }, + }); + + logger.info(`Permissions synced for repo ${repo.displayName ?? repo.name}`); } private async onJobFailed(job: Job | undefined, err: Error) { - logger.error(`Repo permission sync job failed for repo ${job?.data.repoId}: ${err}`); + Sentry.captureException(err, { + tags: { + repoId: job?.data.repoId, + queue: QUEUE_NAME, + } + }); + + const errorMessage = (repoName: string) => `Repo permission sync job failed for repo ${repoName}: ${err}`; + + if (job) { + const repo = await this.db.repo.update({ + where: { + id: job?.data.repoId, + }, + data: { + permissionSyncStatus: RepoPermissionSyncStatus.FAILED, + permissionSyncJobLastCompletedAt: new Date(), + }, + }); + logger.error(errorMessage(repo.displayName ?? repo.name)); + } else { + logger.error(errorMessage('unknown repo (id not found)')); + } } } diff --git a/packages/db/prisma/migrations/20250918030650_add_permission_sync_tracking_to_repo_table/migration.sql b/packages/db/prisma/migrations/20250918030650_add_permission_sync_tracking_to_repo_table/migration.sql new file mode 100644 index 00000000..ffbe376e --- /dev/null +++ b/packages/db/prisma/migrations/20250918030650_add_permission_sync_tracking_to_repo_table/migration.sql @@ -0,0 +1,6 @@ +-- CreateEnum +CREATE TYPE "RepoPermissionSyncStatus" AS ENUM ('SYNC_NEEDED', 'IN_SYNC_QUEUE', 'SYNCING', 'SYNCED', 'FAILED'); + +-- AlterTable +ALTER TABLE "Repo" ADD COLUMN "permissionSyncJobLastCompletedAt" TIMESTAMP(3), +ADD COLUMN "permissionSyncStatus" "RepoPermissionSyncStatus" NOT NULL DEFAULT 'SYNC_NEEDED'; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index a50a4eb2..feaaff02 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -30,6 +30,14 @@ enum ConnectionSyncStatus { FAILED } +enum RepoPermissionSyncStatus { + SYNC_NEEDED + IN_SYNC_QUEUE + SYNCING + SYNCED + FAILED +} + enum StripeSubscriptionStatus { ACTIVE INACTIVE @@ -59,6 +67,10 @@ model Repo { repoIndexingStatus RepoIndexingStatus @default(NEW) permittedUsers UserToRepoPermission[] + permissionSyncStatus RepoPermissionSyncStatus @default(SYNC_NEEDED) + /// When the repo permissions were last synced, either successfully or unsuccessfully. + permissionSyncJobLastCompletedAt DateTime? + // The id of the repo in the external service external_id String // The type of the external service (e.g., github, gitlab, etc.) diff --git a/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx b/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx index 091122f8..01a84447 100644 --- a/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx +++ b/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx @@ -10,16 +10,15 @@ interface CodePreviewPanelProps { path: string; repoName: string; revisionName?: string; - domain: string; } -export const CodePreviewPanel = async ({ path, repoName, revisionName, domain }: CodePreviewPanelProps) => { +export const CodePreviewPanel = async ({ path, repoName, revisionName }: CodePreviewPanelProps) => { const [fileSourceResponse, repoInfoResponse] = await Promise.all([ getFileSource({ fileName: path, repository: repoName, branch: revisionName, - }, domain), + }), getRepoInfoByName(repoName), ]); diff --git a/packages/web/src/app/[domain]/browse/[...path]/page.tsx b/packages/web/src/app/[domain]/browse/[...path]/page.tsx index 20e54327..84c87912 100644 --- a/packages/web/src/app/[domain]/browse/[...path]/page.tsx +++ b/packages/web/src/app/[domain]/browse/[...path]/page.tsx @@ -7,7 +7,6 @@ import { TreePreviewPanel } from "./components/treePreviewPanel"; interface BrowsePageProps { params: Promise<{ path: string[]; - domain: string; }>; } @@ -16,7 +15,6 @@ export default async function BrowsePage(props: BrowsePageProps) { const { path: _rawPath, - domain } = params; const rawPath = _rawPath.join('/'); @@ -35,7 +33,6 @@ export default async function BrowsePage(props: BrowsePageProps) { path={path} repoName={repoName} revisionName={revisionName} - domain={domain} /> ) : ( { - const domain = useDomain(); // If there are multiple branches pointing to the same revision of this file, it doesn't // matter which branch we use here, so use the first one. @@ -31,13 +29,13 @@ export const CodePreviewPanel = ({ }, [previewedFile]); const { data: file, isLoading, isPending, isError } = useQuery({ - queryKey: ["source", previewedFile, branch, domain], + queryKey: ["source", previewedFile, branch], queryFn: () => unwrapServiceError( getFileSource({ fileName: previewedFile.fileName.text, repository: previewedFile.repository, branch, - }, domain) + }) ), select: (data) => { return { diff --git a/packages/web/src/app/api/(server)/search/route.ts b/packages/web/src/app/api/(server)/search/route.ts index 145d3fa9..83a5e6a0 100644 --- a/packages/web/src/app/api/(server)/search/route.ts +++ b/packages/web/src/app/api/(server)/search/route.ts @@ -5,20 +5,8 @@ import { isServiceError } from "@/lib/utils"; import { NextRequest } from "next/server"; import { schemaValidationError, serviceErrorResponse } from "@/lib/serviceError"; import { searchRequestSchema } from "@/features/search/schemas"; -import { ErrorCode } from "@/lib/errorCodes"; -import { StatusCodes } from "http-status-codes"; export const POST = async (request: NextRequest) => { - const domain = request.headers.get("X-Org-Domain"); - const apiKey = request.headers.get("X-Sourcebot-Api-Key") ?? undefined; - if (!domain) { - return serviceErrorResponse({ - statusCode: StatusCodes.BAD_REQUEST, - errorCode: ErrorCode.MISSING_ORG_DOMAIN_HEADER, - message: "Missing X-Org-Domain header", - }); - } - const body = await request.json(); const parsed = await searchRequestSchema.safeParseAsync(body); if (!parsed.success) { @@ -27,7 +15,7 @@ export const POST = async (request: NextRequest) => { ); } - const response = await search(parsed.data, domain, apiKey); + const response = await search(parsed.data); if (isServiceError(response)) { return serviceErrorResponse(response); } diff --git a/packages/web/src/app/api/(server)/source/route.ts b/packages/web/src/app/api/(server)/source/route.ts index a6364b36..d64d701d 100644 --- a/packages/web/src/app/api/(server)/source/route.ts +++ b/packages/web/src/app/api/(server)/source/route.ts @@ -5,20 +5,8 @@ import { schemaValidationError, serviceErrorResponse } from "@/lib/serviceError" import { isServiceError } from "@/lib/utils"; import { NextRequest } from "next/server"; import { fileSourceRequestSchema } from "@/features/search/schemas"; -import { ErrorCode } from "@/lib/errorCodes"; -import { StatusCodes } from "http-status-codes"; export const POST = async (request: NextRequest) => { - const domain = request.headers.get("X-Org-Domain"); - const apiKey = request.headers.get("X-Sourcebot-Api-Key") ?? undefined; - if (!domain) { - return serviceErrorResponse({ - statusCode: StatusCodes.BAD_REQUEST, - errorCode: ErrorCode.MISSING_ORG_DOMAIN_HEADER, - message: "Missing X-Org-Domain header", - }); - } - const body = await request.json(); const parsed = await fileSourceRequestSchema.safeParseAsync(body); if (!parsed.success) { @@ -27,7 +15,7 @@ export const POST = async (request: NextRequest) => { ); } - const response = await getFileSource(parsed.data, domain, apiKey); + const response = await getFileSource(parsed.data); if (isServiceError(response)) { return serviceErrorResponse(response); } diff --git a/packages/web/src/features/agents/review-agent/nodes/fetchFileContent.ts b/packages/web/src/features/agents/review-agent/nodes/fetchFileContent.ts index 06cc7a44..7617c959 100644 --- a/packages/web/src/features/agents/review-agent/nodes/fetchFileContent.ts +++ b/packages/web/src/features/agents/review-agent/nodes/fetchFileContent.ts @@ -2,7 +2,6 @@ import { sourcebot_context, sourcebot_pr_payload } from "@/features/agents/revie import { getFileSource } from "@/features/search/fileSourceApi"; import { fileSourceResponseSchema } from "@/features/search/schemas"; import { isServiceError } from "@/lib/utils"; -import { env } from "@/env.mjs"; import { createLogger } from "@sourcebot/logger"; const logger = createLogger('fetch-file-content'); @@ -17,7 +16,7 @@ export const fetchFileContent = async (pr_payload: sourcebot_pr_payload, filenam } logger.debug(JSON.stringify(fileSourceRequest, null, 2)); - const response = await getFileSource(fileSourceRequest, "~", env.REVIEW_AGENT_API_KEY); + const response = await getFileSource(fileSourceRequest); if (isServiceError(response)) { throw new Error(`Failed to fetch file content for ${filename} from ${repoPath}: ${response.message}`); } diff --git a/packages/web/src/features/chat/agent.ts b/packages/web/src/features/chat/agent.ts index 7010151f..7df5a6d4 100644 --- a/packages/web/src/features/chat/agent.ts +++ b/packages/web/src/features/chat/agent.ts @@ -1,6 +1,5 @@ import { env } from "@/env.mjs"; import { getFileSource } from "@/features/search/fileSourceApi"; -import { SINGLE_TENANT_ORG_DOMAIN } from "@/lib/constants"; import { isServiceError } from "@/lib/utils"; import { ProviderOptions } from "@ai-sdk/provider-utils"; import { createLogger } from "@sourcebot/logger"; @@ -252,7 +251,7 @@ const resolveFileSource = async ({ path, repo, revision }: FileSource) => { repository: repo, branch: revision, // @todo: handle multi-tenancy. - }, SINGLE_TENANT_ORG_DOMAIN); + }); if (isServiceError(fileSource)) { // @todo: handle this diff --git a/packages/web/src/features/chat/tools.ts b/packages/web/src/features/chat/tools.ts index 608c1213..f69c5f34 100644 --- a/packages/web/src/features/chat/tools.ts +++ b/packages/web/src/features/chat/tools.ts @@ -114,7 +114,7 @@ export const readFilesTool = tool({ repository, branch: revision, // @todo(mt): handle multi-tenancy. - }, SINGLE_TENANT_ORG_DOMAIN); + }); })); if (responses.some(isServiceError)) { @@ -187,7 +187,7 @@ Multiple expressions can be or'd together with or, negated with -, or grouped wi contextLines: 3, whole: false, // @todo(mt): handle multi-tenancy. - }, SINGLE_TENANT_ORG_DOMAIN); + }); if (isServiceError(response)) { return response; diff --git a/packages/web/src/features/codeNav/actions.ts b/packages/web/src/features/codeNav/actions.ts index f342de6e..b55cfa30 100644 --- a/packages/web/src/features/codeNav/actions.ts +++ b/packages/web/src/features/codeNav/actions.ts @@ -34,7 +34,7 @@ export const findSearchBasedSymbolReferences = async ( query, matches: MAX_REFERENCE_COUNT, contextLines: 0, - }, domain); + }); if (isServiceError(searchResult)) { return searchResult; @@ -67,7 +67,7 @@ export const findSearchBasedSymbolDefinitions = async ( query, matches: MAX_REFERENCE_COUNT, contextLines: 0, - }, domain); + }); if (isServiceError(searchResult)) { return searchResult; diff --git a/packages/web/src/features/search/fileSourceApi.ts b/packages/web/src/features/search/fileSourceApi.ts index aa3ae07a..249abb42 100644 --- a/packages/web/src/features/search/fileSourceApi.ts +++ b/packages/web/src/features/search/fileSourceApi.ts @@ -5,60 +5,58 @@ import { fileNotFound, ServiceError, unexpectedError } from "../../lib/serviceEr import { FileSourceRequest, FileSourceResponse } from "./types"; import { isServiceError } from "../../lib/utils"; import { search } from "./searchApi"; -import { sew, withAuth, withOrgMembership } from "@/actions"; -import { OrgRole } from "@sourcebot/db"; +import { sew } from "@/actions"; +import { withOptionalAuthV2 } from "@/withAuthV2"; // @todo (bkellam) : We should really be using `git show :` to fetch file contents here. // This will allow us to support permalinks to files at a specific revision that may not be indexed // by zoekt. -export const getFileSource = async ({ fileName, repository, branch }: FileSourceRequest, domain: string, apiKey: string | undefined = undefined): Promise => sew(() => - withAuth((userId, _apiKeyHash) => - withOrgMembership(userId, domain, async () => { - const escapedFileName = escapeStringRegexp(fileName); - const escapedRepository = escapeStringRegexp(repository); +export const getFileSource = async ({ fileName, repository, branch }: FileSourceRequest): Promise => sew(() => + withOptionalAuthV2(async () => { + const escapedFileName = escapeStringRegexp(fileName); + const escapedRepository = escapeStringRegexp(repository); - let query = `file:${escapedFileName} repo:^${escapedRepository}$`; - if (branch) { - query = query.concat(` branch:${branch}`); - } + let query = `file:${escapedFileName} repo:^${escapedRepository}$`; + if (branch) { + query = query.concat(` branch:${branch}`); + } - const searchResponse = await search({ - query, - matches: 1, - whole: true, - }, domain, apiKey); + const searchResponse = await search({ + query, + matches: 1, + whole: true, + }); - if (isServiceError(searchResponse)) { - return searchResponse; - } + if (isServiceError(searchResponse)) { + return searchResponse; + } - const files = searchResponse.files; + const files = searchResponse.files; - if (!files || files.length === 0) { - return fileNotFound(fileName, repository); - } + if (!files || files.length === 0) { + return fileNotFound(fileName, repository); + } - const file = files[0]; - const source = file.content ?? ''; - const language = file.language; + const file = files[0]; + const source = file.content ?? ''; + const language = file.language; - const repoInfo = searchResponse.repositoryInfo.find((repo) => repo.id === file.repositoryId); - if (!repoInfo) { - // This should never happen. - return unexpectedError("Repository info not found"); - } - - return { - source, - language, - path: fileName, - repository, - repositoryCodeHostType: repoInfo.codeHostType, - repositoryDisplayName: repoInfo.displayName, - repositoryWebUrl: repoInfo.webUrl, - branch, - webUrl: file.webUrl, - } satisfies FileSourceResponse; + const repoInfo = searchResponse.repositoryInfo.find((repo) => repo.id === file.repositoryId); + if (!repoInfo) { + // This should never happen. + return unexpectedError("Repository info not found"); + } - }, /* minRequiredRole = */ OrgRole.GUEST), /* allowAnonymousAccess = */ true, apiKey ? { apiKey, domain } : undefined) -); + return { + source, + language, + path: fileName, + repository, + repositoryCodeHostType: repoInfo.codeHostType, + repositoryDisplayName: repoInfo.displayName, + repositoryWebUrl: repoInfo.webUrl, + branch, + webUrl: file.webUrl, + } satisfies FileSourceResponse; + + }));