From cf2b95b7663346fa0492d6d43512b32b4db1488c Mon Sep 17 00:00:00 2001 From: Pinyoo Thotaboot Date: Fri, 16 May 2025 16:30:50 +0700 Subject: [PATCH] Create webhook server implement for --- pr_agent/servers/gitea_app.py | 120 ++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 pr_agent/servers/gitea_app.py diff --git a/pr_agent/servers/gitea_app.py b/pr_agent/servers/gitea_app.py new file mode 100644 index 00000000..4df8b84c --- /dev/null +++ b/pr_agent/servers/gitea_app.py @@ -0,0 +1,120 @@ +import asyncio +import copy +import os +from typing import Any, Dict + +from fastapi import APIRouter, FastAPI, HTTPException, Request, Response +from starlette.background import BackgroundTasks +from starlette.middleware import Middleware +from starlette_context import context +from starlette_context.middleware import RawContextMiddleware + +from pr_agent.agent.pr_agent import PRAgent +from pr_agent.config_loader import get_settings, global_settings +from pr_agent.log import LoggingFormat, get_logger, setup_logger +from pr_agent.servers.utils import verify_signature + +# Setup logging and router +setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG")) +router = APIRouter() + +@router.post("/api/v1/gitea_webhooks") +async def handle_gitea_webhooks(background_tasks: BackgroundTasks, request: Request, response: Response): + """Handle incoming Gitea webhook requests""" + get_logger().debug("Received a Gitea webhook") + + body = await get_body(request) + + # Set context for the request + context["settings"] = copy.deepcopy(global_settings) + context["git_provider"] = {} + + # Handle the webhook in background + background_tasks.add_task(handle_request, body, event=request.headers.get("X-Gitea-Event", None)) + return {} + +async def get_body(request: Request): + """Parse and verify webhook request body""" + try: + body = await request.json() + except Exception as e: + get_logger().error("Error parsing request body", artifact={'error': e}) + raise HTTPException(status_code=400, detail="Error parsing request body") from e + + + # Verify webhook signature + webhook_secret = getattr(get_settings().gitea, 'webhook_secret', None) + if webhook_secret: + body_bytes = await request.body() + signature_header = request.headers.get('x-gitea-signature', None) + verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}") + + return body + +async def handle_request(body: Dict[str, Any], event: str): + """Process Gitea webhook events""" + action = body.get("action") + if not action: + get_logger().debug("No action found in request body") + return {} + + agent = PRAgent() + + # Handle different event types + if event == "pull_request": + if action in ["opened", "reopened", "synchronized"]: + await handle_pr_event(body, event, action, agent) + elif event == "issue_comment": + if action == "created": + await handle_comment_event(body, event, action, agent) + + return {} + +async def handle_pr_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent): + """Handle pull request events""" + pr = body.get("pull_request", {}) + if not pr: + return + + api_url = pr.get("url") + if not api_url: + return + + # Handle PR based on action + if action in ["opened", "reopened"]: + commands = get_settings().get("gitea.pr_commands", []) + for command in commands: + await agent.handle_request(api_url, command) + elif action == "synchronized": + # Handle push to PR + await agent.handle_request(api_url, "/review --incremental") + +async def handle_comment_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent): + """Handle comment events""" + comment = body.get("comment", {}) + if not comment: + return + + comment_body = comment.get("body", "") + if not comment_body or not comment_body.startswith("/"): + return + + pr_url = body.get("pull_request", {}).get("url") + if not pr_url: + return + + await agent.handle_request(pr_url, comment_body) + +# FastAPI app setup +middleware = [Middleware(RawContextMiddleware)] +app = FastAPI(middleware=middleware) +app.include_router(router) + +def start(): + """Start the Gitea webhook server""" + port = int(os.environ.get("PORT", "3000")) + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=port) + +if __name__ == "__main__": + start()