Refactor github_app.py to improve handling of PR events and comments

This commit is contained in:
mrT23
2024-02-22 17:26:47 +02:00
parent ee59d34e39
commit b884920ef2
2 changed files with 143 additions and 148 deletions

View File

@ -64,42 +64,21 @@ async def get_body(request):
return body
_duplicate_requests_cache = DefaultDictWithTimeout(ttl=get_settings().github_app.duplicate_requests_cache_ttl)
_duplicate_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
async def handle_request(body: Dict[str, Any], event: str):
"""
Handle incoming GitHub webhook requests.
Args:
body: The request body.
event: The GitHub event type.
"""
action = body.get("action")
if not action:
return {}
agent = PRAgent()
bot_user = get_settings().github_app.bot_user
sender = body.get("sender", {}).get("login")
log_context = {"action": action, "event": event, "sender": sender, "server_type": "github_app"}
if get_settings().github_app.duplicate_requests_cache and _is_duplicate_request(body):
return {}
# handle all sorts of comment events (e.g. issue_comment)
if action == 'created':
async def handle_comments_on_pr(body: Dict[str, Any],
event: str,
sender: str,
action: str,
log_context: Dict[str, Any],
agent: PRAgent):
if "comment" not in body:
return {}
comment_body = body.get("comment", {}).get("body")
if comment_body and isinstance(comment_body, str) and not comment_body.lstrip().startswith("/"):
get_logger().info("Ignoring comment not starting with /")
return {}
if sender and bot_user in sender:
get_logger().info(f"Ignoring comment from {bot_user} user")
return {}
get_logger().info(f"Processing comment from {sender} user")
disable_eyes = False
if "issue" in body and "pull_request" in body["issue"] and "url" in body["issue"]["pull_request"]:
api_url = body["issue"]["pull_request"]["url"]
@ -108,46 +87,50 @@ async def handle_request(body: Dict[str, Any], event: str):
try:
if ('/ask' in comment_body and
'subject_type' in body["comment"] and body["comment"]["subject_type"] == "line"):
# comment on a code line in the "files changed" tab
comment_body = handle_line_comments(body, comment_body)
disable_eyes = True
except Exception as e:
get_logger().error(f"Failed to handle line comments: {e}")
else:
return {}
log_context["api_url"] = api_url
get_logger().info(body)
get_logger().info(f"Handling comment because of event={event} and action={action}")
comment_id = body.get("comment", {}).get("id")
provider = get_git_provider()(pr_url=api_url)
with get_logger().contextualize(**log_context):
get_logger().info(f"Processing comment on PR {api_url=}, comment_body={comment_body}")
await agent.handle_request(api_url, comment_body,
notify=lambda: provider.add_eyes_reaction(comment_id, disable_eyes=disable_eyes))
# handle pull_request event:
# automatically review opened/reopened/ready_for_review PRs as long as they're not in draft,
# as well as direct review requests from the bot
elif event == 'pull_request' and action != 'synchronize':
async def handle_new_pr_opened(body: Dict[str, Any],
event: str,
sender: str,
action: str,
log_context: Dict[str, Any],
agent: PRAgent):
title = body.get("pull_request", {}).get("title", "")
# logic to ignore PRs with specific titles (e.g. "[Auto] ...")
ignore_pr_title_re = get_settings().get("GITHUB_APP.IGNORE_PR_TITLE", [])
if not isinstance(ignore_pr_title_re, list):
ignore_pr_title_re = [ignore_pr_title_re]
if ignore_pr_title_re and any(re.search(regex, title) for regex in ignore_pr_title_re):
get_logger().info(f"Ignoring PR with title '{title}' due to github_app.ignore_pr_title setting")
return {}
pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
pull_request, api_url = _check_pull_request_event(action, body, log_context)
if not (pull_request and api_url):
return {}
if action in get_settings().github_app.handle_pr_actions:
if action == "review_requested":
if body.get("requested_reviewer", {}).get("login", "") != bot_user:
return {}
get_logger().info(f"Performing review for {api_url=} because of {event=} and {action=}")
await _perform_commands("pr_commands", agent, body, api_url, log_context)
if action in get_settings().github_app.handle_pr_actions: # ['opened', 'reopened', 'ready_for_review', 'review_requested']
await _perform_auto_commands_github("pr_commands", agent, body, api_url, log_context)
# handle pull_request event with synchronize action - "push trigger" for new commits
elif event == 'pull_request' and action == 'synchronize':
pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
async def handle_push_trigger_for_new_commits(body: Dict[str, Any],
event: str,
sender: str,
action: str,
log_context: Dict[str, Any],
agent: PRAgent):
pull_request, api_url = _check_pull_request_event(action, body, log_context)
if not (pull_request and api_url):
return {}
@ -163,8 +146,6 @@ async def handle_request(body: Dict[str, Any], event: str):
return {}
if get_settings().github_app.push_trigger_ignore_merge_commits and after_sha == merge_commit_sha:
return {}
if get_settings().github_app.push_trigger_ignore_bot_commits and body.get("sender", {}).get("login", "") == bot_user:
return {}
# Prevent triggering multiple times for subsequent push triggers when one is enough:
# The first push will trigger the processing, and if there's a second push in the meanwhile it will wait.
@ -195,11 +176,13 @@ async def handle_request(body: Dict[str, Any], event: str):
get_logger().info(f"Finished waiting to process push trigger for {api_url=} - continue with flow")
try:
if get_settings().github_app.push_trigger_wait_for_initial_review and not get_git_provider()(api_url, incremental=IncrementalPR(True)).previous_review:
if get_settings().github_app.push_trigger_wait_for_initial_review and not get_git_provider()(api_url,
incremental=IncrementalPR(
True)).previous_review:
get_logger().info(f"Skipping incremental review because there was no initial review for {api_url=} yet")
return {}
get_logger().info(f"Performing incremental review for {api_url=} because of {event=} and {action=}")
await _perform_commands("push_commands", agent, body, api_url, log_context)
await _perform_auto_commands_github("push_commands", agent, body, api_url, log_context)
finally:
# release the waiting task block
@ -207,12 +190,40 @@ async def handle_request(body: Dict[str, Any], event: str):
_pending_task_duplicate_push_conditions[api_url].notify(1)
_duplicate_push_triggers[api_url] -= 1
async def handle_request(body: Dict[str, Any], event: str):
"""
Handle incoming GitHub webhook requests.
Args:
body: The request body.
event: The GitHub event type (e.g. "pull_request", "issue_comment", etc.).
"""
action = body.get("action") # "created", "opened", "reopened", "ready_for_review", "review_requested", "synchronize"
if not action:
return {}
agent = PRAgent()
sender = body.get("sender", {}).get("login")
log_context = {"action": action, "event": event, "sender": sender, "server_type": "github_app"}
# handle all sorts of comment events (e.g. issue_comment)
if action == 'created':
await handle_comments_on_pr(body, event, sender, action, log_context, agent)
# handle pull_request event:
# automatically review opened/reopened/ready_for_review PRs as long as they're not in draft
elif event == 'pull_request' and action != 'synchronize':
await handle_new_pr_opened(body, event, sender, action, log_context, agent)
# handle pull_request event with synchronize action - "push trigger" for new commits
elif event == 'pull_request' and action == 'synchronize':
await handle_push_trigger_for_new_commits(body, event, sender, action, log_context, agent)
else:
get_logger().info("event or action does not require handling")
return {}
def handle_line_comments(body, comment_body):
# handle line comments
def handle_line_comments(body: Dict, comment_body: [str, Any]) -> str:
if not comment_body:
return ""
start_line = body["comment"]["start_line"]
end_line = body["comment"]["line"]
start_line = end_line if not start_line else start_line
@ -227,7 +238,7 @@ def handle_line_comments(body, comment_body):
return comment_body
def _check_pull_request_event(action: str, body: dict, log_context: dict, bot_user: str) -> Tuple[Dict[str, Any], str]:
def _check_pull_request_event(action: str, body: dict, log_context: dict) -> Tuple[Dict[str, Any], str]:
invalid_result = {}, ""
pull_request = body.get("pull_request")
if not pull_request:
@ -236,7 +247,7 @@ def _check_pull_request_event(action: str, body: dict, log_context: dict, bot_us
if not api_url:
return invalid_result
log_context["api_url"] = api_url
if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
if pull_request.get("draft", True) or pull_request.get("state") != "open":
return invalid_result
if action in ("review_requested", "synchronize") and pull_request.get("created_at") == pull_request.get("updated_at"):
# avoid double reviews when opening a PR for the first time
@ -244,35 +255,24 @@ def _check_pull_request_event(action: str, body: dict, log_context: dict, bot_us
return pull_request, api_url
async def _perform_commands(commands_conf: str, agent: PRAgent, body: dict, api_url: str, log_context: dict):
async def _perform_auto_commands_github(commands_conf: str, agent: PRAgent, body: dict, api_url: str, log_context: dict):
apply_repo_settings(api_url)
commands = get_settings().get(f"github_app.{commands_conf}")
if not commands:
with get_logger().contextualize(**log_context):
get_logger().info(f"New PR, but no auto commands configured")
return
for command in commands:
split_command = command.split(" ")
command = split_command[0]
args = split_command[1:]
other_args = update_settings_from_args(args)
new_command = ' '.join([command] + other_args)
get_logger().info(body)
get_logger().info(f"Performing command: {new_command}")
with get_logger().contextualize(**log_context):
get_logger().info(f"New PR opened. Performing auto command '{new_command}', for {api_url=}")
await agent.handle_request(api_url, new_command)
def _is_duplicate_request(body: Dict[str, Any]) -> bool:
"""
In some deployments its possible to get duplicate requests if the handling is long,
This function checks if the request is duplicate and if so - ignores it.
"""
request_hash = hash(str(body))
get_logger().info(f"request_hash: {request_hash}")
is_duplicate = _duplicate_requests_cache.get(request_hash, False)
_duplicate_requests_cache[request_hash] = True
if is_duplicate:
get_logger().info(f"Ignoring duplicate request {request_hash}")
return is_duplicate
@router.get("/")
async def root():
return {"status": "ok"}

View File

@ -133,12 +133,7 @@ try_fix_invalid_inline_comments = true
[github_app]
# these toggles allows running the github app from custom deployments
bot_user = "github-actions[bot]"
override_deployment_type = true
# in some deployments it's possible to get duplicate requests if the handling is long,
# these settings are used to avoid handling duplicate requests.
duplicate_requests_cache = false
duplicate_requests_cache_ttl = 60 # in seconds
# settings for "pull_request" event
handle_pr_actions = ['opened', 'reopened', 'ready_for_review', 'review_requested']
pr_commands = [