2023-07-06 00:21:08 +03:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2023-07-18 23:14:47 +03:00
|
|
|
import difflib
|
2023-07-11 22:11:42 +03:00
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
import re
|
2023-07-06 00:21:08 +03:00
|
|
|
import textwrap
|
2023-08-01 14:43:26 +03:00
|
|
|
from datetime import datetime
|
|
|
|
from typing import Any, List
|
2023-07-06 00:21:08 +03:00
|
|
|
|
2023-08-01 14:43:26 +03:00
|
|
|
from starlette_context import context
|
2023-07-18 23:14:47 +03:00
|
|
|
|
2023-08-01 14:43:26 +03:00
|
|
|
from pr_agent.config_loader import get_settings, global_settings
|
|
|
|
|
|
|
|
|
|
|
|
def get_setting(key: str) -> Any:
    """
    Look up a configuration value by name.

    The key is upper-cased and searched in the "settings" object stored on the
    starlette context (``global_settings`` is used when the context has no such
    entry). A key missing from that object is then searched in ``global_settings``.
    If any of these steps raises, only ``global_settings`` is consulted.

    Args:
        key (str): Name of the setting; it is matched in upper case.

    Returns:
        Any: The value found for the key, or None when no value is found.
    """
    try:
        key = key.upper()
        active_settings = context.get("settings", global_settings)
        found = active_settings.get(key, global_settings.get(key, None))
    except Exception:
        # Any failure above (e.g. while reading the context) falls back to the global settings.
        found = global_settings.get(key, None)
    return found
|
2023-07-06 00:21:08 +03:00
|
|
|
|
|
|
|
def convert_to_markdown(output_data: dict) -> str:
    """
    Render a (possibly nested) dictionary as markdown text.

    Rendering rules, applied per key in iteration order:
      * falsy values are skipped;
      * a dict value becomes a "## key" heading followed by its own rendering;
      * a list value becomes a bullet for the key followed by one sub-bullet per
        truthy item (dict items under the 'code suggestions' key are rendered by
        parse_code_suggestion);
      * any other value except the string 'n/a' becomes a "key: value" bullet.

    Args:
        output_data (dict): The data to render.

    Returns:
        str: The markdown text built from the dictionary.
    """
    emojis = {
        "Main theme": "🎯",
        "Type of PR": "📌",
        "Score": "🏅",
        "Relevant tests added": "🧪",
        "Unrelated changes": "⚠️",
        "Focused PR": "✨",
        "Security concerns": "🔒",
        "General PR suggestions": "💡",
        "Insights from user's answers": "📝",
        "Code suggestions": "🤖",
    }

    chunks = []
    for key, value in output_data.items():
        if not value:
            continue

        if isinstance(value, dict):
            # Nested section: heading plus recursive rendering.
            chunks.append(f"## {key}\n\n")
            chunks.append(convert_to_markdown(value))
            continue

        if isinstance(value, list):
            is_code_suggestions = key.lower() == 'code suggestions'
            if is_code_suggestions:
                chunks.append("\n")  # just looks nicer with additional line breaks
            emoji = emojis.get(key, "")
            chunks.append(f"- {emoji} **{key}:**\n\n")
            for item in value:
                if is_code_suggestions and isinstance(item, dict):
                    chunks.append(parse_code_suggestion(item))
                elif item:
                    chunks.append(f" - {item}\n")
            continue

        if value != 'n/a':
            emoji = emojis.get(key, "")
            chunks.append(f"- {emoji} **{key}:** {value}\n")

    return "".join(chunks)
