mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-07-03 12:20:38 +08:00
feat: improve patch extension with new file content comparison
This commit is contained in:
@ -9,11 +9,12 @@ from pr_agent.log import get_logger
|
||||
|
||||
|
||||
def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0,
|
||||
patch_extra_lines_after=0, filename: str = "") -> str:
|
||||
patch_extra_lines_after=0, filename: str = "", new_file_str="") -> str:
|
||||
if not patch_str or (patch_extra_lines_before == 0 and patch_extra_lines_after == 0) or not original_file_str:
|
||||
return patch_str
|
||||
|
||||
original_file_str = decode_if_bytes(original_file_str)
|
||||
new_file_str = decode_if_bytes(new_file_str)
|
||||
if not original_file_str:
|
||||
return patch_str
|
||||
|
||||
@ -22,7 +23,7 @@ def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0,
|
||||
|
||||
try:
|
||||
extended_patch_str = process_patch_lines(patch_str, original_file_str,
|
||||
patch_extra_lines_before, patch_extra_lines_after)
|
||||
patch_extra_lines_before, patch_extra_lines_after, new_file_str)
|
||||
except Exception as e:
|
||||
get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
|
||||
return patch_str
|
||||
@ -52,12 +53,13 @@ def should_skip_patch(filename):
|
||||
return False
|
||||
|
||||
|
||||
def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before, patch_extra_lines_after):
|
||||
def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before, patch_extra_lines_after, new_file_str=""):
|
||||
allow_dynamic_context = get_settings().config.allow_dynamic_context
|
||||
patch_extra_lines_before_dynamic = get_settings().config.max_extra_lines_before_dynamic_context
|
||||
|
||||
original_lines = original_file_str.splitlines()
|
||||
len_original_lines = len(original_lines)
|
||||
file_original_lines = original_file_str.splitlines()
|
||||
file_new_lines = new_file_str.splitlines() if new_file_str else []
|
||||
len_original_lines = len(file_original_lines)
|
||||
patch_lines = patch_str.splitlines()
|
||||
extended_patch_lines = []
|
||||
|
||||
@ -73,12 +75,12 @@ def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before,
|
||||
if match:
|
||||
# finish processing previous hunk
|
||||
if is_valid_hunk and (start1 != -1 and patch_extra_lines_after > 0):
|
||||
delta_lines = [f' {line}' for line in original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]]
|
||||
extended_patch_lines.extend(delta_lines)
|
||||
delta_lines_original = [f' {line}' for line in file_original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]]
|
||||
extended_patch_lines.extend(delta_lines_original)
|
||||
|
||||
section_header, size1, size2, start1, start2 = extract_hunk_headers(match)
|
||||
|
||||
is_valid_hunk = check_if_hunk_lines_matches_to_file(i, original_lines, patch_lines, start1)
|
||||
is_valid_hunk = check_if_hunk_lines_matches_to_file(i, file_original_lines, patch_lines, start1)
|
||||
|
||||
if is_valid_hunk and (patch_extra_lines_before > 0 or patch_extra_lines_after > 0):
|
||||
def _calc_context_limits(patch_lines_before):
|
||||
@ -93,20 +95,28 @@ def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before,
|
||||
extended_size2 = max(extended_size2 - delta_cap, size2)
|
||||
return extended_start1, extended_size1, extended_start2, extended_size2
|
||||
|
||||
if allow_dynamic_context:
|
||||
if allow_dynamic_context and file_new_lines:
|
||||
extended_start1, extended_size1, extended_start2, extended_size2 = \
|
||||
_calc_context_limits(patch_extra_lines_before_dynamic)
|
||||
lines_before = original_lines[extended_start1 - 1:start1 - 1]
|
||||
|
||||
lines_before_original = file_original_lines[extended_start1 - 1:start1 - 1]
|
||||
lines_before_new = file_new_lines[extended_start2 - 1:start2 - 1]
|
||||
found_header = False
|
||||
for i, line, in enumerate(lines_before):
|
||||
if section_header in line:
|
||||
found_header = True
|
||||
# Update start and size in one line each
|
||||
extended_start1, extended_start2 = extended_start1 + i, extended_start2 + i
|
||||
extended_size1, extended_size2 = extended_size1 - i, extended_size2 - i
|
||||
# get_logger().debug(f"Found section header in line {i} before the hunk")
|
||||
section_header = ''
|
||||
break
|
||||
if lines_before_original == lines_before_new: # Making sure no changes from a previous hunk
|
||||
for i, line, in enumerate(lines_before_original):
|
||||
if section_header in line:
|
||||
found_header = True
|
||||
# Update start and size in one line each
|
||||
extended_start1, extended_start2 = extended_start1 + i, extended_start2 + i
|
||||
extended_size1, extended_size2 = extended_size1 - i, extended_size2 - i
|
||||
# get_logger().debug(f"Found section header in line {i} before the hunk")
|
||||
section_header = ''
|
||||
break
|
||||
else:
|
||||
get_logger().debug(f"Extra lines before hunk are different in original and new file - dynamic context",
|
||||
artifact={"lines_before_original": lines_before_original,
|
||||
"lines_before_new": lines_before_new})
|
||||
|
||||
if not found_header:
|
||||
# get_logger().debug(f"Section header not found in the extra lines before the hunk")
|
||||
extended_start1, extended_size1, extended_start2, extended_size2 = \
|
||||
@ -115,11 +125,23 @@ def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before,
|
||||
extended_start1, extended_size1, extended_start2, extended_size2 = \
|
||||
_calc_context_limits(patch_extra_lines_before)
|
||||
|
||||
delta_lines = [f' {line}' for line in original_lines[extended_start1 - 1:start1 - 1]]
|
||||
# check if extra lines before hunk are different in original and new file
|
||||
delta_lines_original = [f' {line}' for line in file_original_lines[extended_start1 - 1:start1 - 1]]
|
||||
if file_new_lines:
|
||||
delta_lines_new = [f' {line}' for line in file_new_lines[extended_start2 - 1:start2 - 1]]
|
||||
if delta_lines_original != delta_lines_new:
|
||||
get_logger().debug(f"Extra lines before hunk are different in original and new file",
|
||||
artifact={"delta_lines_original": delta_lines_original,
|
||||
"delta_lines_new": delta_lines_new})
|
||||
extended_start1 = start1
|
||||
extended_size1 = size1
|
||||
extended_start2 = start2
|
||||
extended_size2 = size2
|
||||
delta_lines_original = []
|
||||
|
||||
# logic to remove section header if its in the extra delta lines (in dynamic context, this is also done)
|
||||
if section_header and not allow_dynamic_context:
|
||||
for line in delta_lines:
|
||||
for line in delta_lines_original:
|
||||
if section_header in line:
|
||||
section_header = '' # remove section header if it is in the extra delta lines
|
||||
break
|
||||
@ -128,12 +150,12 @@ def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before,
|
||||
extended_size1 = size1
|
||||
extended_start2 = start2
|
||||
extended_size2 = size2
|
||||
delta_lines = []
|
||||
delta_lines_original = []
|
||||
extended_patch_lines.append('')
|
||||
extended_patch_lines.append(
|
||||
f'@@ -{extended_start1},{extended_size1} '
|
||||
f'+{extended_start2},{extended_size2} @@ {section_header}')
|
||||
extended_patch_lines.extend(delta_lines) # one to zero based
|
||||
extended_patch_lines.extend(delta_lines_original) # one to zero based
|
||||
continue
|
||||
extended_patch_lines.append(line)
|
||||
except Exception as e:
|
||||
@ -142,15 +164,14 @@ def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before,
|
||||
|
||||
# finish processing last hunk
|
||||
if start1 != -1 and patch_extra_lines_after > 0 and is_valid_hunk:
|
||||
delta_lines = original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]
|
||||
delta_lines_original = file_original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]
|
||||
# add space at the beginning of each extra line
|
||||
delta_lines = [f' {line}' for line in delta_lines]
|
||||
extended_patch_lines.extend(delta_lines)
|
||||
delta_lines_original = [f' {line}' for line in delta_lines_original]
|
||||
extended_patch_lines.extend(delta_lines_original)
|
||||
|
||||
extended_patch_str = '\n'.join(extended_patch_lines)
|
||||
return extended_patch_str
|
||||
|
||||
|
||||
def check_if_hunk_lines_matches_to_file(i, original_lines, patch_lines, start1):
|
||||
"""
|
||||
Check if the hunk lines match the original file content. We saw cases where the hunk header line doesn't match the original file content, and then
|
||||
@ -160,8 +181,18 @@ def check_if_hunk_lines_matches_to_file(i, original_lines, patch_lines, start1):
|
||||
try:
|
||||
if i + 1 < len(patch_lines) and patch_lines[i + 1][0] == ' ': # an existing line in the file
|
||||
if patch_lines[i + 1].strip() != original_lines[start1 - 1].strip():
|
||||
# check if different encoding is needed
|
||||
original_line = original_lines[start1 - 1].strip()
|
||||
for encoding in ['iso-8859-1', 'latin-1', 'ascii', 'utf-16']:
|
||||
try:
|
||||
if original_line.encode(encoding).decode().strip() == patch_lines[i + 1].strip():
|
||||
get_logger().info(f"Detected different encoding in hunk header line {start1}, needed encoding: {encoding}")
|
||||
return False # we still want to avoid extending the hunk. But we don't want to log an error
|
||||
except:
|
||||
pass
|
||||
|
||||
is_valid_hunk = False
|
||||
get_logger().error(
|
||||
get_logger().info(
|
||||
f"Invalid hunk in PR, line {start1} in hunk header doesn't match the original file content")
|
||||
except:
|
||||
pass
|
||||
@ -288,7 +319,7 @@ __old hunk__
|
||||
"""
|
||||
# if the file was deleted, return a message indicating that the file was deleted
|
||||
if hasattr(file, 'edit_type') and file.edit_type == EDIT_TYPE.DELETED:
|
||||
return f"\n\n## file '{file.filename.strip()}' was deleted\n"
|
||||
return f"\n\n## File '{file.filename.strip()}' was deleted\n"
|
||||
|
||||
patch_with_lines_str = f"\n\n## File: '{file.filename.strip()}'\n"
|
||||
patch_lines = patch.splitlines()
|
||||
@ -363,7 +394,7 @@ __old hunk__
|
||||
return patch_with_lines_str.rstrip()
|
||||
|
||||
|
||||
def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side) -> tuple[str, str]:
|
||||
def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side, remove_trailing_chars: bool = True) -> tuple[str, str]:
|
||||
try:
|
||||
patch_with_lines_str = f"\n\n## File: '{file_name.strip()}'\n\n"
|
||||
selected_lines = ""
|
||||
@ -411,4 +442,8 @@ def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, s
|
||||
get_logger().error(f"Failed to extract hunk lines from patch: {e}", artifact={"traceback": traceback.format_exc()})
|
||||
return "", ""
|
||||
|
||||
return patch_with_lines_str.rstrip(), selected_lines.rstrip()
|
||||
if remove_trailing_chars:
|
||||
patch_with_lines_str = patch_with_lines_str.rstrip()
|
||||
selected_lines = selected_lines.rstrip()
|
||||
|
||||
return patch_with_lines_str, selected_lines
|
||||
|
Reference in New Issue
Block a user