mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-07-04 12:50:38 +08:00
Create webhook server implement for
This commit is contained in:
120
pr_agent/servers/gitea_app.py
Normal file
120
pr_agent/servers/gitea_app.py
Normal file
@ -0,0 +1,120 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
|
||||
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
|
||||
from starlette.background import BackgroundTasks
|
||||
from starlette.middleware import Middleware
|
||||
from starlette_context import context
|
||||
from starlette_context.middleware import RawContextMiddleware
|
||||
|
||||
from pr_agent.agent.pr_agent import PRAgent
|
||||
from pr_agent.config_loader import get_settings, global_settings
|
||||
from pr_agent.log import LoggingFormat, get_logger, setup_logger
|
||||
from pr_agent.servers.utils import verify_signature
|
||||
|
||||
# Setup logging and router
|
||||
setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG"))
|
||||
router = APIRouter()
|
||||
|
||||
@router.post("/api/v1/gitea_webhooks")
|
||||
async def handle_gitea_webhooks(background_tasks: BackgroundTasks, request: Request, response: Response):
|
||||
"""Handle incoming Gitea webhook requests"""
|
||||
get_logger().debug("Received a Gitea webhook")
|
||||
|
||||
body = await get_body(request)
|
||||
|
||||
# Set context for the request
|
||||
context["settings"] = copy.deepcopy(global_settings)
|
||||
context["git_provider"] = {}
|
||||
|
||||
# Handle the webhook in background
|
||||
background_tasks.add_task(handle_request, body, event=request.headers.get("X-Gitea-Event", None))
|
||||
return {}
|
||||
|
||||
async def get_body(request: Request):
|
||||
"""Parse and verify webhook request body"""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception as e:
|
||||
get_logger().error("Error parsing request body", artifact={'error': e})
|
||||
raise HTTPException(status_code=400, detail="Error parsing request body") from e
|
||||
|
||||
|
||||
# Verify webhook signature
|
||||
webhook_secret = getattr(get_settings().gitea, 'webhook_secret', None)
|
||||
if webhook_secret:
|
||||
body_bytes = await request.body()
|
||||
signature_header = request.headers.get('x-gitea-signature', None)
|
||||
verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
|
||||
|
||||
return body
|
||||
|
||||
async def handle_request(body: Dict[str, Any], event: str):
|
||||
"""Process Gitea webhook events"""
|
||||
action = body.get("action")
|
||||
if not action:
|
||||
get_logger().debug("No action found in request body")
|
||||
return {}
|
||||
|
||||
agent = PRAgent()
|
||||
|
||||
# Handle different event types
|
||||
if event == "pull_request":
|
||||
if action in ["opened", "reopened", "synchronized"]:
|
||||
await handle_pr_event(body, event, action, agent)
|
||||
elif event == "issue_comment":
|
||||
if action == "created":
|
||||
await handle_comment_event(body, event, action, agent)
|
||||
|
||||
return {}
|
||||
|
||||
async def handle_pr_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
|
||||
"""Handle pull request events"""
|
||||
pr = body.get("pull_request", {})
|
||||
if not pr:
|
||||
return
|
||||
|
||||
api_url = pr.get("url")
|
||||
if not api_url:
|
||||
return
|
||||
|
||||
# Handle PR based on action
|
||||
if action in ["opened", "reopened"]:
|
||||
commands = get_settings().get("gitea.pr_commands", [])
|
||||
for command in commands:
|
||||
await agent.handle_request(api_url, command)
|
||||
elif action == "synchronized":
|
||||
# Handle push to PR
|
||||
await agent.handle_request(api_url, "/review --incremental")
|
||||
|
||||
async def handle_comment_event(body: Dict[str, Any], event: str, action: str, agent: PRAgent):
|
||||
"""Handle comment events"""
|
||||
comment = body.get("comment", {})
|
||||
if not comment:
|
||||
return
|
||||
|
||||
comment_body = comment.get("body", "")
|
||||
if not comment_body or not comment_body.startswith("/"):
|
||||
return
|
||||
|
||||
pr_url = body.get("pull_request", {}).get("url")
|
||||
if not pr_url:
|
||||
return
|
||||
|
||||
await agent.handle_request(pr_url, comment_body)
|
||||
|
||||
# FastAPI app setup
|
||||
middleware = [Middleware(RawContextMiddleware)]
|
||||
app = FastAPI(middleware=middleware)
|
||||
app.include_router(router)
|
||||
|
||||
def start():
|
||||
"""Start the Gitea webhook server"""
|
||||
port = int(os.environ.get("PORT", "3000"))
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=port)
|
||||
|
||||
if __name__ == "__main__":
|
||||
start()
|
Reference in New Issue
Block a user