Refactor to use pull_request synchronize event

This commit is contained in:
zmeir
2023-10-26 14:45:57 +03:00
parent 02570ea797
commit 6541575a0e
2 changed files with 66 additions and 96 deletions

View File

@ -1,7 +1,7 @@
import copy
import os
import asyncio.locks
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple
import uvicorn
from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
@ -64,8 +64,8 @@ async def get_body(request):
_duplicate_requests_cache = DefaultDictWithTimeout(ttl=get_settings().github_app.duplicate_requests_cache_ttl)
_duplicate_branch_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_event_pending_triggers_ttl)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_event_pending_triggers_ttl)
_duplicate_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
async def handle_request(body: Dict[str, Any], event: str):
@ -77,7 +77,7 @@ async def handle_request(body: Dict[str, Any], event: str):
event: The GitHub event type.
"""
action = body.get("action")
if not (action or event == "push"):
if not action:
return {}
agent = PRAgent()
bot_user = get_settings().github_app.bot_user
@ -113,126 +113,96 @@ async def handle_request(body: Dict[str, Any], event: str):
# handle pull_request event:
# automatically review opened/reopened/ready_for_review PRs as long as they're not in draft,
# as well as direct review requests from the bot
elif event == 'pull_request':
pull_request = body.get("pull_request")
if not pull_request:
return {}
api_url = pull_request.get("url")
if not api_url:
return {}
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
elif event == 'pull_request' and action != 'synchronize':
pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
if not (pull_request and api_url):
return {}
if action in get_settings().github_app.handle_pr_actions:
if action == "review_requested":
if body.get("requested_reviewer", {}).get("login", "") != bot_user:
return {}
if pull_request.get("created_at") == pull_request.get("updated_at"):
# avoid double reviews when opening a PR for the first time
return {}
get_logger().info(f"Performing review because of event={event} and action={action}")
get_logger().info(f"Performing review for {api_url=} because of {event=} and {action=}")
await _perform_commands(get_settings().github_app.pr_commands, agent, body, api_url, log_context)
# handle push event for new commits
elif event == "push" and get_settings().github_app.handle_push_event:
get_logger().debug(f"[PUSH] {body=}")
# get the branch name
ref = body.get("ref")
get_logger().debug(f"[PUSH] {ref=}")
if not (ref and ref.startswith("refs/heads/")):
# handle pull_request event with synchronize action - "push trigger" for new commits
elif event == 'pull_request' and action == 'synchronize' and get_settings().github_app.handle_push_trigger:
pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
if not (pull_request and api_url):
return {}
branch = ref.removeprefix("refs/heads/")
get_logger().debug(f"[PUSH] {branch=}")
# skip first push (PR will follow)
if body.get("created"):
get_logger().debug("[PUSH] skipping first push")
return {}
# skip if no relevant commits (e.g. no commits at all, only bot/merge commits)
commits = body.get("commits", [])
get_logger().debug(f"[PUSH] {len(commits)} {commits=}")
bot_commits = [commit for commit in commits if commit.get("author", {}).get("username", "") == bot_user]
get_logger().debug(f"[PUSH] {len(bot_commits)} {bot_commits=}")
merge_commits = [commit for commit in commits if commit.get("message", "").startswith("Merge branch ")]
get_logger().debug(f"[PUSH] {len(merge_commits)} {merge_commits=}")
commit_ids_to_ignore = set()
if get_settings().github_app.push_event_ignore_bot_commits:
commit_ids_to_ignore.update({commit.get("id") for commit in bot_commits})
if get_settings().github_app.push_event_ignore_bot_commits:
commit_ids_to_ignore.update({commit.get("id") for commit in merge_commits})
commits = [commit for commit in commits if commit.get("id") not in commit_ids_to_ignore]
if not commits:
return {}
# TODO: consider adding some custom prompt to instruct the PR-Agent how to address bot commits
# Prevent triggering multiple times for subsequent push events when one is enough:
# The first event will trigger the processing, and if there's a second event in the meanwhile it will wait.
# TODO: do we still want to get the list of commits to filter bot/merge commits?
before_sha = body.get("before")
after_sha = body.get("after")
merge_commit_sha = pull_request.get("merge_commit_sha")
if before_sha == after_sha:
return {}
if get_settings().github_app.push_trigger_ignore_merge_commits and after_sha == merge_commit_sha:
return {}
if get_settings().github_app.push_trigger_ignore_bot_commits and body.get("sender", {}).get("login", "") == bot_user:
return {}
# Prevent triggering multiple times for subsequent push triggers when one is enough:
# The first push will trigger the processing, and if there's a second push in the meanwhile it will wait.
# Any more events will be discarded, because they will all trigger the exact same processing on the PR.
# We let the second event wait instead of discarding it because while the first event was being processed,
# more commits may have been pushed that led to the subsequent push events,
# more commits may have been pushed that led to the subsequent events,
# so we keep just one waiting as a delegate to trigger the processing for the new commits when done waiting.
current_active_tasks = _duplicate_branch_push_triggers.setdefault(branch, 0)
max_active_tasks = 2 if get_settings().github_app.push_event_pending_triggers_backlog else 1
current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0)
max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1
if current_active_tasks < max_active_tasks:
# first and second tasks can enter
# first task can enter, and second tasks too if backlog is enabled
get_logger().info(
f"Continue processing push event for {branch=} because there are {current_active_tasks} active tasks"
f"Continue processing push trigger for {api_url=} because there are {current_active_tasks} active tasks"
)
_duplicate_branch_push_triggers[branch] += 1
_duplicate_push_triggers[api_url] += 1
else:
get_logger().info(
f"Skipping push event for {branch=} because another event already triggered the same processing"
f"Skipping push trigger for {api_url=} because another event already triggered the same processing"
)
return {}
async with _pending_task_duplicate_push_conditions[branch]:
async with _pending_task_duplicate_push_conditions[api_url]:
if current_active_tasks == 1:
# second task waits
get_logger().info(
f"Waiting to process push event for {branch=} because the first task is still in progress"
f"Waiting to process push trigger for {api_url=} because the first task is still in progress"
)
await _pending_task_duplicate_push_conditions[branch].wait()
get_logger().info(f"Finished waiting to process push event for {branch=} - continue with flow")
await _pending_task_duplicate_push_conditions[api_url].wait()
get_logger().info(f"Finished waiting to process push trigger for {api_url=} - continue with flow")
try:
# get PR info for branch
provider = get_git_provider()()
provider.repo, _ = provider._parse_pr_url(body["repository"]["html_url"].rstrip("/") + "/pull/1")
get_logger().debug(f"[PUSH] {provider.repo=}")
github_repo = provider._get_repo()
default_branch = body["repository"]["default_branch"]
get_logger().debug(f"[PUSH] {default_branch=}")
org = body["repository"]["organization"]
get_logger().debug(f"[PUSH] {org=}")
pull_requests = list(github_repo.get_pulls(state="open", base=default_branch, head=":".join((org, branch))))
get_logger().debug(f"[PUSH] {pull_requests=}")
if not pull_requests:
if get_settings().github_app.push_trigger_wait_for_initial_review and not get_git_provider()(api_url, incremental=IncrementalPR(True)).previous_review:
get_logger().info(f"Skipping incremental review because there was no initial review for {api_url=} yet")
return {}
pull_request = pull_requests[0].raw_data
# check that the PR is valid to run the agent like in the pull_request event
if not pull_request:
return {}
api_url = pull_request.get("url")
get_logger().debug(f"[PUSH] {api_url=}")
if not api_url:
return {}
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
return {}
if get_settings().github_app.push_event_wait_for_initial_review and not get_git_provider()(api_url, incremental=IncrementalPR(True)).previous_review:
get_logger().info(f"Skipping incremental review because there was no initial review for PR={api_url} yet")
return {}
get_logger().info(f"Performing incremental review because of event={event} and branch={branch} with PR={api_url}")
get_logger().info(f"Performing incremental review for {api_url=} because of {event=} and {action=}")
await _perform_commands(get_settings().github_app.push_commands, agent, body, api_url, log_context)
finally:
# release the waiting task block
async with _pending_task_duplicate_push_conditions[branch]:
_pending_task_duplicate_push_conditions[branch].notify(1)
_duplicate_branch_push_triggers[branch] -= 1
async with _pending_task_duplicate_push_conditions[api_url]:
_pending_task_duplicate_push_conditions[api_url].notify(1)
_duplicate_push_triggers[api_url] -= 1
get_logger().info("event or action does not require handling")
return {}
def _check_pull_request_event(action: str, body: dict, log_context: dict, bot_user: str) -> Tuple[Dict[str, Any], str]:
invalid_result = {}, ""
pull_request = body.get("pull_request")
if not pull_request:
return invalid_result
api_url = pull_request.get("url")
if not api_url:
return invalid_result
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
return invalid_result
if action in ("review_requested", "synchronize") and pull_request.get("created_at") == pull_request.get("updated_at"):
# avoid double reviews when opening a PR for the first time
return invalid_result
return pull_request, api_url
async def _perform_commands(commands: List[str], agent: PRAgent, body: dict, api_url: str, log_context: dict):
apply_repo_settings(api_url)
for command in commands:

View File

@ -84,13 +84,13 @@ pr_commands = [
"/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true",
"/auto_review",
]
# settings for "push" event
handle_push_event = false
push_event_ignore_bot_commits = true
push_event_ignore_merge_commits = true
push_event_wait_for_initial_review = true
push_event_pending_triggers_backlog = true
push_event_pending_triggers_ttl = 300
# settings for "pull_request" event with "synchronize" action - used to detect and handle push triggers for new commits
handle_push_trigger = false
push_trigger_ignore_bot_commits = true
push_trigger_ignore_merge_commits = true
push_trigger_wait_for_initial_review = true
push_trigger_pending_tasks_backlog = true
push_trigger_pending_tasks_ttl = 300
push_commands = [
"/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true",
"""/auto_review -i \