" markdown_text += f"{emoji} {key_nice}: {value}" markdown_text += f" |
" if is_value_no(value): markdown_text += f"{emoji} No relevant tests" else: markdown_text += f"{emoji} PR contains tests" markdown_text += f" |
"
if is_value_no(value):
markdown_text += f"{emoji} No security concerns identified"
else:
markdown_text += f"{emoji} Security concerns \n\n" value = emphasize_header(value.strip()) markdown_text += f"{value}" markdown_text += f" |
" markdown_text += process_can_be_split(emoji, value) markdown_text += f" |
" markdown_text += f"{emoji} No key issues to review" markdown_text += f" |
"
markdown_text += f"{emoji} {key_nice} \n\n" else: markdown_text += f"### {emoji} Key issues to review\n\n#### \n" for i, issue in enumerate(issues): try: if not issue: continue relevant_file = issue.get('relevant_file', '').strip() issue_header = issue.get('issue_header', '').strip() issue_content = issue.get('issue_content', '').strip() start_line = int(str(issue.get('start_line', 0)).strip()) end_line = int(str(issue.get('end_line', 0)).strip()) reference_link = git_provider.get_line_link(relevant_file, start_line, end_line) if gfm_supported: if get_settings().pr_reviewer.extra_issue_links: issue_content_linked =copy.deepcopy(issue_content) referenced_variables_list = issue.get('referenced_variables', []) for component in referenced_variables_list: name = component['variable_name'].strip().strip('`') ind = issue_content.find(name) if ind != -1: reference_link_component = git_provider.get_line_link(relevant_file, component['relevant_line'], component['relevant_line']) issue_content_linked = issue_content_linked[:ind-1] + f"[`{name}`]({reference_link_component})" + issue_content_linked[ind+len(name)+1:] else: get_logger().info(f"Failed to find variable in issue content: {component['variable_name'].strip()}") issue_content = issue_content_linked issue_str = f"{issue_header} {issue_content}" else: issue_str = f"[**{issue_header}**]({reference_link})\n\n{issue_content}\n\n" markdown_text += f"{issue_str}\n\n" except Exception as e: get_logger().exception(f"Failed to process key issues to review: {e}") if gfm_supported: markdown_text += f" |
" markdown_text += f"{emoji} {key_nice}: {value}" markdown_text += f" |
                if sub_key.lower() == 'relevant_file':
                    relevant_file = sub_value.strip('`').strip('"').strip("'")
                    markdown_text += f"<tr><td>relevant file</td><td>{relevant_file}</td></tr>"
                elif sub_key.lower() == 'suggestion':
                    markdown_text += (f"<tr><td>{sub_key} &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;</td>"
                                      f"<td>\n\n<strong>\n\n{sub_value.strip()}\n\n</strong>\n</td></tr>")
                elif sub_key.lower() == 'relevant_line':
                    markdown_text += f"<tr><td>relevant line</td>"
                    sub_value_list = sub_value.split('](')
                    relevant_line = sub_value_list[0].lstrip('`').lstrip('[')
                    if len(sub_value_list) > 1:
                        link = sub_value_list[1].rstrip(')').strip('`')
                        markdown_text += f"<td><a href='{link}'>{relevant_line}</a></td>"
                    else:
                        markdown_text += f"<td>{relevant_line}</td>"
                    markdown_text += "</tr>"
    and even instances of ` with </code>
    """
    text = html.escape(text)
    parts = text.split('`')
    for i in range(1, len(parts), 2):
        parts[i] = '<code>' + parts[i] + '</code>'
    return ''.join(parts)
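# Illustrative example for the backtick-wrapping helper above (behavior taken
# from the code as written):
#   "use `foo()` here"  ->  "use <code>foo()</code> here"
# Odd-numbered segments produced by splitting on ` are wrapped in <code>...</code>
# after the whole string has been HTML-escaped.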
def find_line_number_of_relevant_line_in_file(diff_files: List[FilePatchInfo],
relevant_file: str,
relevant_line_in_file: str,
absolute_position: int = None) -> Tuple[int, int]:
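    """
    Locate `relevant_line_in_file` (or, when given, `absolute_position`) inside the
    patch of `relevant_file`, and return a tuple (position, absolute_position):
    the line's index within the patch text, and its line number in the new version
    of the file. Both values are -1 when no match is found.
    """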
position = -1
if absolute_position is None:
absolute_position = -1
re_hunk_header = re.compile(
r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
if not diff_files:
return position, absolute_position
for file in diff_files:
if file.filename and (file.filename.strip() == relevant_file):
patch = file.patch
patch_lines = patch.splitlines()
delta = 0
start1, size1, start2, size2 = 0, 0, 0, 0
if absolute_position != -1: # matching absolute to relative
for i, line in enumerate(patch_lines):
# new hunk
if line.startswith('@@'):
delta = 0
match = re_hunk_header.match(line)
start1, size1, start2, size2 = map(int, match.groups()[:4])
elif not line.startswith('-'):
delta += 1
                    # line number of the current patch line in the new version of the file
                    absolute_position_curr = start2 + delta - 1
if absolute_position_curr == absolute_position:
position = i
break
else:
# try to find the line in the patch using difflib, with some margin of error
                matches_difflib: list[str] = difflib.get_close_matches(relevant_line_in_file,
                                                                       patch_lines, n=3, cutoff=0.93)
if len(matches_difflib) == 1 and matches_difflib[0].startswith('+'):
relevant_line_in_file = matches_difflib[0]
for i, line in enumerate(patch_lines):
if line.startswith('@@'):
delta = 0
match = re_hunk_header.match(line)
start1, size1, start2, size2 = map(int, match.groups()[:4])
elif not line.startswith('-'):
delta += 1
if relevant_line_in_file in line and line[0] != '-':
position = i
absolute_position = start2 + delta - 1
break
            # startswith() also guards against an empty relevant_line_in_file
            if position == -1 and relevant_line_in_file.startswith('+'):
no_plus_line = relevant_line_in_file[1:].lstrip()
for i, line in enumerate(patch_lines):
if line.startswith('@@'):
delta = 0
match = re_hunk_header.match(line)
start1, size1, start2, size2 = map(int, match.groups()[:4])
elif not line.startswith('-'):
delta += 1
if no_plus_line in line and line[0] != '-':
# The model might add a '+' to the beginning of the relevant_line_in_file even if originally
# it's a context line
position = i
absolute_position = start2 + delta - 1
break
return position, absolute_position
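# Illustrative usage sketch (the diff and filename are made up, and the positional
# field order of FilePatchInfo -- base_file, head_file, patch, filename -- is assumed):
#
#   files = [FilePatchInfo("", "", "@@ -1,2 +1,3 @@\n line1\n+added line\n line2", "demo.py")]
#   find_line_number_of_relevant_line_in_file(files, "demo.py", "+added line")
#   # -> (2, 2): index 2 within the patch text, line 2 of the new file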
def validate_and_await_rate_limit(rate_limit_status=None, git_provider=None, get_rate_limit_status_func=None):
if git_provider and not rate_limit_status:
rate_limit_status = {'resources': git_provider.github_client.get_rate_limit().raw_data}
if not rate_limit_status:
rate_limit_status = get_rate_limit_status_func()
# validate that the rate limit is not exceeded
is_rate_limit = False
for key, value in rate_limit_status['resources'].items():
if value['remaining'] == 0:
print(f"key: {key}, value: {value}")
is_rate_limit = True
sleep_time_sec = value['reset'] - datetime.now().timestamp()
sleep_time_hour = sleep_time_sec / 3600.0
print(f"Rate limit exceeded. Sleeping for {sleep_time_hour} hours")
if sleep_time_sec > 0:
time.sleep(sleep_time_sec+1)
if git_provider:
rate_limit_status = {'resources': git_provider.github_client.get_rate_limit().raw_data}
else:
rate_limit_status = get_rate_limit_status_func()
return is_rate_limit
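# Illustrative usage sketch (assumes a GitHub provider object exposing a PyGithub
# client as `github_client`, which is what the function dereferences):
#
#   hit_limit = validate_and_await_rate_limit(git_provider=provider)
#   if hit_limit:
#       get_logger().info("Rate limit was exhausted; slept until the reset time")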
def get_largest_component(pr_url):
from pr_agent.tools.pr_analyzer import PRAnalyzer
publish_output = get_settings().config.publish_output
get_settings().config.publish_output = False # disable publish output
analyzer = PRAnalyzer(pr_url)
methods_dict_files = analyzer.run_sync()
get_settings().config.publish_output = publish_output
max_lines_changed = 0
file_b = ""
component_name_b = ""
for file in methods_dict_files:
for method in methods_dict_files[file]:
try:
if methods_dict_files[file][method]['num_plus_lines'] > max_lines_changed:
max_lines_changed = methods_dict_files[file][method]['num_plus_lines']
file_b = file
component_name_b = method
            except Exception:
                # some entries may lack 'num_plus_lines'; skip them
                pass
if component_name_b:
get_logger().info(f"Using the largest changed component: '{component_name_b}'")
return component_name_b, file_b
else:
return None, None
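# Illustrative usage sketch (the PR URL is a placeholder):
#
#   component_name, file_path = get_largest_component("https://github.com/org/repo/pull/1")
#   if component_name:
#       ...  # e.g. focus a follow-up analysis on the most heavily changed method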
def github_action_output(output_data: dict, key_name: str):
try:
if not get_settings().get('github_action_config.enable_output', False):
return
key_data = output_data.get(key_name, {})
with open(os.environ['GITHUB_OUTPUT'], 'a') as fh:
print(f"{key_name}={json.dumps(key_data, indent=None, ensure_ascii=False)}", file=fh)
except Exception as e:
get_logger().error(f"Failed to write to GitHub Action output: {e}")
return
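# Illustrative usage sketch: within a GitHub Actions run (the runner sets the
# GITHUB_OUTPUT env var) and with `github_action_config.enable_output` enabled,
# this exposes the "review" entry of a results dict as a step output:
#
#   github_action_output({"review": {"score": 85}}, "review")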
def show_relevant_configurations(relevant_section: str) -> str:
forbidden_keys = ['ai_disclaimer', 'ai_disclaimer_title', 'ANALYTICS_FOLDER', 'secret_provider',
'trial_prefix_message', 'no_eligible_message', 'identity_provider', 'ALLOWED_REPOS','APP_NAME']
markdown_text = ""
markdown_text += "\n