import copy
import datetime
import traceback
from collections import OrderedDict
from functools import partial
from typing import List, Tuple

from jinja2 import Environment, StrictUndefined

from pr_agent.algo.ai_handlers.base_ai_handler import BaseAiHandler
from pr_agent.algo.ai_handlers.litellm_ai_handler import LiteLLMAIHandler
from pr_agent.algo.pr_processing import (
    get_pr_diff,
    retry_with_fallback_models,
    add_ai_metadata_to_diff_files,
)
from pr_agent.algo.token_handler import TokenHandler
from pr_agent.algo.utils import (
    github_action_output,
    load_yaml,
    ModelType,
    show_relevant_configurations,
    convert_to_markdown_v2,
    PRReviewHeader,
)
from pr_agent.config_loader import get_settings
from pr_agent.git_providers import get_git_provider, get_git_provider_with_context
from pr_agent.git_providers.git_provider import IncrementalPR, get_main_pr_language
from pr_agent.log import get_logger
from pr_agent.servers.help import HelpMessage
from pr_agent.tools.ticket_pr_compliance_check import extract_tickets, extract_and_cache_pr_tickets


class PRReviewer:
    """
    The PRReviewer class is responsible for reviewing a pull request and generating feedback using an AI model.
    """

    def __init__(self, pr_url: str, is_answer: bool = False, is_auto: bool = False, args: list = None,
                 ai_handler: partial[BaseAiHandler,] = LiteLLMAIHandler):
        """
        Initialize the PRReviewer object with the necessary attributes and objects to review a pull request.

        Args:
            pr_url (str): The URL of the pull request to be reviewed.
            is_answer (bool, optional): Indicates whether the review is being done in answer mode.
                Defaults to False.
            is_auto (bool, optional): Indicates whether the review is being done in automatic mode.
                Defaults to False.
            args (list, optional): List of arguments passed to the PRReviewer class. A leading "-i"
                requests an incremental review. Defaults to None.
            ai_handler: Zero-argument callable that is invoked to build the AI handler used for the
                review. Defaults to LiteLLMAIHandler.

        Raises:
            Exception: If answer mode is requested and the git provider reports that
                "get_issue_comments" is not supported.
        """
        self.git_provider = get_git_provider_with_context(pr_url)
        self.args = args
        self.incremental = self.parse_incremental(args)  # -i command
        if self.incremental and self.incremental.is_incremental:
            self.git_provider.get_incremental_commits(self.incremental)

        self.main_language = get_main_pr_language(
            self.git_provider.get_languages(), self.git_provider.get_files()
        )
        self.pr_url = pr_url
        self.is_answer = is_answer
        self.is_auto = is_auto

        if self.is_answer and not self.git_provider.is_supported("get_issue_comments"):
            raise Exception(f"Answer mode is not supported for {get_settings().config.git_provider} for now")

        self.ai_handler = ai_handler()
        self.ai_handler.main_pr_language = self.main_language
        self.patches_diff = None
        self.prediction = None

        answer_str, question_str = self._get_user_answers()
        self.pr_description, self.pr_description_files = (
            self.git_provider.get_pr_description(split_changes_walkthrough=True))

        # AI metadata is attached only when the description yielded files AND both settings are on;
        # in every other case the setting is switched off so that "is_ai_metadata" below reads False.
        if (self.pr_description_files
                and get_settings().get("config.is_auto_command", False)
                and get_settings().get("config.enable_ai_metadata", False)):
            add_ai_metadata_to_diff_files(self.git_provider, self.pr_description_files)
            get_logger().debug("AI metadata added to the this command")
        else:
            get_settings().set("config.enable_ai_metadata", False)
            get_logger().debug("AI metadata is disabled for this command")

        # Variables rendered into the system/user prompt templates.
        self.vars = {
            "title": self.git_provider.pr.title,
            "branch": self.git_provider.get_pr_branch(),
            "description": self.pr_description,
            "language": self.main_language,
            "diff": "",  # empty diff for initial calculation
            "num_pr_files": self.git_provider.get_num_of_files(),
            "require_score": get_settings().pr_reviewer.require_score_review,
            "require_tests": get_settings().pr_reviewer.require_tests_review,
            "require_estimate_effort_to_review": get_settings().pr_reviewer.require_estimate_effort_to_review,
            "require_can_be_split_review": get_settings().pr_reviewer.require_can_be_split_review,
            "require_security_review": get_settings().pr_reviewer.require_security_review,
            "num_code_suggestions": get_settings().pr_reviewer.num_code_suggestions,
            "question_str": question_str,
            "answer_str": answer_str,
            "extra_instructions": get_settings().pr_reviewer.extra_instructions,
            "commit_messages_str": self.git_provider.get_commit_messages(),
            "custom_labels": "",
            "enable_custom_labels": get_settings().config.enable_custom_labels,
            "is_ai_metadata": get_settings().get("config.enable_ai_metadata", False),
            "related_tickets": get_settings().get("related_tickets", []),
        }

        self.token_handler = TokenHandler(
            self.git_provider.pr,
            self.vars,
            get_settings().pr_review_prompt.system,
            get_settings().pr_review_prompt.user
        )

    def parse_incremental(self, args: List[str]) -> IncrementalPR:
        """
        Build the IncrementalPR descriptor from the command arguments.

        Args:
            args: Command arguments; may be None or empty.

        Returns:
            An IncrementalPR constructed with True when the first argument is "-i", otherwise False.
        """
        # bool(args) covers both None and an empty list, so args[0] is only read when it exists.
        is_incremental = bool(args) and args[0] == "-i"
        return IncrementalPR(is_incremental)

    async def run(self) -> None:
        """
        Run the review flow: validate preconditions, obtain the AI prediction and publish the review.

        Any exception raised along the way is logged and swallowed, so this coroutine
        always returns None.
        """
        try:
            if not self.git_provider.get_files():
                get_logger().info(f"PR has no files: {self.pr_url}, skipping review")
                return None

            if self.incremental.is_incremental and not self._can_run_incremental_review():
                return None

            if isinstance(self.args, list) and self.args and self.args[0] == 'auto_approve':
                get_logger().info(f'Auto approve flow PR: {self.pr_url} ...')
                self.auto_approve_logic()
                return None

            get_logger().info(f'Reviewing PR: {self.pr_url} ...')
            relevant_configs = {'pr_reviewer': dict(get_settings().pr_reviewer),
                                'config': dict(get_settings().config)}
            get_logger().debug("Relevant configs", artifacts=relevant_configs)

            # ticket extraction if exists
            await extract_and_cache_pr_tickets(self.git_provider, self.vars)

            if (self.incremental.is_incremental
                    and hasattr(self.git_provider, "unreviewed_files_set")
                    and not self.git_provider.unreviewed_files_set):
                get_logger().info(f"Incremental review is enabled for {self.pr_url} but there are no new files")
                # The attribute may be missing or None; fall back to an empty URL in both cases
                # rather than failing before the "skipped" comment can be published.
                previous_review = getattr(self.git_provider, "previous_review", None)
                previous_review_url = previous_review.html_url if previous_review is not None else ""
                if get_settings().config.publish_output:
                    self.git_provider.publish_comment(
                        "Incremental Review Skipped\n"
                        f"No files were changed since the [previous PR Review]({previous_review_url})")
                return None

            if get_settings().config.publish_output and not get_settings().config.get('is_auto_command', False):
                self.git_provider.publish_comment("Preparing review...", is_temporary=True)

            await retry_with_fallback_models(self._prepare_prediction)
            if not self.prediction:
                self.git_provider.remove_initial_comment()
                return None

            pr_review = self._prepare_pr_review()
            get_logger().debug("PR output", artifact=pr_review)

            if get_settings().config.publish_output:
                # publish the review
                if get_settings().pr_reviewer.persistent_comment and not self.incremental.is_incremental:
                    final_update_message = get_settings().pr_reviewer.final_update_message
                    self.git_provider.publish_persistent_comment(
                        pr_review,
                        initial_header=f"{PRReviewHeader.REGULAR.value} 🔍",
                        update_header=True,
                        final_update_message=final_update_message,
                    )
                else:
                    self.git_provider.publish_comment(pr_review)

                self.git_provider.remove_initial_comment()
                if get_settings().pr_reviewer.inline_code_comments:
                    self._publish_inline_code_comments()
        except Exception as e:
            get_logger().error(f"Failed to review PR: {e}")

    async def _prepare_prediction(self, model: str) -> None:
        """
        Compute the PR diff for the given model and, when it is non-empty, store the AI prediction.

        Sets self.patches_diff, and sets self.prediction to the model response or to None
        when the diff is empty.

        Args:
            model: A string representing the AI model to be used for the prediction.
        """
        self.patches_diff = get_pr_diff(self.git_provider,
                                        self.token_handler,
                                        model,
                                        add_line_numbers_to_hunks=True,
                                        disable_extra_lines=False,)

        if self.patches_diff:
            get_logger().debug("PR diff", diff=self.patches_diff)
            self.prediction = await self._get_prediction(model)
        else:
            get_logger().warning(f"Empty diff for PR: {self.pr_url}")
            self.prediction = None

    async def _get_prediction(self, model: str) -> str:
        """
        Generate an AI prediction for the pull request review.

        Args:
            model: A string representing the AI model to be used for the prediction.

        Returns:
            A string representing the AI prediction for the pull request review.
        """
        # Work on a copy so the stored prompt variables keep their empty "diff" placeholder.
        variables = copy.deepcopy(self.vars)
        variables["diff"] = self.patches_diff  # update diff

        # StrictUndefined makes rendering fail on a template variable that is not in `variables`.
        environment = Environment(undefined=StrictUndefined)
        system_prompt = environment.from_string(get_settings().pr_review_prompt.system).render(variables)
        user_prompt = environment.from_string(get_settings().pr_review_prompt.user).render(variables)

        # Only the response text is used; the finish reason is intentionally discarded.
        response, _finish_reason = await self.ai_handler.chat_completion(
            model=model,
            temperature=get_settings().config.temperature,
            system=system_prompt,
            user=user_prompt
        )

        return response
'generate_link_to_relevant_line_number'): link = self.git_provider.generate_link_to_relevant_line_number(suggestion) if link: suggestion['relevant_line'] = f"[{suggestion['relevant_line']}]({link})" else: pass incremental_review_markdown_text = None # Add incremental review section if self.incremental.is_incremental: last_commit_url = f"{self.git_provider.get_pr_url()}/commits/" \ f"{self.git_provider.incremental.first_new_commit_sha}" incremental_review_markdown_text = f"Starting from commit {last_commit_url}" markdown_text = convert_to_markdown_v2(data, self.git_provider.is_supported("gfm_markdown"), incremental_review_markdown_text, git_provider=self.git_provider) # Add help text if gfm_markdown is supported if self.git_provider.is_supported("gfm_markdown") and get_settings().pr_reviewer.enable_help_text: markdown_text += "