|
|
|
|
|
|
|
|
|
|
|
|
def parse_code_suggestion(code_suggestions: dict) -> str:
    """
    Render a single code-suggestion dictionary as a markdown fragment.

    Each entry is rendered in iteration order:
      * a dict value (the "code example") becomes a bullet for the key followed by
        one bullet per inner entry (e.g. 'before' / 'after'), with the inner value
        wrapped in a fenced code block;
      * any other value becomes a "key: value" line; keys containing
        "relevant file" (case-insensitive) start a new bullet.

    Args:
        code_suggestions (dict): One code suggestion, mapping field names to values.

    Returns:
        str: The markdown fragment, always terminated by an extra newline.
    """
    pieces = []
    for field, content in code_suggestions.items():
        if not isinstance(content, dict):
            if "relevant file" in field.lower():
                pieces.append(f"\n - **{field}:** {content}\n")
            else:
                pieces.append(f" **{field}:** {content}\n")
            continue

        # "code example": nested mapping of 'before' and 'after' code
        pieces.append(f" - **{field}:**\n")
        for label, snippet in content.items():
            fenced = textwrap.indent(f"```\n{snippet}\n```", ' ')
            pieces.append(f" - **{label}:**\n{fenced}\n")

    pieces.append("\n")
    return "".join(pieces)
|
|
|
|
|
2023-07-11 22:11:42 +03:00
|
|
|
|
2023-07-17 01:44:40 +03:00
|
|
|
def _last_suggestion_boundary(text):
    """Return the index of the comma in the last "}," of text, or -1 when there is none."""
    return max((m.end() for m in re.finditer(r"\}\s*,", text)), default=0) - 1


def try_fix_json(review, max_iter=10, code_suggestions=False):
    """
    Fix broken or incomplete JSON messages and return the parsed JSON data.

    Args:
    - review: A string containing the JSON message to be fixed.
    - max_iter: An integer representing the maximum number of truncation attempts.
    - code_suggestions: A boolean selecting the closing sequence appended after truncation:
      "]}" when True, "]}}" when False.

    Returns:
    - data: A dictionary containing the parsed JSON data, or an empty dictionary when the message
      could not be repaired.

    If the message ends with "}", it is handed to fix_json_escape_char and that result is returned.
    Otherwise, if the message contains a "Code suggestions" list, it is cut at the last "}," boundary
    (a closing brace followed by optional whitespace and a comma), the closing sequence is appended,
    and the result is parsed. When parsing fails, the message is cut at the previous boundary and the
    attempt is repeated, up to max_iter times. If no boundary is left or every attempt fails, an error
    is logged and an empty dictionary is returned.
    A message with no "Code suggestions" list yields an empty dictionary without logging.
    """
    if review.endswith("}"):
        return fix_json_escape_char(review)

    closing_bracket = "]}" if code_suggestions else "]}}"

    has_single_quoted = review.rfind("'Code suggestions': [") > 0
    has_double_quoted = review.rfind('"Code suggestions": [') > 0
    if not (has_single_quoted or has_double_quoted):
        return {}

    # -1 (instead of an IndexError) signals that no "}," boundary is left to cut at.
    cut = _last_suggestion_boundary(review)
    for _ in range(max_iter):
        if cut <= 0:
            break
        try:
            return json.loads(review[:cut] + closing_bracket)
        except json.JSONDecodeError:
            # Drop the last (incomplete or invalid) suggestion and retry at the previous boundary.
            review = review[:cut]
            cut = _last_suggestion_boundary(review)

    logging.error("Unable to decode JSON response from AI")
    return {}
|
2023-07-17 01:44:40 +03:00
|
|
|
|
2023-07-18 11:34:57 +03:00
|
|
|
|
2023-07-17 01:44:40 +03:00
|
|
|
def fix_json_escape_char(json_message=None):
    """
    Parse a JSON message, blanking out characters the decoder rejects until it parses.

    The message is parsed with json.loads. On a decode error, the character at the reported
    error position is replaced by a space and parsing is retried. This repeats until the message
    parses or no further repair is possible.

    Args:
        json_message (str): A string containing the JSON message to be fixed.

    Returns:
        dict: The parsed JSON data, or an empty dictionary when the message cannot be repaired
        (the error position is past the end of the message, or the offending character is
        already a space so blanking it would make no progress).

    Raises:
        TypeError: If json_message is not something json.loads accepts (e.g. the default None).
    """
    message = json_message
    while True:
        try:
            return json.loads(message)
        except json.JSONDecodeError as e:
            # The decoder reports the offending offset directly; no need to parse the error text.
            bad_idx, reason = e.pos, str(e)

        if bad_idx >= len(message) or message[bad_idx] == ' ':
            # Nothing left to blank out: give up instead of indexing out of range or looping forever.
            logging.error(f"Unable to fix JSON message: {reason}")
            return {}

        # Remove the offending character by replacing it with a space. Every pass turns one
        # non-space character into a space, so the loop is bounded by the message length.
        message = message[:bad_idx] + ' ' + message[bad_idx + 1:]
