diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b2ea48d..e9bf79c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,10 +7,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
+### Fixed
+- Fixed spurious infinite loads with explore panel, file tree, and file search command. [#617](https://github.com/sourcebot-dev/sourcebot/pull/617)
+- Wipe search context on init if entitlement no longer exists [#618](https://github.com/sourcebot-dev/sourcebot/pull/618)
+- Fixed review agent so that it works with GHES instances [#611](https://github.com/sourcebot-dev/sourcebot/pull/611)
+
+## [4.9.2] - 2025-11-13
+
+### Changed
+- Bumped the default requested search result count from 5k to 10k after optimization pass. [#615](https://github.com/sourcebot-dev/sourcebot/pull/615)
+
### Fixed
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
-- Fixed review agent so that it works with GHES instances [#611](https://github.com/sourcebot-dev/sourcebot/pull/611)
+- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612)
+- Fixed issue where connections would always sync on startup, regardless of whether they had changed. [#613](https://github.com/sourcebot-dev/sourcebot/pull/613)
+- Fixed performance bottleneck in search api. Result is an order of magnitude improvement to average search time according to benchmarks. [#615](https://github.com/sourcebot-dev/sourcebot/pull/615)
+
+### Added
+- Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)
+- Added environment variable to configure default search result count. [#616](https://github.com/sourcebot-dev/sourcebot/pull/616)
## [4.9.1] - 2025-11-07
diff --git a/docs/docs/configuration/environment-variables.mdx b/docs/docs/configuration/environment-variables.mdx
index fcf90eb4..87167858 100644
--- a/docs/docs/configuration/environment-variables.mdx
+++ b/docs/docs/configuration/environment-variables.mdx
@@ -34,6 +34,7 @@ The following environment variables allow you to configure your Sourcebot deploy
| `SOURCEBOT_STRUCTURED_LOGGING_ENABLED` | `false` |
Enables/disable structured JSON logging. See [this doc](/docs/configuration/structured-logging) for more info.
|
| `SOURCEBOT_STRUCTURED_LOGGING_FILE` | - |
Optional file to log to if structured logging is enabled
|
| `SOURCEBOT_TELEMETRY_DISABLED` | `false` |
Enables/disables telemetry collection in Sourcebot. See [this doc](/docs/overview.mdx#telemetry) for more info.
|
+| `DEFAULT_MAX_MATCH_COUNT` | `10000` |
The default maximum number of search results to return when using search in the web app.
|
### Enterprise Environment Variables
| Variable | Default | Description |
diff --git a/packages/backend/package.json b/packages/backend/package.json
index 5369bde1..a8ffb315 100644
--- a/packages/backend/package.json
+++ b/packages/backend/package.json
@@ -40,6 +40,8 @@
"cross-fetch": "^4.0.0",
"dotenv": "^16.4.5",
"express": "^4.21.2",
+ "express-async-errors": "^3.1.1",
+ "fast-deep-equal": "^3.1.3",
"git-url-parse": "^16.1.0",
"gitea-js": "^1.22.0",
"glob": "^11.0.0",
diff --git a/packages/backend/src/api.ts b/packages/backend/src/api.ts
new file mode 100644
index 00000000..5c7e2547
--- /dev/null
+++ b/packages/backend/src/api.ts
@@ -0,0 +1,103 @@
+import { PrismaClient, RepoIndexingJobType } from '@sourcebot/db';
+import { createLogger } from '@sourcebot/shared';
+import express, { Request, Response } from 'express';
+import 'express-async-errors';
+import * as http from "http";
+import z from 'zod';
+import { ConnectionManager } from './connectionManager.js';
+import { PromClient } from './promClient.js';
+import { RepoIndexManager } from './repoIndexManager.js';
+
+const logger = createLogger('api');
+const PORT = 3060;
+
+export class Api {
+ private server: http.Server;
+
+ constructor(
+ promClient: PromClient,
+ private prisma: PrismaClient,
+ private connectionManager: ConnectionManager,
+ private repoIndexManager: RepoIndexManager,
+ ) {
+ const app = express();
+ app.use(express.json());
+ app.use(express.urlencoded({ extended: true }));
+
+ // Prometheus metrics endpoint
+ app.use('/metrics', async (_req: Request, res: Response) => {
+ res.set('Content-Type', promClient.registry.contentType);
+ const metrics = await promClient.registry.metrics();
+ res.end(metrics);
+ });
+
+ app.post('/api/sync-connection', this.syncConnection.bind(this));
+ app.post('/api/index-repo', this.indexRepo.bind(this));
+
+ this.server = app.listen(PORT, () => {
+ logger.info(`API server is running on port ${PORT}`);
+ });
+ }
+
+ private async syncConnection(req: Request, res: Response) {
+ const schema = z.object({
+ connectionId: z.number(),
+ }).strict();
+
+ const parsed = schema.safeParse(req.body);
+ if (!parsed.success) {
+ res.status(400).json({ error: parsed.error.message });
+ return;
+ }
+
+ const { connectionId } = parsed.data;
+ const connection = await this.prisma.connection.findUnique({
+ where: {
+ id: connectionId,
+ }
+ });
+
+ if (!connection) {
+ res.status(404).json({ error: 'Connection not found' });
+ return;
+ }
+
+ const [jobId] = await this.connectionManager.createJobs([connection]);
+
+ res.status(200).json({ jobId });
+ }
+
+ private async indexRepo(req: Request, res: Response) {
+ const schema = z.object({
+ repoId: z.number(),
+ }).strict();
+
+ const parsed = schema.safeParse(req.body);
+ if (!parsed.success) {
+ res.status(400).json({ error: parsed.error.message });
+ return;
+ }
+
+ const { repoId } = parsed.data;
+ const repo = await this.prisma.repo.findUnique({
+ where: { id: repoId },
+ });
+
+ if (!repo) {
+ res.status(404).json({ error: 'Repo not found' });
+ return;
+ }
+
+ const [jobId] = await this.repoIndexManager.createJobs([repo], RepoIndexingJobType.INDEX);
+ res.status(200).json({ jobId });
+ }
+
+ public async dispose() {
+ return new Promise((resolve, reject) => {
+ this.server.close((err) => {
+ if (err) reject(err);
+ else resolve(undefined);
+ });
+ });
+ }
+}
\ No newline at end of file
diff --git a/packages/backend/src/configManager.ts b/packages/backend/src/configManager.ts
index 55dbd6ed..fed01863 100644
--- a/packages/backend/src/configManager.ts
+++ b/packages/backend/src/configManager.ts
@@ -6,6 +6,7 @@ import chokidar, { FSWatcher } from 'chokidar';
import { ConnectionManager } from "./connectionManager.js";
import { SINGLE_TENANT_ORG_ID } from "./constants.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
+import isEqual from 'fast-deep-equal';
const logger = createLogger('config-manager');
@@ -64,8 +65,8 @@ export class ConfigManager {
const existingConnectionConfig = existingConnection ? existingConnection.config as unknown as ConnectionConfig : undefined;
const connectionNeedsSyncing =
- !existingConnection ||
- (JSON.stringify(existingConnectionConfig) !== JSON.stringify(newConnectionConfig));
+ !existingConnectionConfig ||
+ !isEqual(existingConnectionConfig, newConnectionConfig);
// Either update the existing connection or create a new one.
const connection = existingConnection ?
@@ -93,8 +94,8 @@ export class ConfigManager {
});
if (connectionNeedsSyncing) {
- const [jobId] = await this.connectionManager.createJobs([connection]);
- logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
+ logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`);
+ await this.connectionManager.createJobs([connection]);
}
}
}
diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts
index ee17543a..bfb414df 100644
--- a/packages/backend/src/connectionManager.ts
+++ b/packages/backend/src/connectionManager.ts
@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
+import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";
const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
+const QUEUE_NAME = 'connection-sync-queue';
type JobPayload = {
jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
export class ConnectionManager {
- private worker: Worker;
+ private worker: Worker;
private queue: Queue;
private interval?: NodeJS.Timeout;
constructor(
private db: PrismaClient,
private settings: Settings,
- redis: Redis,
+ private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue({
redis,
- namespace: 'connection-sync-queue',
+ namespace: QUEUE_NAME,
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@ export class ConnectionManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
+ // graceful-timeout is triggered when a job is still processing after
+ // worker.close() is called and the timeout period has elapsed. In this case,
+ // we fail the job with no retry.
+ this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}
public startScheduler() {
@@ -128,6 +134,7 @@ export class ConnectionManager {
});
for (const job of jobs) {
+ logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
await this.queue.add({
groupId: `connection:${job.connectionId}`,
data: {
@@ -150,6 +157,22 @@ export class ConnectionManager {
const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
+ const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
+ where: {
+ id: jobId,
+ },
+ select: {
+ status: true,
+ }
+ });
+
+ // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
+ // is in an invalid state and should be skipped.
+ if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
+ throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
+ }
+
+
this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });
@@ -178,7 +201,7 @@ export class ConnectionManager {
const result = await (async () => {
switch (config.type) {
case 'github': {
- return await compileGithubConfig(config, job.data.connectionId, abortController);
+ return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
}
case 'gitlab': {
return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@ export class ConnectionManager {
}
}
})();
-
+
let { repoData, warnings } = result;
await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@ export class ConnectionManager {
});
});
+ private onJobGracefulTimeout = async (job: Job) =>
+ groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
+ const logger = createJobLogger(job.id);
+
+ const { connection } = await this.db.connectionSyncJob.update({
+ where: { id: job.id },
+ data: {
+ status: ConnectionSyncJobStatus.FAILED,
+ completedAt: new Date(),
+ errorMessage: 'Job timed out',
+ },
+ select: {
+ connection: true,
+ }
+ });
+
+ this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
+ this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
+
+ logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);
+
+ captureEvent('backend_connection_sync_job_failed', {
+ connectionId: connection.id,
+ error: 'Job timed out',
+ });
+ });
+
private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@ export class ConnectionManager {
if (this.interval) {
clearInterval(this.interval);
}
- await this.worker.close();
- await this.queue.close();
+
+ const inProgressJobs = this.worker.getCurrentJobs();
+ await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
+
+ // Manually release group locks for in progress jobs to prevent deadlocks.
+ // @see: https://github.com/Openpanel-dev/groupmq/issues/8
+ for (const { job } of inProgressJobs) {
+ const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
+ logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
+ try {
+ await this.redis.del(lockKey);
+ } catch (error) {
+ Sentry.captureException(error);
+ logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
+ }
+ }
+
+ // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
+ // redis connection. Since the same redis client is shared, skip this
+ // step and close the redis client directly in index.ts.
+ // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
+ // await this.queue.close();
}
}
diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts
index a52d822e..b11f5102 100644
--- a/packages/backend/src/constants.ts
+++ b/packages/backend/src/constants.ts
@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
];
export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
-export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
\ No newline at end of file
+export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
+
+// Maximum time to wait for current job to finish
+export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds
+
+// List of shutdown signals
+export const SHUTDOWN_SIGNALS: string[] = [
+ 'SIGHUP',
+ 'SIGINT',
+ 'SIGQUIT',
+ 'SIGILL',
+ 'SIGTRAP',
+ 'SIGABRT',
+ 'SIGBUS',
+ 'SIGFPE',
+ 'SIGSEGV',
+ 'SIGUSR2',
+ 'SIGTERM',
+ // @note: SIGKILL and SIGSTOP cannot have listeners installed.
+ // @see: https://nodejs.org/api/process.html#signal-events
+];
diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts
index be0ddb01..c3674834 100644
--- a/packages/backend/src/index.ts
+++ b/packages/backend/src/index.ts
@@ -1,20 +1,22 @@
import "./instrument.js";
+import * as Sentry from "@sentry/node";
import { PrismaClient } from "@sourcebot/db";
-import { createLogger } from "@sourcebot/shared";
-import { env, getConfigSettings, hasEntitlement, getDBConnectionString } from '@sourcebot/shared';
+import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
+import 'express-async-errors';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Redis } from 'ioredis';
+import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
-import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
+import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
+import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
-import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
+import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";
-import { shutdownPosthog } from "./posthog.js";
const logger = createLogger('backend-entrypoint');
@@ -40,13 +42,14 @@ const prisma = new PrismaClient({
const redis = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null
});
-redis.ping().then(() => {
+
+try {
+ await redis.ping();
logger.info('Connected to redis');
-}).catch((err: unknown) => {
- logger.error('Failed to connect to redis');
- logger.error(err);
+} catch (err: unknown) {
+ logger.error('Failed to connect to redis. Error:', err);
process.exit(1);
-});
+}
const promClient = new PromClient();
@@ -74,47 +77,74 @@ else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement(
accountPermissionSyncer.startScheduler();
}
+const api = new Api(
+ promClient,
+ prisma,
+ connectionManager,
+ repoIndexManager,
+);
+
logger.info('Worker started.');
-const cleanup = async (signal: string) => {
- logger.info(`Received ${signal}, cleaning up...`);
+const listenToShutdownSignals = () => {
+ const signals = SHUTDOWN_SIGNALS;
- const shutdownTimeout = 30000; // 30 seconds
+ let receivedSignal = false;
- try {
- await Promise.race([
- Promise.all([
- repoIndexManager.dispose(),
- connectionManager.dispose(),
- repoPermissionSyncer.dispose(),
- accountPermissionSyncer.dispose(),
- promClient.dispose(),
- configManager.dispose(),
- ]),
- new Promise((_, reject) =>
- setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
- )
- ]);
- logger.info('All workers shut down gracefully');
- } catch (error) {
- logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));
+ const cleanup = async (signal: string) => {
+ try {
+ if (receivedSignal) {
+ logger.debug(`Recieved repeat signal ${signal}, ignoring.`);
+ return;
+ }
+ receivedSignal = true;
+
+ logger.info(`Received ${signal}, cleaning up...`);
+
+ await repoIndexManager.dispose()
+ await connectionManager.dispose()
+ await repoPermissionSyncer.dispose()
+ await accountPermissionSyncer.dispose()
+ await configManager.dispose()
+
+ await prisma.$disconnect();
+ await redis.quit();
+ await api.dispose();
+ await shutdownPosthog();
+
+
+ logger.info('All workers shut down gracefully');
+ signals.forEach(sig => process.removeListener(sig, cleanup));
+ } catch (error) {
+ Sentry.captureException(error);
+ logger.error('Error shutting down worker:', error);
+ }
}
- await prisma.$disconnect();
- await redis.quit();
- await shutdownPosthog();
+ signals.forEach(signal => {
+ process.on(signal, (err) => {
+ cleanup(err).finally(() => {
+ process.kill(process.pid, signal);
+ });
+ });
+ });
+
+ // Register handlers for uncaught exceptions and unhandled rejections
+ process.on('uncaughtException', (err) => {
+ logger.error(`Uncaught exception: ${err.message}`);
+ cleanup('uncaughtException').finally(() => {
+ process.exit(1);
+ });
+ });
+
+ process.on('unhandledRejection', (reason, promise) => {
+ logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
+ cleanup('unhandledRejection').finally(() => {
+ process.exit(1);
+ });
+ });
+
+
}
-process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
-process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));
-
-// Register handlers for uncaught exceptions and unhandled rejections
-process.on('uncaughtException', (err) => {
- logger.error(`Uncaught exception: ${err.message}`);
- cleanup('uncaughtException').finally(() => process.exit(1));
-});
-
-process.on('unhandledRejection', (reason, promise) => {
- logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
- cleanup('unhandledRejection').finally(() => process.exit(1));
-});
+listenToShutdownSignals();
diff --git a/packages/backend/src/promClient.ts b/packages/backend/src/promClient.ts
index 2fa7718f..7beaac84 100644
--- a/packages/backend/src/promClient.ts
+++ b/packages/backend/src/promClient.ts
@@ -1,14 +1,6 @@
-import express, { Request, Response } from 'express';
-import { Server } from 'http';
import client, { Registry, Counter, Gauge } from 'prom-client';
-import { createLogger } from "@sourcebot/shared";
-
-const logger = createLogger('prometheus-client');
-
export class PromClient {
- private registry: Registry;
- private app: express.Application;
- private server: Server;
+ public registry: Registry;
public activeRepoIndexJobs: Gauge;
public pendingRepoIndexJobs: Gauge;
@@ -22,8 +14,6 @@ export class PromClient {
public connectionSyncJobFailTotal: Counter;
public connectionSyncJobSuccessTotal: Counter;
- public readonly PORT = 3060;
-
constructor() {
this.registry = new Registry();
@@ -100,26 +90,5 @@ export class PromClient {
client.collectDefaultMetrics({
register: this.registry,
});
-
- this.app = express();
- this.app.get('/metrics', async (req: Request, res: Response) => {
- res.set('Content-Type', this.registry.contentType);
-
- const metrics = await this.registry.metrics();
- res.end(metrics);
- });
-
- this.server = this.app.listen(this.PORT, () => {
- logger.info(`Prometheus metrics server is running on port ${this.PORT}`);
- });
- }
-
- async dispose() {
- return new Promise((resolve, reject) => {
- this.server.close((err) => {
- if (err) reject(err);
- else resolve();
- });
- });
}
}
\ No newline at end of file
diff --git a/packages/backend/src/repoCompileUtils.ts b/packages/backend/src/repoCompileUtils.ts
index 10c748a8..5b2c0349 100644
--- a/packages/backend/src/repoCompileUtils.ts
+++ b/packages/backend/src/repoCompileUtils.ts
@@ -39,8 +39,8 @@ type CompileResult = {
export const compileGithubConfig = async (
config: GithubConnectionConfig,
connectionId: number,
- abortController: AbortController): Promise => {
- const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
+ signal: AbortSignal): Promise => {
+ const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
const gitHubRepos = gitHubReposResult.repos;
const warnings = gitHubReposResult.warnings;
diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts
index 6be40d70..5a576d05 100644
--- a/packages/backend/src/repoIndexManager.ts
+++ b/packages/backend/src/repoIndexManager.ts
@@ -7,7 +7,7 @@ import { readdir, rm } from 'fs/promises';
import { Job, Queue, ReservedJob, Worker } from "groupmq";
import { Redis } from 'ioredis';
import micromatch from 'micromatch';
-import { INDEX_CACHE_DIR } from './constants.js';
+import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, INDEX_CACHE_DIR } from './constants.js';
import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getTags, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
import { captureEvent } from './posthog.js';
import { PromClient } from './promClient.js';
@@ -45,7 +45,7 @@ export class RepoIndexManager {
constructor(
private db: PrismaClient,
private settings: Settings,
- redis: Redis,
+ private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue({
@@ -70,6 +70,10 @@ export class RepoIndexManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
+ // graceful-timeout is triggered when a job is still processing after
+ // worker.close() is called and the timeout period has elapsed. In this case,
+ // we fail the job with no retry.
+ this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}
public startScheduler() {
@@ -192,7 +196,7 @@ export class RepoIndexManager {
}
}
- private async createJobs(repos: Repo[], type: RepoIndexingJobType) {
+ public async createJobs(repos: Repo[], type: RepoIndexingJobType) {
// @note: we don't perform this in a transaction because
// we want to avoid the situation where a job is created and run
// prior to the transaction being committed.
@@ -221,6 +225,8 @@ export class RepoIndexManager {
const jobTypeLabel = getJobTypePrometheusLabel(type);
this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
}
+
+ return jobs.map(job => job.id);
}
private async runJob(job: ReservedJob) {
@@ -228,6 +234,23 @@ export class RepoIndexManager {
const logger = createJobLogger(id);
logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
+ const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({
+ where: {
+ id,
+ },
+ select: {
+ status: true,
+ }
+ });
+
+ // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
+ // is in an invalid state and should be skipped.
+ if (
+ currentStatus.status !== RepoIndexingJobStatus.PENDING &&
+ currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS
+ ) {
+ throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
+ }
const { repo, type: jobType } = await this.db.repoIndexingJob.update({
where: {
@@ -538,6 +561,28 @@ export class RepoIndexManager {
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
});
+ private onJobGracefulTimeout = async (job: Job) =>
+ groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
+ const logger = createJobLogger(job.data.jobId);
+ const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
+
+ const { repo } = await this.db.repoIndexingJob.update({
+ where: { id: job.data.jobId },
+ data: {
+ status: RepoIndexingJobStatus.FAILED,
+ completedAt: new Date(),
+ errorMessage: 'Job timed out',
+ },
+ select: { repo: true }
+ });
+
+ this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
+ this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
+
+ logger.error(`Job ${job.data.jobId} timed out for repo ${repo.name} (id: ${repo.id}). Failing job.`);
+
+ });
+
private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Index syncer worker error.`, error);
@@ -547,8 +592,20 @@ export class RepoIndexManager {
if (this.interval) {
clearInterval(this.interval);
}
- await this.worker.close();
- await this.queue.close();
+ const inProgressJobs = this.worker.getCurrentJobs();
+ await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
+ // Manually release group locks for in progress jobs to prevent deadlocks.
+ // @see: https://github.com/Openpanel-dev/groupmq/issues/8
+ for (const { job } of inProgressJobs) {
+ const lockKey = `groupmq:repo-index-queue:lock:${job.groupId}`;
+ logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
+ await this.redis.del(lockKey);
+ }
+
+ // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
+ // redis connection. Since the same redis client is shared, skip this
+ // step and close the redis client directly in index.ts.
+ // await this.queue.close();
}
}
diff --git a/packages/shared/src/env.server.ts b/packages/shared/src/env.server.ts
index 87e48758..919a5884 100644
--- a/packages/shared/src/env.server.ts
+++ b/packages/shared/src/env.server.ts
@@ -216,6 +216,9 @@ export const env = createEnv({
SOURCEBOT_LOG_LEVEL: z.enum(["info", "debug", "warn", "error"]).default("info"),
SOURCEBOT_STRUCTURED_LOGGING_ENABLED: booleanSchema.default("false"),
SOURCEBOT_STRUCTURED_LOGGING_FILE: z.string().optional(),
+
+ // Configures the default maximum number of search results to return.
+ DEFAULT_MAX_MATCH_COUNT: numberSchema.default(10_000),
},
runtimeEnv,
emptyStringAsUndefined: true,
diff --git a/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx b/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx
index b38d140b..7fc2af07 100644
--- a/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx
+++ b/packages/web/src/app/[domain]/browse/[...path]/components/codePreviewPanel.tsx
@@ -1,10 +1,10 @@
import { getRepoInfoByName } from "@/actions";
import { PathHeader } from "@/app/[domain]/components/pathHeader";
import { Separator } from "@/components/ui/separator";
-import { getFileSource } from "@/features/search/fileSourceApi";
import { cn, getCodeHostInfoForRepo, isServiceError } from "@/lib/utils";
import Image from "next/image";
import { PureCodePreviewPanel } from "./pureCodePreviewPanel";
+import { getFileSource } from "@/features/search/fileSourceApi";
interface CodePreviewPanelProps {
path: string;
diff --git a/packages/web/src/app/[domain]/browse/[...path]/components/pureTreePreviewPanel.tsx b/packages/web/src/app/[domain]/browse/[...path]/components/pureTreePreviewPanel.tsx
index 83c9528e..26964736 100644
--- a/packages/web/src/app/[domain]/browse/[...path]/components/pureTreePreviewPanel.tsx
+++ b/packages/web/src/app/[domain]/browse/[...path]/components/pureTreePreviewPanel.tsx
@@ -1,12 +1,12 @@
'use client';
import { useRef } from "react";
-import { FileTreeItem } from "@/features/fileTree/actions";
import { FileTreeItemComponent } from "@/features/fileTree/components/fileTreeItemComponent";
import { getBrowsePath } from "../../hooks/utils";
import { ScrollArea } from "@/components/ui/scroll-area";
import { useBrowseParams } from "../../hooks/useBrowseParams";
import { useDomain } from "@/hooks/useDomain";
+import { FileTreeItem } from "@/features/fileTree/types";
interface PureTreePreviewPanelProps {
items: FileTreeItem[];
diff --git a/packages/web/src/app/[domain]/browse/[...path]/components/treePreviewPanel.tsx b/packages/web/src/app/[domain]/browse/[...path]/components/treePreviewPanel.tsx
index 4a0c3857..8d6b335c 100644
--- a/packages/web/src/app/[domain]/browse/[...path]/components/treePreviewPanel.tsx
+++ b/packages/web/src/app/[domain]/browse/[...path]/components/treePreviewPanel.tsx
@@ -2,7 +2,7 @@
import { Separator } from "@/components/ui/separator";
import { getRepoInfoByName } from "@/actions";
import { PathHeader } from "@/app/[domain]/components/pathHeader";
-import { getFolderContents } from "@/features/fileTree/actions";
+import { getFolderContents } from "@/features/fileTree/api";
import { isServiceError } from "@/lib/utils";
import { PureTreePreviewPanel } from "./pureTreePreviewPanel";
diff --git a/packages/web/src/app/[domain]/browse/components/fileSearchCommandDialog.tsx b/packages/web/src/app/[domain]/browse/components/fileSearchCommandDialog.tsx
index 0cfe720a..dd1014d1 100644
--- a/packages/web/src/app/[domain]/browse/components/fileSearchCommandDialog.tsx
+++ b/packages/web/src/app/[domain]/browse/components/fileSearchCommandDialog.tsx
@@ -5,7 +5,6 @@ import { useState, useRef, useMemo, useEffect, useCallback } from "react";
import { useHotkeys } from "react-hotkeys-hook";
import { useQuery } from "@tanstack/react-query";
import { unwrapServiceError } from "@/lib/utils";
-import { FileTreeItem, getFiles } from "@/features/fileTree/actions";
import { Dialog, DialogContent, DialogDescription, DialogTitle } from "@/components/ui/dialog";
import { useBrowseNavigation } from "../hooks/useBrowseNavigation";
import { useBrowseState } from "../hooks/useBrowseState";
@@ -13,6 +12,8 @@ import { useBrowseParams } from "../hooks/useBrowseParams";
import { FileTreeItemIcon } from "@/features/fileTree/components/fileTreeItemIcon";
import { useLocalStorage } from "usehooks-ts";
import { Skeleton } from "@/components/ui/skeleton";
+import { FileTreeItem } from "@/features/fileTree/types";
+import { getFiles } from "@/app/api/(client)/client";
const MAX_RESULTS = 100;
diff --git a/packages/web/src/app/[domain]/components/searchBar/useSuggestionsData.ts b/packages/web/src/app/[domain]/components/searchBar/useSuggestionsData.ts
index 8c48e9d1..69e1040c 100644
--- a/packages/web/src/app/[domain]/components/searchBar/useSuggestionsData.ts
+++ b/packages/web/src/app/[domain]/components/searchBar/useSuggestionsData.ts
@@ -55,7 +55,7 @@ export const useSuggestionsData = ({
query: `file:${suggestionQuery}`,
matches: 15,
contextLines: 1,
- }, domain),
+ }),
select: (data): Suggestion[] => {
if (isServiceError(data)) {
return [];
@@ -75,7 +75,7 @@ export const useSuggestionsData = ({
query: `sym:${suggestionQuery.length > 0 ? suggestionQuery : ".*"}`,
matches: 15,
contextLines: 1,
- }, domain),
+ }),
select: (data): Suggestion[] => {
if (isServiceError(data)) {
return [];
diff --git a/packages/web/src/app/[domain]/repos/[id]/page.tsx b/packages/web/src/app/[domain]/repos/[id]/page.tsx
index 8986f7f6..0c2ddfa1 100644
--- a/packages/web/src/app/[domain]/repos/[id]/page.tsx
+++ b/packages/web/src/app/[domain]/repos/[id]/page.tsx
@@ -1,4 +1,4 @@
-import { sew } from "@/actions"
+import { getCurrentUserRole, sew } from "@/actions"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
@@ -19,6 +19,7 @@ import { BackButton } from "../../components/backButton"
import { DisplayDate } from "../../components/DisplayDate"
import { RepoBranchesTable } from "../components/repoBranchesTable"
import { RepoJobsTable } from "../components/repoJobsTable"
+import { OrgRole } from "@sourcebot/db"
export default async function RepoDetailPage({ params }: { params: Promise<{ id: string }> }) {
const { id } = await params
@@ -51,6 +52,11 @@ export default async function RepoDetailPage({ params }: { params: Promise<{ id:
const repoMetadata = repoMetadataSchema.parse(repo.metadata);
+ const userRole = await getCurrentUserRole(SINGLE_TENANT_ORG_DOMAIN);
+ if (isServiceError(userRole)) {
+ throw new ServiceErrorException(userRole);
+ }
+
return (
<>