Files
pr-agent/pr_agent/servers/gitea_app.py
2025-05-26 11:31:40 +07:00

129 lines
4.2 KiB
Python

import asyncio
import copy
import os
from typing import Any, Dict
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
from starlette.background import BackgroundTasks
from starlette.middleware import Middleware
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from pr_agent.agent.pr_agent import PRAgent
from pr_agent.config_loader import get_settings, global_settings
from pr_agent.log import LoggingFormat, get_logger, setup_logger
from pr_agent.servers.utils import verify_signature
# Setup logging and router
# Both statements run once, at import time.
# Logs are emitted in JSON format; the level is read from the CONFIG.LOG_LEVEL
# setting and falls back to "DEBUG" when that setting is absent.
setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG"))
# Router on which the Gitea webhook endpoint is registered.
router = APIRouter()
@router.post("/api/v1/gitea_webhooks")
async def handle_gitea_webhooks(background_tasks: BackgroundTasks, request: Request, response: Response):
    """Accept a Gitea webhook and schedule its processing as a background task.

    The payload is parsed (and signature-checked, when a secret is configured)
    by ``get_body``; any ``HTTPException`` it raises propagates to the caller.
    The endpoint itself always answers with an empty JSON object.
    """
    get_logger().debug("Received a Gitea webhook")
    payload = await get_body(request)

    # Store a private deep copy of the global settings, and an empty
    # git-provider slot, in the request context.
    context["settings"] = copy.deepcopy(global_settings)
    context["git_provider"] = {}

    # The event name comes from the X-Gitea-Event header (None when missing);
    # the actual work is deferred to a background task.
    event_name = request.headers.get("X-Gitea-Event", None)
    background_tasks.add_task(handle_request, payload, event=event_name)
    return {}
async def get_body(request: Request):
    """Parse the webhook JSON body and verify its signature when a secret is set.

    Args:
        request: The incoming webhook request.

    Returns:
        The decoded JSON payload.

    Raises:
        HTTPException: 400 when the body cannot be parsed as JSON or when a
            secret is configured but the signature header is missing;
            401 when signature verification fails.
    """
    try:
        body = await request.json()
    except Exception as e:
        get_logger().error("Error parsing request body", artifact={'error': e})
        raise HTTPException(status_code=400, detail="Error parsing request body") from e

    # Dotted lookup with a default (same style as the other settings reads in
    # this module) so a missing "gitea" section means "no secret configured"
    # rather than an attribute error.
    webhook_secret = get_settings().get("gitea.webhook_secret", None)
    if not webhook_secret:
        # No secret configured: signature verification is skipped.
        return body

    signature_header = request.headers.get('x-gitea-signature', None)
    if not signature_header:
        get_logger().error("Missing signature header")
        raise HTTPException(status_code=400, detail="Missing signature header")

    body_bytes = await request.body()
    try:
        # The header value is passed on with a "sha256=" prefix.
        # NOTE(review): presumably verify_signature expects that prefixed form - confirm.
        verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
    except Exception as ex:
        get_logger().error(f"Invalid signature: {ex}")
        # Chain the cause explicitly, matching the JSON-parse error path above.
        raise HTTPException(status_code=401, detail="Invalid signature") from ex
    return body
async def handle_request(body: Dict[str, Any], event: str):
    """Route a parsed Gitea webhook payload to the matching event handler.

    Payloads without an ``action`` are ignored. Pull-request events with action
    ``opened``, ``reopened`` or ``synchronized`` go to ``handle_pr_event``;
    ``issue_comment`` events with action ``created`` go to
    ``handle_comment_event``. Everything else is a no-op.

    Returns:
        Always an empty dict.
    """
    action = body.get("action")
    if not action:
        get_logger().debug("No action found in request body")
        return {}

    agent = PRAgent()

    is_pr_trigger = event == "pull_request" and action in ("opened", "reopened", "synchronized")
    is_new_comment = event == "issue_comment" and action == "created"

    if is_pr_trigger:
        await handle_pr_event(body, event, action, agent)
    elif is_new_comment:
        await handle_comment_event(body, event, action, agent)
    return {}
async def handle_pr_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
    """Run the agent for a pull-request webhook.

    Nothing happens when the payload has no ``pull_request`` object or that
    object has no ``url``. For ``opened``/``reopened`` every command from the
    ``gitea.pr_commands`` setting is run in order; for ``synchronized`` an
    incremental review is requested.
    """
    pull_request = body.get("pull_request", {})
    api_url = pull_request.get("url") if pull_request else None
    if not api_url:
        return

    if action == "synchronized":
        # New commits were pushed to the PR.
        await agent.handle_request(api_url, "/review --incremental")
        return

    if action in ("opened", "reopened"):
        for command in get_settings().get("gitea.pr_commands", []):
            await agent.handle_request(api_url, command)
async def handle_comment_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
    """Run the agent for a comment that contains a slash command.

    The comment is ignored unless its body is non-empty and starts with ``/``,
    and the payload carries a ``pull_request`` object with a ``url``. In that
    case the comment body is handed to the agent as the command.

    Args:
        body: Parsed webhook payload.
        event: Webhook event name (unused here).
        action: Webhook action (unused here).
        agent: Agent that executes the command.
    """
    # `or {}` also covers keys that are present but null, which a plain
    # `.get(key, {})` default would not.
    comment = body.get("comment") or {}
    comment_body = comment.get("body", "")
    if not comment_body or not comment_body.startswith("/"):
        return

    pull_request = body.get("pull_request") or {}
    pr_url = pull_request.get("url")
    if not pr_url:
        return

    await agent.handle_request(pr_url, comment_body)
# FastAPI app setup
# RawContextMiddleware comes from starlette_context.
# NOTE(review): presumably this is what makes the `context` object usable
# inside the webhook handler - confirm.
middleware = [Middleware(RawContextMiddleware)]
app = FastAPI(middleware=middleware)
# Attach the router that carries the Gitea webhook endpoint.
app.include_router(router)
def start():
    """Run the Gitea webhook app under uvicorn.

    Listens on all interfaces; the port is taken from the ``PORT`` environment
    variable and defaults to 3000.
    """
    listen_port = int(os.getenv("PORT", "3000"))

    # Imported lazily, so uvicorn is only required when the server is started.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=listen_port)
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    start()