pr-agent/pr_agent/servers/github_app.py
zmeir 65bb70a1dd Added support for automatic review on push event
The new feature can be enabled via the new configuration `github_app.handle_push_event`. To avoid unwanted side effects, this configuration currently defaults to `false`.
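For reference, it can also be flipped on programmatically with the same dynaconf-style override pattern that `start()` uses at the bottom of the file; the dotted key below is an assumption, mirroring the `github_app.handle_push_event` setting:

from pr_agent.config_loader import get_settings

# opt in to push-event handling (defaults to false)
get_settings().set("GITHUB_APP.HANDLE_PUSH_EVENT", True)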

The high-level flow (assuming the configuration is enabled), with a condensed sketch after the list:
1. receive push event from GitHub
2. extract branch and commits from event
3. find the PR URL for the branch (PRs from forks are not currently supported)
4. perform configured commands (e.g. `/describe`, `/review -i`)
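
A condensed sketch of steps 2-4, assuming the payload shape GitHub sends for push events; `find_pr_url` and `run_commands` are hypothetical stand-ins for the handler logic in the file below:

from pr_agent.config_loader import get_settings

async def on_push(body: dict):
    ref = body.get("ref", "")
    if not ref.startswith("refs/heads/"):
        return  # only branch pushes are handled
    branch = ref.removeprefix("refs/heads/")           # step 2: branch
    commits = body.get("commits", [])                  # step 2: commits
    if not commits:
        return
    api_url = find_pr_url(body["repository"], branch)  # step 3 (hypothetical helper)
    if not api_url:
        return
    # step 4: e.g. ["/describe", "/review -i"] (hypothetical helper)
    await run_commands(get_settings().github_app.push_commands, api_url)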

The push event flow is guarded by a backlog queue, so multiple push events on the same branch won't trigger duplicate runs of the PR-Agent commands (a minimal sketch of this guard follows the timeline below).
Example timeline:
1. push 1 - start handling event
2. push 2 - waiting to be handled while push 1 event is still running
3. push 3 - event is dropped, since handling it would be identical to handling push 2 and is therefore redundant
4. push 1 finished being handled
5. push 2 wakes from its wait and continues handling (potentially reviewing the commits of both push 2 and push 3)
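
A minimal sketch of this guard, using the same per-branch counter and asyncio.Condition approach as the handler in the file below; `handle_event` stands in for the actual processing:

import asyncio
from collections import defaultdict

_active = defaultdict(int)                    # branch -> number of admitted tasks
_conditions = defaultdict(asyncio.Condition)  # branch -> wakeup signal for the waiter

async def guarded_handle(branch: str, handle_event):
    already_active = _active[branch]
    if already_active >= 2:                   # a runner and a waiter already exist
        return                                # drop: the waiter will cover these commits too
    _active[branch] += 1
    async with _conditions[branch]:
        if already_active == 1:               # we are the second task: wait for the first
            await _conditions[branch].wait()
    try:
        await handle_event()                  # e.g. the incremental review
    finally:
        async with _conditions[branch]:
            _conditions[branch].notify(1)     # wake the waiter, if any
        _active[branch] -= 1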

Each of these behaviors is controlled by its own configuration option and can be enabled or disabled independently.

Additional minor changes in this PR:
1. Created the `DefaultDictWithTimeout` utility class to reduce the boilerplate of managing expiring caches for outdated triggers (a sketch of the idea follows this list).
2. Guarded against running an incremental review when there are no new commits.
3. Minor styling changes to the incremental review text.
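
`DefaultDictWithTimeout` is defined in `pr_agent/servers/utils.py` and imported at the top of the file below; a minimal sketch of the idea, not the actual implementation:

import time
from collections import defaultdict

class DefaultDictWithTimeout(defaultdict):  # illustrative sketch, not the real class
    """defaultdict whose entries expire `ttl` seconds after their last write."""
    def __init__(self, default_factory=None, ttl=None):
        super().__init__(default_factory)
        self._ttl = ttl
        self._deadlines = {}  # key -> expiry timestamp

    def __setitem__(self, key, value):
        self._deadlines[key] = time.monotonic() + self._ttl
        super().__setitem__(key, value)

    def __getitem__(self, key):
        deadline = self._deadlines.get(key)
        if deadline is not None and time.monotonic() > deadline:
            super().__delitem__(key)      # expired: drop the stale entry
            del self._deadlines[key]
        return super().__getitem__(key)   # __missing__ recreates dropped keys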
2023-10-25 11:15:23 +03:00


import copy
import os
import asyncio.locks
from typing import Any, Dict, List
import uvicorn
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
from starlette.middleware import Middleware
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from pr_agent.agent.pr_agent import PRAgent
from pr_agent.algo.utils import update_settings_from_args
from pr_agent.config_loader import get_settings, global_settings
from pr_agent.git_providers import get_git_provider
from pr_agent.git_providers.utils import apply_repo_settings
from pr_agent.git_providers.git_provider import IncrementalPR
from pr_agent.log import LoggingFormat, get_logger, setup_logger
from pr_agent.servers.utils import verify_signature, DefaultDictWithTimeout
setup_logger(fmt=LoggingFormat.JSON)
router = APIRouter()
@router.post("/api/v1/github_webhooks")
async def handle_github_webhooks(request: Request, response: Response):
"""
Receives and processes incoming GitHub webhook requests.
Verifies the request signature, parses the request body, and passes it to the handle_request function for further
processing.
"""
get_logger().debug("Received a GitHub webhook")
body = await get_body(request)
get_logger().debug(f'Request body:\n{body}')
installation_id = body.get("installation", {}).get("id")
context["installation_id"] = installation_id
context["settings"] = copy.deepcopy(global_settings)
response = await handle_request(body, event=request.headers.get("X-GitHub-Event", None))
return response or {}
@router.post("/api/v1/marketplace_webhooks")
async def handle_marketplace_webhooks(request: Request, response: Response):
body = await get_body(request)
get_logger().info(f'Request body:\n{body}')
async def get_body(request):
try:
body = await request.json()
except Exception as e:
get_logger().error("Error parsing request body", e)
raise HTTPException(status_code=400, detail="Error parsing request body") from e
webhook_secret = getattr(get_settings().github, 'webhook_secret', None)
if webhook_secret:
body_bytes = await request.body()
signature_header = request.headers.get('x-hub-signature-256', None)
verify_signature(body_bytes, webhook_secret, signature_header)
return body
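# module-level TTL caches: one deduplicates webhook deliveries, and two throttle
# per-branch push handling (active-task counts and per-branch wait conditions)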
_duplicate_requests_cache = DefaultDictWithTimeout(ttl=get_settings().github_app.duplicate_requests_cache_ttl)
_duplicate_branch_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_event_pending_triggers_ttl)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_event_pending_triggers_ttl)
async def handle_request(body: Dict[str, Any], event: str):
"""
Handle incoming GitHub webhook requests.
Args:
body: The request body.
event: The GitHub event type.
"""
action = body.get("action")
if not (action or event == "push"):
return {}
agent = PRAgent()
bot_user = get_settings().github_app.bot_user
sender = body.get("sender", {}).get("login")
log_context = {"action": action, "event": event, "sender": sender, "server_type": "github_app"}
if get_settings().github_app.duplicate_requests_cache and _is_duplicate_request(body):
return {}
# handle all sorts of comment events (e.g. issue_comment)
if action == 'created':
if "comment" not in body:
return {}
comment_body = body.get("comment", {}).get("body")
if sender and bot_user in sender:
get_logger().info(f"Ignoring comment from {bot_user} user")
return {}
get_logger().info(f"Processing comment from {sender} user")
if "issue" in body and "pull_request" in body["issue"] and "url" in body["issue"]["pull_request"]:
api_url = body["issue"]["pull_request"]["url"]
elif "comment" in body and "pull_request_url" in body["comment"]:
api_url = body["comment"]["pull_request_url"]
else:
return {}
log_context["api_url"] = api_url
get_logger().info(body)
get_logger().info(f"Handling comment because of event={event} and action={action}")
comment_id = body.get("comment", {}).get("id")
provider = get_git_provider()(pr_url=api_url)
with get_logger().contextualize(**log_context):
await agent.handle_request(api_url, comment_body, notify=lambda: provider.add_eyes_reaction(comment_id))
# handle pull_request event:
# automatically review opened/reopened/ready_for_review PRs as long as they're not in draft,
# as well as direct review requests from the bot
elif event == 'pull_request':
pull_request = body.get("pull_request")
if not pull_request:
return {}
api_url = pull_request.get("url")
if not api_url:
return {}
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
return {}
if action in get_settings().github_app.handle_pr_actions:
if action == "review_requested":
if body.get("requested_reviewer", {}).get("login", "") != bot_user:
return {}
if pull_request.get("created_at") == pull_request.get("updated_at"):
# avoid double reviews when opening a PR for the first time
return {}
get_logger().info(f"Performing review because of event={event} and action={action}")
await _perform_commands(get_settings().github_app.pr_commands, agent, body, api_url, log_context)
# handle push event for new commits
elif event == "push" and get_settings().github_app.handle_push_event:
get_logger().debug(f"[PUSH] {body=}")
# get the branch name
ref = body.get("ref")
get_logger().debug(f"[PUSH] {ref=}")
if not (ref and ref.startswith("refs/heads/")):
return {}
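        # str.removeprefix requires Python 3.9+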
branch = ref.removeprefix("refs/heads/")
get_logger().debug(f"[PUSH] {branch=}")
# skip first push (PR will follow)
if body.get("created"):
get_logger().debug("[PUSH] skipping first push")
return {}
# skip if no relevant commits (e.g. no commits at all, only bot/merge commits)
commits = body.get("commits", [])
get_logger().debug(f"[PUSH] {len(commits)} {commits=}")
bot_commits = [commit for commit in commits if commit.get("author", {}).get("username", "") == bot_user]
get_logger().debug(f"[PUSH] {len(bot_commits)} {bot_commits=}")
merge_commits = [commit for commit in commits if commit.get("message", "").startswith("Merge branch ")]
get_logger().debug(f"[PUSH] {len(merge_commits)} {merge_commits=}")
commit_ids_to_ignore = set()
if get_settings().github_app.push_event_ignore_bot_commits:
commit_ids_to_ignore.update({commit.get("id") for commit in bot_commits})
        if get_settings().github_app.push_event_ignore_merge_commits:
commit_ids_to_ignore.update({commit.get("id") for commit in merge_commits})
commits = [commit for commit in commits if commit.get("id") not in commit_ids_to_ignore]
if not commits:
return {}
# TODO: consider adding some custom prompt to instruct the PR-Agent how to address bot commits
# Prevent triggering multiple times for subsequent push events when one is enough:
# The first event will trigger the processing, and if there's a second event in the meanwhile it will wait.
# Any more events will be discarded, because they will all trigger the exact same processing on the PR.
# We let the second event wait instead of discarding it because while the first event was being processed,
# more commits may have been pushed that led to the subsequent push events,
# so we keep just one waiting as a delegate to trigger the processing for the new commits when done waiting.
current_active_tasks = _duplicate_branch_push_triggers.setdefault(branch, 0)
max_active_tasks = 2 if get_settings().github_app.push_event_pending_triggers_backlog else 1
if current_active_tasks < max_active_tasks:
# first and second tasks can enter
get_logger().info(
f"Continue processing push event for {branch=} because there are {current_active_tasks} active tasks"
)
_duplicate_branch_push_triggers[branch] += 1
else:
get_logger().info(
f"Skipping push event for {branch=} because another event already triggered the same processing"
)
return {}
async with _pending_task_duplicate_push_conditions[branch]:
if current_active_tasks == 1:
# second task waits
get_logger().info(
f"Waiting to process push event for {branch=} because the first task is still in progress"
)
await _pending_task_duplicate_push_conditions[branch].wait()
get_logger().info(f"Finished waiting to process push event for {branch=} - continue with flow")
try:
# get PR info for branch
provider = get_git_provider()()
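            # append a dummy "/pull/1" so the provider's URL parser can extract the repo name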
provider.repo, _ = provider._parse_pr_url(body["repository"]["html_url"].rstrip("/") + "/pull/1")
get_logger().debug(f"[PUSH] {provider.repo=}")
github_repo = provider._get_repo()
default_branch = body["repository"]["default_branch"]
get_logger().debug(f"[PUSH] {default_branch=}")
org = body["repository"]["organization"]
get_logger().debug(f"[PUSH] {org=}")
pull_requests = list(github_repo.get_pulls(state="open", base=default_branch, head=":".join((org, branch))))
get_logger().debug(f"[PUSH] {pull_requests=}")
if not pull_requests:
return {}
pull_request = pull_requests[0].raw_data
# check that the PR is valid to run the agent like in the pull_request event
if not pull_request:
return {}
api_url = pull_request.get("url")
get_logger().debug(f"[PUSH] {api_url=}")
if not api_url:
return {}
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
return {}
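            # optionally require an initial (full) review to exist before running incremental reviews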
if get_settings().github_app.push_event_wait_for_initial_review and not get_git_provider()(api_url, incremental=IncrementalPR(True)).previous_review:
get_logger().info(f"Skipping incremental review because there was no initial review for PR={api_url} yet")
return {}
get_logger().info(f"Performing incremental review because of event={event} and branch={branch} with PR={api_url}")
await _perform_commands(get_settings().github_app.push_commands, agent, body, api_url, log_context)
finally:
# release the waiting task block
async with _pending_task_duplicate_push_conditions[branch]:
_pending_task_duplicate_push_conditions[branch].notify(1)
_duplicate_branch_push_triggers[branch] -= 1
get_logger().info("event or action does not require handling")
return {}
async def _perform_commands(commands: List[str], agent: PRAgent, body: dict, api_url: str, log_context: dict):
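    """Apply the repo's own settings, then run each configured command (e.g. `/describe`) against the PR."""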
apply_repo_settings(api_url)
for command in commands:
split_command = command.split(" ")
command = split_command[0]
args = split_command[1:]
other_args = update_settings_from_args(args)
new_command = ' '.join([command] + other_args)
get_logger().info(body)
get_logger().info(f"Performing command: {new_command}")
with get_logger().contextualize(**log_context):
await agent.handle_request(api_url, new_command)
def _is_duplicate_request(body: Dict[str, Any]) -> bool:
"""
    In some deployments it's possible to get duplicate requests if the handling is long.
    This function checks whether the request is a duplicate so the caller can ignore it.
"""
request_hash = hash(str(body))
get_logger().info(f"request_hash: {request_hash}")
is_duplicate = _duplicate_requests_cache.get(request_hash, False)
_duplicate_requests_cache[request_hash] = True
if is_duplicate:
get_logger().info(f"Ignoring duplicate request {request_hash}")
return is_duplicate
@router.get("/")
async def root():
return {"status": "ok"}
def start():
if get_settings().github_app.override_deployment_type:
# Override the deployment type to app
get_settings().set("GITHUB.DEPLOYMENT_TYPE", "app")
get_settings().set("CONFIG.PUBLISH_OUTPUT_PROGRESS", False)
middleware = [Middleware(RawContextMiddleware)]
app = FastAPI(middleware=middleware)
app.include_router(router)
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "3000")))
if __name__ == '__main__':
start()