mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-12-12 02:45:18 +00:00
221 lines
9.4 KiB
Python
221 lines
9.4 KiB
Python
import copy
|
|
import os
|
|
import re
|
|
from typing import Any, Dict
|
|
|
|
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
|
|
from starlette.background import BackgroundTasks
|
|
from starlette.middleware import Middleware
|
|
from starlette_context import context
|
|
from starlette_context.middleware import RawContextMiddleware
|
|
|
|
from pr_agent.agent.pr_agent import PRAgent
|
|
from pr_agent.algo.utils import update_settings_from_args
|
|
from pr_agent.config_loader import get_settings, global_settings
|
|
from pr_agent.git_providers.utils import apply_repo_settings
|
|
from pr_agent.log import LoggingFormat, get_logger, setup_logger
|
|
from pr_agent.servers.utils import verify_signature
|
|
|
|
# Setup logging and router
|
|
setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG"))
|
|
router = APIRouter()
|
|
|
|
@router.post("/api/v1/gitea_webhooks")
|
|
async def handle_gitea_webhooks(background_tasks: BackgroundTasks, request: Request, response: Response):
|
|
"""Handle incoming Gitea webhook requests"""
|
|
get_logger().debug("Received a Gitea webhook")
|
|
|
|
body = await get_body(request)
|
|
|
|
# Set context for the request
|
|
context["settings"] = copy.deepcopy(global_settings)
|
|
context["git_provider"] = {}
|
|
|
|
# Handle the webhook in background
|
|
background_tasks.add_task(handle_request, body, event=request.headers.get("X-Gitea-Event", None))
|
|
return {}
|
|
|
|
async def get_body(request: Request):
|
|
"""Parse and verify webhook request body"""
|
|
try:
|
|
body = await request.json()
|
|
except Exception as e:
|
|
get_logger().error("Error parsing request body", artifact={'error': e})
|
|
raise HTTPException(status_code=400, detail="Error parsing request body") from e
|
|
|
|
|
|
# Verify webhook signature
|
|
webhook_secret = getattr(get_settings().gitea, 'webhook_secret', None)
|
|
if webhook_secret:
|
|
body_bytes = await request.body()
|
|
signature_header = request.headers.get('x-gitea-signature', None)
|
|
if not signature_header:
|
|
get_logger().error("Missing signature header")
|
|
raise HTTPException(status_code=400, detail="Missing signature header")
|
|
|
|
try:
|
|
verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
|
|
except Exception as ex:
|
|
get_logger().error(f"Invalid signature: {ex}")
|
|
raise HTTPException(status_code=401, detail="Invalid signature")
|
|
|
|
return body
|
|
|
|
async def handle_request(body: Dict[str, Any], event: str):
|
|
"""Process Gitea webhook events"""
|
|
action = body.get("action")
|
|
if not action:
|
|
get_logger().debug("No action found in request body")
|
|
return {}
|
|
|
|
agent = PRAgent()
|
|
|
|
# Handle different event types
|
|
if event == "pull_request":
|
|
if not should_process_pr_logic(body):
|
|
get_logger().debug(f"Request ignored: PR logic filtering")
|
|
return {}
|
|
if action in ["opened", "reopened", "synchronized"]:
|
|
await handle_pr_event(body, event, action, agent)
|
|
elif event == "issue_comment":
|
|
if action == "created":
|
|
await handle_comment_event(body, event, action, agent)
|
|
|
|
return {}
|
|
|
|
async def handle_pr_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
|
|
"""Handle pull request events"""
|
|
pr = body.get("pull_request", {})
|
|
if not pr:
|
|
return
|
|
|
|
api_url = pr.get("url")
|
|
if not api_url:
|
|
return
|
|
|
|
# Handle PR based on action
|
|
if action in ["opened", "reopened"]:
|
|
# commands = get_settings().get("gitea.pr_commands", [])
|
|
await _perform_commands_gitea("pr_commands", agent, body, api_url)
|
|
# for command in commands:
|
|
# await agent.handle_request(api_url, command)
|
|
elif action == "synchronized":
|
|
# Handle push to PR
|
|
commands_on_push = get_settings().get(f"gitea.push_commands", {})
|
|
handle_push_trigger = get_settings().get(f"gitea.handle_push_trigger", False)
|
|
if not commands_on_push or not handle_push_trigger:
|
|
get_logger().info("Push event, but no push commands found or push trigger is disabled")
|
|
return
|
|
get_logger().debug(f'A push event has been received: {api_url}')
|
|
await _perform_commands_gitea("push_commands", agent, body, api_url)
|
|
# for command in commands_on_push:
|
|
# await agent.handle_request(api_url, command)
|
|
|
|
async def handle_comment_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
|
|
"""Handle comment events"""
|
|
comment = body.get("comment", {})
|
|
if not comment:
|
|
return
|
|
|
|
comment_body = comment.get("body", "")
|
|
if not comment_body or not comment_body.startswith("/"):
|
|
return
|
|
|
|
pr_url = body.get("pull_request", {}).get("url")
|
|
if not pr_url:
|
|
return
|
|
|
|
await agent.handle_request(pr_url, comment_body)
|
|
|
|
async def _perform_commands_gitea(commands_conf: str, agent: PRAgent, body: dict, api_url: str):
|
|
apply_repo_settings(api_url)
|
|
if commands_conf == "pr_commands" and get_settings().config.disable_auto_feedback: # auto commands for PR, and auto feedback is disabled
|
|
get_logger().info(f"Auto feedback is disabled, skipping auto commands for PR {api_url=}")
|
|
return
|
|
if not should_process_pr_logic(body): # Here we already updated the configuration with the repo settings
|
|
return {}
|
|
commands = get_settings().get(f"gitea.{commands_conf}")
|
|
if not commands:
|
|
get_logger().info(f"New PR, but no auto commands configured")
|
|
return
|
|
get_settings().set("config.is_auto_command", True)
|
|
for command in commands:
|
|
split_command = command.split(" ")
|
|
command = split_command[0]
|
|
args = split_command[1:]
|
|
other_args = update_settings_from_args(args)
|
|
new_command = ' '.join([command] + other_args)
|
|
get_logger().info(f"{commands_conf}. Performing auto command '{new_command}', for {api_url=}")
|
|
await agent.handle_request(api_url, new_command)
|
|
|
|
def should_process_pr_logic(body) -> bool:
|
|
try:
|
|
pull_request = body.get("pull_request", {})
|
|
title = pull_request.get("title", "")
|
|
pr_labels = pull_request.get("labels", [])
|
|
source_branch = pull_request.get("head", {}).get("ref", "")
|
|
target_branch = pull_request.get("base", {}).get("ref", "")
|
|
sender = body.get("sender", {}).get("login")
|
|
repo_full_name = body.get("repository", {}).get("full_name", "")
|
|
|
|
# logic to ignore PRs from specific repositories
|
|
ignore_repos = get_settings().get("CONFIG.IGNORE_REPOSITORIES", [])
|
|
if ignore_repos and repo_full_name:
|
|
if any(re.search(regex, repo_full_name) for regex in ignore_repos):
|
|
get_logger().info(f"Ignoring PR from repository '{repo_full_name}' due to 'config.ignore_repositories' setting")
|
|
return False
|
|
|
|
# logic to ignore PRs from specific users
|
|
ignore_pr_users = get_settings().get("CONFIG.IGNORE_PR_AUTHORS", [])
|
|
if ignore_pr_users and sender:
|
|
if any(re.search(regex, sender) for regex in ignore_pr_users):
|
|
get_logger().info(f"Ignoring PR from user '{sender}' due to 'config.ignore_pr_authors' setting")
|
|
return False
|
|
|
|
# logic to ignore PRs with specific titles
|
|
if title:
|
|
ignore_pr_title_re = get_settings().get("CONFIG.IGNORE_PR_TITLE", [])
|
|
if not isinstance(ignore_pr_title_re, list):
|
|
ignore_pr_title_re = [ignore_pr_title_re]
|
|
if ignore_pr_title_re and any(re.search(regex, title) for regex in ignore_pr_title_re):
|
|
get_logger().info(f"Ignoring PR with title '{title}' due to config.ignore_pr_title setting")
|
|
return False
|
|
|
|
# logic to ignore PRs with specific labels or source branches or target branches.
|
|
ignore_pr_labels = get_settings().get("CONFIG.IGNORE_PR_LABELS", [])
|
|
if pr_labels and ignore_pr_labels:
|
|
labels = [label['name'] for label in pr_labels]
|
|
if any(label in ignore_pr_labels for label in labels):
|
|
labels_str = ", ".join(labels)
|
|
get_logger().info(f"Ignoring PR with labels '{labels_str}' due to config.ignore_pr_labels settings")
|
|
return False
|
|
|
|
# logic to ignore PRs with specific source or target branches
|
|
ignore_pr_source_branches = get_settings().get("CONFIG.IGNORE_PR_SOURCE_BRANCHES", [])
|
|
ignore_pr_target_branches = get_settings().get("CONFIG.IGNORE_PR_TARGET_BRANCHES", [])
|
|
if pull_request and (ignore_pr_source_branches or ignore_pr_target_branches):
|
|
if any(re.search(regex, source_branch) for regex in ignore_pr_source_branches):
|
|
get_logger().info(
|
|
f"Ignoring PR with source branch '{source_branch}' due to config.ignore_pr_source_branches settings")
|
|
return False
|
|
if any(re.search(regex, target_branch) for regex in ignore_pr_target_branches):
|
|
get_logger().info(
|
|
f"Ignoring PR with target branch '{target_branch}' due to config.ignore_pr_target_branches settings")
|
|
return False
|
|
except Exception as e:
|
|
get_logger().error(f"Failed 'should_process_pr_logic': {e}")
|
|
return True
|
|
|
|
# FastAPI app setup
|
|
middleware = [Middleware(RawContextMiddleware)]
|
|
app = FastAPI(middleware=middleware)
|
|
app.include_router(router)
|
|
|
|
def start():
|
|
"""Start the Gitea webhook server"""
|
|
port = int(os.environ.get("PORT", "3000"))
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=port)
|
|
|
|
if __name__ == "__main__":
|
|
start()
|