pr-agent/pr_agent/servers/gitea_app.py
2025-10-19 16:00:01 +02:00

221 lines
9.4 KiB
Python

import copy
import os
import re
from typing import Any, Dict
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
from starlette.background import BackgroundTasks
from starlette.middleware import Middleware
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from pr_agent.agent.pr_agent import PRAgent
from pr_agent.algo.utils import update_settings_from_args
from pr_agent.config_loader import get_settings, global_settings
from pr_agent.git_providers.utils import apply_repo_settings
from pr_agent.log import LoggingFormat, get_logger, setup_logger
from pr_agent.servers.utils import verify_signature
# Setup logging and router
setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG"))
router = APIRouter()
@router.post("/api/v1/gitea_webhooks")
async def handle_gitea_webhooks(background_tasks: BackgroundTasks, request: Request, response: Response):
"""Handle incoming Gitea webhook requests"""
get_logger().debug("Received a Gitea webhook")
body = await get_body(request)
# Set context for the request
context["settings"] = copy.deepcopy(global_settings)
context["git_provider"] = {}
# Handle the webhook in background
background_tasks.add_task(handle_request, body, event=request.headers.get("X-Gitea-Event", None))
return {}
async def get_body(request: Request):
"""Parse and verify webhook request body"""
try:
body = await request.json()
except Exception as e:
get_logger().error("Error parsing request body", artifact={'error': e})
raise HTTPException(status_code=400, detail="Error parsing request body") from e
# Verify webhook signature
webhook_secret = getattr(get_settings().gitea, 'webhook_secret', None)
if webhook_secret:
body_bytes = await request.body()
signature_header = request.headers.get('x-gitea-signature', None)
if not signature_header:
get_logger().error("Missing signature header")
raise HTTPException(status_code=400, detail="Missing signature header")
try:
verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
except Exception as ex:
get_logger().error(f"Invalid signature: {ex}")
raise HTTPException(status_code=401, detail="Invalid signature")
return body
async def handle_request(body: Dict[str, Any], event: str):
"""Process Gitea webhook events"""
action = body.get("action")
if not action:
get_logger().debug("No action found in request body")
return {}
agent = PRAgent()
# Handle different event types
if event == "pull_request":
if not should_process_pr_logic(body):
get_logger().debug(f"Request ignored: PR logic filtering")
return {}
if action in ["opened", "reopened", "synchronized"]:
await handle_pr_event(body, event, action, agent)
elif event == "issue_comment":
if action == "created":
await handle_comment_event(body, event, action, agent)
return {}
async def handle_pr_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
"""Handle pull request events"""
pr = body.get("pull_request", {})
if not pr:
return
api_url = pr.get("url")
if not api_url:
return
# Handle PR based on action
if action in ["opened", "reopened"]:
# commands = get_settings().get("gitea.pr_commands", [])
await _perform_commands_gitea("pr_commands", agent, body, api_url)
# for command in commands:
# await agent.handle_request(api_url, command)
elif action == "synchronized":
# Handle push to PR
commands_on_push = get_settings().get(f"gitea.push_commands", {})
handle_push_trigger = get_settings().get(f"gitea.handle_push_trigger", False)
if not commands_on_push or not handle_push_trigger:
get_logger().info("Push event, but no push commands found or push trigger is disabled")
return
get_logger().debug(f'A push event has been received: {api_url}')
await _perform_commands_gitea("push_commands", agent, body, api_url)
# for command in commands_on_push:
# await agent.handle_request(api_url, command)
async def handle_comment_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
"""Handle comment events"""
comment = body.get("comment", {})
if not comment:
return
comment_body = comment.get("body", "")
if not comment_body or not comment_body.startswith("/"):
return
pr_url = body.get("pull_request", {}).get("url")
if not pr_url:
return
await agent.handle_request(pr_url, comment_body)
async def _perform_commands_gitea(commands_conf: str, agent: PRAgent, body: dict, api_url: str):
apply_repo_settings(api_url)
if commands_conf == "pr_commands" and get_settings().config.disable_auto_feedback: # auto commands for PR, and auto feedback is disabled
get_logger().info(f"Auto feedback is disabled, skipping auto commands for PR {api_url=}")
return
if not should_process_pr_logic(body): # Here we already updated the configuration with the repo settings
return {}
commands = get_settings().get(f"gitea.{commands_conf}")
if not commands:
get_logger().info(f"New PR, but no auto commands configured")
return
get_settings().set("config.is_auto_command", True)
for command in commands:
split_command = command.split(" ")
command = split_command[0]
args = split_command[1:]
other_args = update_settings_from_args(args)
new_command = ' '.join([command] + other_args)
get_logger().info(f"{commands_conf}. Performing auto command '{new_command}', for {api_url=}")
await agent.handle_request(api_url, new_command)
def should_process_pr_logic(body) -> bool:
try:
pull_request = body.get("pull_request", {})
title = pull_request.get("title", "")
pr_labels = pull_request.get("labels", [])
source_branch = pull_request.get("head", {}).get("ref", "")
target_branch = pull_request.get("base", {}).get("ref", "")
sender = body.get("sender", {}).get("login")
repo_full_name = body.get("repository", {}).get("full_name", "")
# logic to ignore PRs from specific repositories
ignore_repos = get_settings().get("CONFIG.IGNORE_REPOSITORIES", [])
if ignore_repos and repo_full_name:
if any(re.search(regex, repo_full_name) for regex in ignore_repos):
get_logger().info(f"Ignoring PR from repository '{repo_full_name}' due to 'config.ignore_repositories' setting")
return False
# logic to ignore PRs from specific users
ignore_pr_users = get_settings().get("CONFIG.IGNORE_PR_AUTHORS", [])
if ignore_pr_users and sender:
if any(re.search(regex, sender) for regex in ignore_pr_users):
get_logger().info(f"Ignoring PR from user '{sender}' due to 'config.ignore_pr_authors' setting")
return False
# logic to ignore PRs with specific titles
if title:
ignore_pr_title_re = get_settings().get("CONFIG.IGNORE_PR_TITLE", [])
if not isinstance(ignore_pr_title_re, list):
ignore_pr_title_re = [ignore_pr_title_re]
if ignore_pr_title_re and any(re.search(regex, title) for regex in ignore_pr_title_re):
get_logger().info(f"Ignoring PR with title '{title}' due to config.ignore_pr_title setting")
return False
# logic to ignore PRs with specific labels or source branches or target branches.
ignore_pr_labels = get_settings().get("CONFIG.IGNORE_PR_LABELS", [])
if pr_labels and ignore_pr_labels:
labels = [label['name'] for label in pr_labels]
if any(label in ignore_pr_labels for label in labels):
labels_str = ", ".join(labels)
get_logger().info(f"Ignoring PR with labels '{labels_str}' due to config.ignore_pr_labels settings")
return False
# logic to ignore PRs with specific source or target branches
ignore_pr_source_branches = get_settings().get("CONFIG.IGNORE_PR_SOURCE_BRANCHES", [])
ignore_pr_target_branches = get_settings().get("CONFIG.IGNORE_PR_TARGET_BRANCHES", [])
if pull_request and (ignore_pr_source_branches or ignore_pr_target_branches):
if any(re.search(regex, source_branch) for regex in ignore_pr_source_branches):
get_logger().info(
f"Ignoring PR with source branch '{source_branch}' due to config.ignore_pr_source_branches settings")
return False
if any(re.search(regex, target_branch) for regex in ignore_pr_target_branches):
get_logger().info(
f"Ignoring PR with target branch '{target_branch}' due to config.ignore_pr_target_branches settings")
return False
except Exception as e:
get_logger().error(f"Failed 'should_process_pr_logic': {e}")
return True
# FastAPI app setup
middleware = [Middleware(RawContextMiddleware)]
app = FastAPI(middleware=middleware)
app.include_router(router)
def start():
"""Start the Gitea webhook server"""
port = int(os.environ.get("PORT", "3000"))
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=port)
if __name__ == "__main__":
start()