diff --git a/pr_agent/git_providers/bitbucket_provider.py b/pr_agent/git_providers/bitbucket_provider.py
index fef51794..fe416d5c 100644
--- a/pr_agent/git_providers/bitbucket_provider.py
+++ b/pr_agent/git_providers/bitbucket_provider.py
@@ -142,10 +142,15 @@ class BitbucketProvider(GitProvider):
     def remove_initial_comment(self):
         try:
             for comment in self.temp_comments:
-                self.pr.delete(f"comments/{comment}")
+                self.remove_comment(comment)
         except Exception as e:
             get_logger().exception(f"Failed to remove temp comments, error: {e}")
 
+    def remove_comment(self, comment):
+        try:
+            self.pr.delete(f"comments/{comment}")
+        except Exception as e:
+            get_logger().exception(f"Failed to remove comment, error: {e}")
+
     # funtion to create_inline_comment
     def create_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str):
diff --git a/pr_agent/git_providers/codecommit_provider.py b/pr_agent/git_providers/codecommit_provider.py
index 4e12f96e..a4836849 100644
--- a/pr_agent/git_providers/codecommit_provider.py
+++ b/pr_agent/git_providers/codecommit_provider.py
@@ -221,6 +221,9 @@ class CodeCommitProvider(GitProvider):
     def remove_initial_comment(self):
         return ""  # not implemented yet
 
+    def remove_comment(self, comment):
+        return ""  # not implemented yet
+
     def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str):
         # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codecommit/client/post_comment_for_compared_commit.html
         raise NotImplementedError("CodeCommit provider does not support publishing inline comments yet")
diff --git a/pr_agent/git_providers/gerrit_provider.py b/pr_agent/git_providers/gerrit_provider.py
index ae017fbd..fd67f5f0 100644
--- a/pr_agent/git_providers/gerrit_provider.py
+++ b/pr_agent/git_providers/gerrit_provider.py
@@ -396,5 +396,8 @@ class GerritProvider(GitProvider):
         # shutil.rmtree(self.repo_path)
         pass
 
+    def remove_comment(self, comment):
+        pass
+
     def get_pr_branch(self):
         return self.repo.head
diff --git a/pr_agent/git_providers/git_provider.py b/pr_agent/git_providers/git_provider.py
index c6740467..6f986cf0 100644
--- a/pr_agent/git_providers/git_provider.py
+++ b/pr_agent/git_providers/git_provider.py
@@ -71,6 +71,10 @@ class GitProvider(ABC):
     def remove_initial_comment(self):
         pass
 
+    @abstractmethod
+    def remove_comment(self, comment):
+        pass
+
     @abstractmethod
     def get_languages(self):
         pass
diff --git a/pr_agent/git_providers/github_provider.py b/pr_agent/git_providers/github_provider.py
index f868e482..40b0e121 100644
--- a/pr_agent/git_providers/github_provider.py
+++ b/pr_agent/git_providers/github_provider.py
@@ -50,7 +50,7 @@ class GithubProvider(GitProvider):
 
     def get_incremental_commits(self):
         self.commits = list(self.pr.get_commits())
-        self.get_previous_review()
+        self.previous_review = self.get_previous_review(full=True, incremental=True)
         if self.previous_review:
             self.incremental.commits_range = self.get_commit_range()
             # Get all files changed during the commit range
@@ -63,7 +63,7 @@ class GithubProvider(GitProvider):
 
     def get_commit_range(self):
         last_review_time = self.previous_review.created_at
-        first_new_commit_index = 0
+        first_new_commit_index = None
         for index in range(len(self.commits) - 1, -1, -1):
             if self.commits[index].commit.author.date > last_review_time:
                 self.incremental.first_new_commit_sha = self.commits[index].sha
@@ -71,15 +71,21 @@ class GithubProvider(GitProvider):
             else:
                 self.incremental.last_seen_commit_sha = self.commits[index].sha
                 break
-        return self.commits[first_new_commit_index:]
+        return self.commits[first_new_commit_index:] if first_new_commit_index is not None else []
 
-    def get_previous_review(self):
-        self.previous_review = None
-        self.comments = list(self.pr.get_issue_comments())
+    def get_previous_review(self, *, full: bool, incremental: bool):
+        if not (full or incremental):
+            raise ValueError("At least one of full or incremental must be True")
+        if not getattr(self, "comments", None):
+            self.comments = list(self.pr.get_issue_comments())
+        prefixes = []
+        if full:
+            prefixes.append("## PR Analysis")
+        if incremental:
+            prefixes.append("## Incremental PR Review")
         for index in range(len(self.comments) - 1, -1, -1):
-            if self.comments[index].body.startswith("## PR Analysis") or self.comments[index].body.startswith("## Incremental PR Review"):
-                self.previous_review = self.comments[index]
-                break
+            if any(self.comments[index].body.startswith(prefix) for prefix in prefixes):
+                return self.comments[index]
 
     def get_files(self):
         if self.incremental.is_incremental and self.file_set:
@@ -218,10 +224,16 @@ class GithubProvider(GitProvider):
         try:
             for comment in getattr(self.pr, 'comments_list', []):
                 if comment.is_temporary:
-                    comment.delete()
+                    self.remove_comment(comment)
         except Exception as e:
             get_logger().exception(f"Failed to remove initial comment, error: {e}")
 
+    def remove_comment(self, comment):
+        try:
+            comment.delete()
+        except Exception as e:
+            get_logger().exception(f"Failed to remove comment, error: {e}")
+
     def get_title(self):
         return self.pr.title
diff --git a/pr_agent/git_providers/gitlab_provider.py b/pr_agent/git_providers/gitlab_provider.py
index 0d09e622..d36fecf9 100644
--- a/pr_agent/git_providers/gitlab_provider.py
+++ b/pr_agent/git_providers/gitlab_provider.py
@@ -287,10 +287,16 @@ class GitLabProvider(GitProvider):
     def remove_initial_comment(self):
         try:
             for comment in self.temp_comments:
-                comment.delete()
+                self.remove_comment(comment)
         except Exception as e:
             get_logger().exception(f"Failed to remove temp comments, error: {e}")
 
+    def remove_comment(self, comment):
+        try:
+            comment.delete()
+        except Exception as e:
+            get_logger().exception(f"Failed to remove comment, error: {e}")
+
     def get_title(self):
         return self.mr.title
diff --git a/pr_agent/git_providers/local_git_provider.py b/pr_agent/git_providers/local_git_provider.py
index 5fa9f0be..0ef11413 100644
--- a/pr_agent/git_providers/local_git_provider.py
+++ b/pr_agent/git_providers/local_git_provider.py
@@ -140,6 +140,9 @@ class LocalGitProvider(GitProvider):
     def remove_initial_comment(self):
        pass  # Not applicable to the local git provider, but required by the interface
 
+    def remove_comment(self, comment):
+        pass  # Not applicable to the local git provider, but required by the interface
+
     def get_languages(self):
         """
         Calculate percentage of languages in repository. Used for hunk prioritisation.
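The provider changes above do two things: every provider gains a remove_comment(comment) method (now part of the GitProvider ABC), and GithubProvider.get_previous_review becomes a side-effect-free query that returns the newest matching review comment instead of assigning self.previous_review itself. A minimal sketch of that selection logic, using plain strings in place of PyGithub comment objects (find_previous_review is illustrative, not part of the patch):

from typing import List, Optional


def find_previous_review(comments: List[str], *, full: bool, incremental: bool) -> Optional[str]:
    # Mirrors GithubProvider.get_previous_review: the newest matching comment wins.
    if not (full or incremental):
        raise ValueError("At least one of full or incremental must be True")
    prefixes = []
    if full:
        prefixes.append("## PR Analysis")            # header of a full review comment
    if incremental:
        prefixes.append("## Incremental PR Review")  # header of an incremental review comment
    for body in reversed(comments):                  # comments arrive oldest-first, so scan backwards
        if any(body.startswith(prefix) for prefix in prefixes):
            return body
    return None                                      # no previous review found


comments = ["## PR Analysis\n...", "unrelated discussion", "## Incremental PR Review\n..."]
assert find_previous_review(comments, full=False, incremental=True).startswith("## Incremental")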
diff --git a/pr_agent/servers/github_app.py b/pr_agent/servers/github_app.py
index 37f96e2d..2bb85a3d 100644
--- a/pr_agent/servers/github_app.py
+++ b/pr_agent/servers/github_app.py
@@ -1,7 +1,7 @@
 import copy
 import os
-import time
-from typing import Any, Dict
+import asyncio.locks
+from typing import Any, Dict, List, Tuple
 
 import uvicorn
 from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
@@ -14,8 +14,9 @@ from pr_agent.algo.utils import update_settings_from_args
 from pr_agent.config_loader import get_settings, global_settings
 from pr_agent.git_providers import get_git_provider
 from pr_agent.git_providers.utils import apply_repo_settings
+from pr_agent.git_providers.git_provider import IncrementalPR
 from pr_agent.log import LoggingFormat, get_logger, setup_logger
-from pr_agent.servers.utils import verify_signature
+from pr_agent.servers.utils import verify_signature, DefaultDictWithTimeout
 
 setup_logger(fmt=LoggingFormat.JSON)
@@ -47,6 +48,7 @@ async def handle_marketplace_webhooks(request: Request, response: Response):
     body = await get_body(request)
     get_logger().info(f'Request body:\n{body}')
 
+
 async def get_body(request):
     try:
         body = await request.json()
@@ -61,7 +63,9 @@ async def get_body(request):
     return body
 
 
-_duplicate_requests_cache = {}
+_duplicate_requests_cache = DefaultDictWithTimeout(ttl=get_settings().github_app.duplicate_requests_cache_ttl)
+_duplicate_push_triggers = DefaultDictWithTimeout(ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
+_pending_task_duplicate_push_conditions = DefaultDictWithTimeout(asyncio.locks.Condition, ttl=get_settings().github_app.push_trigger_pending_tasks_ttl)
 
 
 async def handle_request(body: Dict[str, Any], event: str):
@@ -109,40 +113,110 @@ async def handle_request(body: Dict[str, Any], event: str):
     # handle pull_request event:
     # automatically review opened/reopened/ready_for_review PRs as long as they're not in draft,
     # as well as direct review requests from the bot
-    elif event == 'pull_request':
-        pull_request = body.get("pull_request")
-        if not pull_request:
-            return {}
-        api_url = pull_request.get("url")
-        if not api_url:
-            return {}
-        log_context["api_url"] = api_url
-        if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
+    elif event == 'pull_request' and action != 'synchronize':
+        pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
+        if not (pull_request and api_url):
             return {}
         if action in get_settings().github_app.handle_pr_actions:
             if action == "review_requested":
                 if body.get("requested_reviewer", {}).get("login", "") != bot_user:
                     return {}
-            if pull_request.get("created_at") == pull_request.get("updated_at"):
-                # avoid double reviews when opening a PR for the first time
-                return {}
-            get_logger().info(f"Performing review because of event={event} and action={action}")
-            apply_repo_settings(api_url)
-            for command in get_settings().github_app.pr_commands:
-                split_command = command.split(" ")
-                command = split_command[0]
-                args = split_command[1:]
-                other_args = update_settings_from_args(args)
-                new_command = ' '.join([command] + other_args)
-                get_logger().info(body)
-                get_logger().info(f"Performing command: {new_command}")
-                with get_logger().contextualize(**log_context):
-                    await agent.handle_request(api_url, new_command)
+            get_logger().info(f"Performing review for {api_url=} because of {event=} and {action=}")
+            await _perform_commands(get_settings().github_app.pr_commands, agent, body, api_url, log_context)
+
+    # handle pull_request event with synchronize action - "push trigger" for new commits
+    elif event == 'pull_request' and action == 'synchronize' and get_settings().github_app.handle_push_trigger:
+        pull_request, api_url = _check_pull_request_event(action, body, log_context, bot_user)
+        if not (pull_request and api_url):
+            return {}
+
+        # TODO: do we still want to get the list of commits to filter bot/merge commits?
+        before_sha = body.get("before")
+        after_sha = body.get("after")
+        merge_commit_sha = pull_request.get("merge_commit_sha")
+        if before_sha == after_sha:
+            return {}
+        if get_settings().github_app.push_trigger_ignore_merge_commits and after_sha == merge_commit_sha:
+            return {}
+        if get_settings().github_app.push_trigger_ignore_bot_commits and body.get("sender", {}).get("login", "") == bot_user:
+            return {}
+
+        # Prevent triggering multiple times for subsequent push events when one is enough:
+        # the first push triggers the processing, and if a second push arrives in the meantime it waits.
+        # Any further events are discarded, because they would all trigger the exact same processing on the PR.
+        # We let the second event wait instead of discarding it because, while the first event is being processed,
+        # more commits may be pushed that lead to subsequent events; keeping one event waiting as a delegate
+        # lets it trigger the processing for those new commits once the first run finishes.
+        current_active_tasks = _duplicate_push_triggers.setdefault(api_url, 0)
+        max_active_tasks = 2 if get_settings().github_app.push_trigger_pending_tasks_backlog else 1
+        if current_active_tasks < max_active_tasks:
+            # the first task can enter, and a second one too if the backlog is enabled
+            get_logger().info(
+                f"Continue processing push trigger for {api_url=} because there are {current_active_tasks} active tasks"
+            )
+            _duplicate_push_triggers[api_url] += 1
+        else:
+            get_logger().info(
+                f"Skipping push trigger for {api_url=} because another event already triggered the same processing"
+            )
+            return {}
+        async with _pending_task_duplicate_push_conditions[api_url]:
+            if current_active_tasks == 1:
+                # the second task waits for the first to finish
+                get_logger().info(
+                    f"Waiting to process push trigger for {api_url=} because the first task is still in progress"
+                )
+                await _pending_task_duplicate_push_conditions[api_url].wait()
+                get_logger().info(f"Finished waiting to process push trigger for {api_url=} - continue with flow")
+
+        try:
+            if get_settings().github_app.push_trigger_wait_for_initial_review and not get_git_provider()(api_url, incremental=IncrementalPR(True)).previous_review:
+                get_logger().info(f"Skipping incremental review because there was no initial review for {api_url=} yet")
+                return {}
+            get_logger().info(f"Performing incremental review for {api_url=} because of {event=} and {action=}")
+            await _perform_commands(get_settings().github_app.push_commands, agent, body, api_url, log_context)
+
+        finally:
+            # release the waiting delegate task, if any
+            async with _pending_task_duplicate_push_conditions[api_url]:
+                _pending_task_duplicate_push_conditions[api_url].notify(1)
+                _duplicate_push_triggers[api_url] -= 1
 
     get_logger().info("event or action does not require handling")
     return {}
 
 
+def _check_pull_request_event(action: str, body: dict, log_context: dict, bot_user: str) -> Tuple[Dict[str, Any], str]:
+    invalid_result = {}, ""
+    pull_request = body.get("pull_request")
+    if not pull_request:
+        return invalid_result
+    api_url = pull_request.get("url")
+    if not api_url:
+        return invalid_result
+    log_context["api_url"] = api_url
+    if pull_request.get("draft", True) or pull_request.get("state") != "open" or pull_request.get("user", {}).get("login", "") == bot_user:
+        return invalid_result
+    if action in ("review_requested", "synchronize") and pull_request.get("created_at") == pull_request.get("updated_at"):
+        # avoid double reviews when opening a PR for the first time
+        return invalid_result
+    return pull_request, api_url
+
+
+async def _perform_commands(commands: List[str], agent: PRAgent, body: dict, api_url: str, log_context: dict):
+    apply_repo_settings(api_url)
+    for command in commands:
+        split_command = command.split(" ")
+        command = split_command[0]
+        args = split_command[1:]
+        other_args = update_settings_from_args(args)
+        new_command = ' '.join([command] + other_args)
+        get_logger().info(body)
+        get_logger().info(f"Performing command: {new_command}")
+        with get_logger().contextualize(**log_context):
+            await agent.handle_request(api_url, new_command)
+
+
 def _is_duplicate_request(body: Dict[str, Any]) -> bool:
     """
     In some deployments its possible to get duplicate requests if the handling is long,
@@ -150,13 +224,8 @@ def _is_duplicate_request(body: Dict[str, Any]) -> bool:
     """
     request_hash = hash(str(body))
     get_logger().info(f"request_hash: {request_hash}")
-    request_time = time.monotonic()
-    ttl = get_settings().github_app.duplicate_requests_cache_ttl  # in seconds
-    to_delete = [key for key, key_time in _duplicate_requests_cache.items() if request_time - key_time > ttl]
-    for key in to_delete:
-        del _duplicate_requests_cache[key]
-    is_duplicate = request_hash in _duplicate_requests_cache
-    _duplicate_requests_cache[request_hash] = request_time
+    is_duplicate = _duplicate_requests_cache.get(request_hash, False)
+    _duplicate_requests_cache[request_hash] = True
     if is_duplicate:
         get_logger().info(f"Ignoring duplicate request {request_hash}")
     return is_duplicate
+ """ + super().__init__(default_factory, *args, **kwargs) + self.__key_times = dict() + self.__ttl = ttl + self.__refresh_interval = refresh_interval + self.__update_key_time_on_get = update_key_time_on_get + self.__last_refresh = self.__time() - self.__refresh_interval + + @staticmethod + def __time(): + return time.monotonic() + + def __refresh(self): + if self.__ttl is None: + return + request_time = self.__time() + if request_time - self.__last_refresh > self.__refresh_interval: + return + to_delete = [key for key, key_time in self.__key_times.items() if request_time - key_time > self.__ttl] + for key in to_delete: + del self[key] + self.__last_refresh = request_time + + def __getitem__(self, __key): + if self.__update_key_time_on_get: + self.__key_times[__key] = self.__time() + self.__refresh() + return super().__getitem__(__key) + + def __setitem__(self, __key, __value): + self.__key_times[__key] = self.__time() + return super().__setitem__(__key, __value) + + def __delitem__(self, __key): + del self.__key_times[__key] + return super().__delitem__(__key) diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index 9486c740..ffe6d39d 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -24,6 +24,7 @@ num_code_suggestions=4 inline_code_comments = false ask_and_reflect=false automatic_review=true +remove_previous_review_comment=false extra_instructions = "" [pr_description] # /describe # @@ -86,6 +87,27 @@ pr_commands = [ "/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true", "/auto_review", ] +# settings for "pull_request" event with "synchronize" action - used to detect and handle push triggers for new commits +handle_push_trigger = false +push_trigger_ignore_bot_commits = true +push_trigger_ignore_merge_commits = true +push_trigger_wait_for_initial_review = true +push_trigger_pending_tasks_backlog = true +push_trigger_pending_tasks_ttl = 300 +push_commands = [ + "/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true", + """/auto_review -i \ + --pr_reviewer.require_focused_review=false \ + --pr_reviewer.require_score_review=false \ + --pr_reviewer.require_tests_review=false \ + --pr_reviewer.require_security_review=false \ + --pr_reviewer.require_estimate_effort_to_review=false \ + --pr_reviewer.num_code_suggestions=0 \ + --pr_reviewer.inline_code_comments=false \ + --pr_reviewer.remove_previous_review_comment=true \ + --pr_reviewer.extra_instructions='' \ + """ +] [gitlab] # URL to the gitlab service diff --git a/pr_agent/tools/pr_reviewer.py b/pr_agent/tools/pr_reviewer.py index 78669d1a..be938b4a 100644 --- a/pr_agent/tools/pr_reviewer.py +++ b/pr_agent/tools/pr_reviewer.py @@ -100,6 +100,9 @@ class PRReviewer: if self.is_auto and not get_settings().pr_reviewer.automatic_review: get_logger().info(f'Automatic review is disabled {self.pr_url}') return None + if self.is_auto and self.incremental.is_incremental and not self.incremental.first_new_commit_sha: + get_logger().info(f"Incremental review is enabled for {self.pr_url} but there are no new commits") + return None get_logger().info(f'Reviewing PR: {self.pr_url} ...') @@ -113,9 +116,10 @@ class PRReviewer: if get_settings().config.publish_output: get_logger().info('Pushing PR review...') + previous_review_comment = self._get_previous_review_comment() self.git_provider.publish_comment(pr_comment) self.git_provider.remove_initial_comment() - + 
diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml
index 9486c740..ffe6d39d 100644
--- a/pr_agent/settings/configuration.toml
+++ b/pr_agent/settings/configuration.toml
@@ -24,6 +24,7 @@ num_code_suggestions=4
 inline_code_comments = false
 ask_and_reflect=false
 automatic_review=true
+remove_previous_review_comment=false
 extra_instructions = ""
 
 [pr_description] # /describe #
@@ -86,6 +87,27 @@ pr_commands = [
     "/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true",
     "/auto_review",
 ]
+# settings for "pull_request" event with "synchronize" action - used to detect and handle push triggers for new commits
+handle_push_trigger = false
+push_trigger_ignore_bot_commits = true
+push_trigger_ignore_merge_commits = true
+push_trigger_wait_for_initial_review = true
+push_trigger_pending_tasks_backlog = true
+push_trigger_pending_tasks_ttl = 300
+push_commands = [
+    "/describe --pr_description.add_original_user_description=true --pr_description.keep_original_user_title=true",
+    """/auto_review -i \
+    --pr_reviewer.require_focused_review=false \
+    --pr_reviewer.require_score_review=false \
+    --pr_reviewer.require_tests_review=false \
+    --pr_reviewer.require_security_review=false \
+    --pr_reviewer.require_estimate_effort_to_review=false \
+    --pr_reviewer.num_code_suggestions=0 \
+    --pr_reviewer.inline_code_comments=false \
+    --pr_reviewer.remove_previous_review_comment=true \
+    --pr_reviewer.extra_instructions='' \
+    """
+]
 
 [gitlab]
 # URL to the gitlab service
diff --git a/pr_agent/tools/pr_reviewer.py b/pr_agent/tools/pr_reviewer.py
index 78669d1a..be938b4a 100644
--- a/pr_agent/tools/pr_reviewer.py
+++ b/pr_agent/tools/pr_reviewer.py
@@ -100,6 +100,9 @@ class PRReviewer:
         if self.is_auto and not get_settings().pr_reviewer.automatic_review:
             get_logger().info(f'Automatic review is disabled {self.pr_url}')
             return None
+        if self.is_auto and self.incremental.is_incremental and not self.incremental.first_new_commit_sha:
+            get_logger().info(f"Incremental review is enabled for {self.pr_url} but there are no new commits")
+            return None
 
         get_logger().info(f'Reviewing PR: {self.pr_url} ...')
 
@@ -113,9 +116,10 @@ class PRReviewer:
 
             if get_settings().config.publish_output:
                 get_logger().info('Pushing PR review...')
+                previous_review_comment = self._get_previous_review_comment()
                 self.git_provider.publish_comment(pr_comment)
                 self.git_provider.remove_initial_comment()
-
+                self._remove_previous_review_comment(previous_review_comment)
                 if get_settings().pr_reviewer.inline_code_comments:
                     get_logger().info('Pushing inline code comments...')
                     self._publish_inline_code_comments()
@@ -231,9 +235,13 @@ class PRReviewer:
         if self.incremental.is_incremental:
             last_commit_url = f"{self.git_provider.get_pr_url()}/commits/" \
                               f"{self.git_provider.incremental.first_new_commit_sha}"
+            last_commit_msg = self.incremental.commits_range[0].commit.message if self.incremental.commits_range else ""
+            incremental_review_markdown_text = f"Starting from commit {last_commit_url}"
+            if last_commit_msg:
+                incremental_review_markdown_text += f" \n_({last_commit_msg.splitlines(keepends=False)[0]})_"
             data = OrderedDict(data)
             data.update({'Incremental PR Review': {
-                "⏮️ Review for commits since previous PR-Agent review": f"Starting from commit {last_commit_url}"}})
+                "⏮️ Review for commits since previous PR-Agent review": incremental_review_markdown_text}})
             data.move_to_end('Incremental PR Review', last=False)
 
         markdown_text = convert_to_markdown(data, self.git_provider.is_supported("gfm_markdown"))
@@ -314,3 +322,26 @@ class PRReviewer:
                 break
 
         return question_str, answer_str
+
+    def _get_previous_review_comment(self):
+        """
+        Get the previous review comment if it exists.
+        """
+        try:
+            if get_settings().pr_reviewer.remove_previous_review_comment and hasattr(self.git_provider, "get_previous_review"):
+                return self.git_provider.get_previous_review(
+                    full=not self.incremental.is_incremental,
+                    incremental=self.incremental.is_incremental,
+                )
+        except Exception as e:
+            get_logger().exception(f"Failed to get previous review comment, error: {e}")
+
+    def _remove_previous_review_comment(self, comment):
+        """
+        Remove the previous review comment if it exists.
+        """
+        try:
+            if get_settings().pr_reviewer.remove_previous_review_comment and comment:
+                self.git_provider.remove_comment(comment)
+        except Exception as e:
+            get_logger().exception(f"Failed to remove previous review comment, error: {e}")
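Two details in the reviewer changes are worth spelling out. First, _get_previous_review_comment runs before the new review is published, so the freshly posted comment can never match the lookup and delete itself; _remove_previous_review_comment then runs only after the replacement is safely up. Second, the incremental-review header keeps only the first line of the newest commit's message. A quick illustration of that truncation, with a made-up commit message:

last_commit_msg = "Fix race in push trigger handling\n\nAlso tidy up the logging."
header = "Starting from commit <last_commit_url>"
if last_commit_msg:
    header += f" \n_({last_commit_msg.splitlines(keepends=False)[0]})_"
# header now ends with: _(Fix race in push trigger handling)_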