|
2023-07-18 23:14:47 +03:00
|
|
|
|
|
|
|
|
|
|
|
def convert_str_to_datetime(date_str):
    """
    Parse a date-and-time string into a datetime object.

    Args:
        date_str (str): The timestamp text, formatted as '%a, %d %b %Y %H:%M:%S %Z'
            (for example 'Sat, 01 Jan 2022 12:00:00 UTC').

    Returns:
        datetime: The datetime parsed from date_str by datetime.strptime.

    Example:
        >>> convert_str_to_datetime('Sat, 01 Jan 2022 12:00:00 UTC')
        datetime.datetime(2022, 1, 1, 12, 0)
    """
    return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z')
|
|
|
|
|
|
|
|
|
2023-08-03 22:14:05 +03:00
|
|
|
def load_large_diff(filename, new_file_content_str: str, original_file_content_str: str) -> str:
    """
    Generate a patch for a modified file by comparing the original content of the file with the new
    content provided as input.

    Args:
        filename: Name of the file; used only in log messages.
        new_file_content_str: The new content of the file as a string.
        original_file_content_str: The original content of the file as a string.

    Returns:
        The unified diff between the original and the new content, or an empty string when the diff
        could not be created.

    Raises:
        None. Failures are logged and an empty patch is returned.
    """
    try:
        diff = difflib.unified_diff(original_file_content_str.splitlines(keepends=True),
                                    new_file_content_str.splitlines(keepends=True))
        patch = ''.join(diff)
    except Exception:
        # Best effort: report the failure instead of silently hiding it, and return no patch.
        logging.exception(f"Failed to manually create patch: {filename}.")
        return ""

    try:
        verbose = get_settings().config.verbosity_level >= 2
    except Exception:
        # The settings are needed only for the diagnostic below; never drop the patch over them.
        verbose = False
    if verbose:
        logging.warning(f"File was modified, but no patch was found. Manually creating patch: {filename}.")

    return patch
|
2023-07-30 11:43:44 +03:00
|
|
|
|
|
|
|
|
2023-08-01 14:43:26 +03:00
|
|
|
def update_settings_from_args(args: List[str]) -> List[str]:
    """
    Update the settings object returned by get_settings() from '--key=value' arguments.

    Every argument is stripped of surrounding whitespace. Arguments starting with '--' are split
    on the first '=' into a key and a value, so the value itself may contain '='. The key is
    upper-cased and the setting is updated. All other arguments are passed through.

    Args:
        args: A list of arguments passed to the function (may be None or empty).
            Example args: ['--pr_code_suggestions.extra_instructions="be funny"',
                           '--pr_code_suggestions.num_code_suggestions=3']

    Returns:
        The arguments that were not applied as settings, in their original order: arguments that
        do not start with '--', and '--' arguments with no '=' (returned without their leading
        dashes, after an error is logged).
    """
    other_args = []
    for arg in args or []:
        arg = arg.strip()
        if not arg.startswith('--'):
            other_args.append(arg)
            continue

        # Only the leading dashes are removed; trailing dashes belong to the value.
        arg = arg.lstrip('-').strip()
        # Split on the first '=' only, so values such as 'a=b' are kept intact.
        key, separator, value = arg.partition('=')
        if not separator:
            logging.error(f'Invalid argument format: {arg}')
            other_args.append(arg)
            continue

        key = key.strip().upper()
        value = value.strip()
        get_settings().set(key, value)
        logging.info(f'Updated setting {key} to: "{value}"')
    return other_args
|