Files
pr-agent/pr_agent/algo/git_patch_processing.py
2024-09-13 22:17:24 +03:00

389 lines
16 KiB
Python

from __future__ import annotations
import re
import traceback
from pr_agent.config_loader import get_settings
from pr_agent.algo.types import EDIT_TYPE, FilePatchInfo
from pr_agent.log import get_logger
def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0,
                 patch_extra_lines_after=0, filename: str = "") -> str:
    """
    Add extra context lines from the original file around the hunks of a patch.

    The patch is returned untouched when there is nothing to extend (empty patch,
    empty original file, or zero extra lines requested in both directions), when
    the original file cannot be decoded to text, when `filename` matches one of the
    skipped extensions, or when the extension step raises.

    Args:
        original_file_str: Content of the original file, as str or bytes.
        patch_str: The patch to extend.
        patch_extra_lines_before: Extra lines requested before each hunk.
        patch_extra_lines_after: Extra lines requested after each hunk.
        filename: Name of the patched file, checked against the skipped extensions.

    Returns:
        The extended patch, or `patch_str` itself in the cases listed above.
    """
    nothing_requested = patch_extra_lines_before == 0 and patch_extra_lines_after == 0
    if not patch_str or nothing_requested or not original_file_str:
        return patch_str

    # bytes content is decoded first; an undecodable file leaves the patch as-is
    decoded_file = decode_if_bytes(original_file_str)
    if not decoded_file or should_skip_patch(filename):
        return patch_str

    try:
        return process_patch_lines(patch_str, decoded_file,
                                   patch_extra_lines_before, patch_extra_lines_after)
    except Exception as e:
        # best effort: a failure to extend must never lose the original patch
        get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
        return patch_str
def decode_if_bytes(original_file_str):
    """
    Decode a bytes input to text; return any non-bytes input unchanged.

    UTF-8 is attempted first, then each fallback encoding in order; the first
    successful decode wins. An empty string is returned if every attempt fails.
    """
    if not isinstance(original_file_str, bytes):
        return original_file_str

    # NOTE: 'iso-8859-1' maps every possible byte value, so the first fallback
    # accepts any input and the entries after it are effectively never reached.
    for codec in ('utf-8', 'iso-8859-1', 'latin-1', 'ascii', 'utf-16'):
        try:
            return original_file_str.decode(codec)
        except UnicodeDecodeError:
            continue
    return ""
def should_skip_patch(filename):
    """
    Tell whether the patch of `filename` should be left unextended.

    Returns True when `filename` ends with one of the entries of the
    `config.patch_extension_skip_types` setting. Returns False when that setting
    or the filename is empty, or when no entry matches.
    """
    skip_suffixes = get_settings().config.patch_extension_skip_types
    if not (skip_suffixes and filename):
        return False
    for suffix in skip_suffixes:
        if filename.endswith(suffix):
            return True
    return False
def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before, patch_extra_lines_after):
    """
    Rewrite the hunks of `patch_str` with extra context lines taken from the original file.

    For every line that matches the hunk-header pattern, the header is re-emitted with
    enlarged start/size values, preceded by an empty line and followed by the original
    lines that come right before the hunk. After each hunk, up to `patch_extra_lines_after`
    original lines are appended. Every other patch line is copied unchanged.

    When the `allow_dynamic_context` setting is truthy, a window of
    `max_extra_lines_before_dynamic_context` lines before the hunk is scanned for a line
    containing the hunk's section header; if one is found the extended hunk starts at that
    line, otherwise the regular `patch_extra_lines_before` window is used.

    Args:
        patch_str: The patch text to extend.
        original_file_str: Text of the original file (already decoded).
        patch_extra_lines_before: Extra lines requested before each hunk.
        patch_extra_lines_after: Extra lines requested after each hunk.

    Returns:
        The extended patch lines joined with newline characters, or `patch_str` unchanged
        if an exception is raised while walking the patch lines.
    """
    allow_dynamic_context = get_settings().config.allow_dynamic_context
    patch_extra_lines_before_dynamic = get_settings().config.max_extra_lines_before_dynamic_context

    original_lines = original_file_str.splitlines()
    len_original_lines = len(original_lines)
    patch_lines = patch_str.splitlines()
    extended_patch_lines = []

    # -1 marks "no hunk header seen yet"
    start1, size1, start2, size2 = -1, -1, -1, -1
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    try:
        for line in patch_lines:
            if line.startswith('@@'):
                match = RE_HUNK_HEADER.match(line)
                # identify hunk header
                if match:
                    # finish processing previous hunk
                    if start1 != -1 and patch_extra_lines_after > 0:
                        # start1 is 1-based, so index start1 + size1 - 1 is the first original line after the hunk
                        delta_lines = [f' {line}' for line in original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]]
                        extended_patch_lines.extend(delta_lines)

                    section_header, size1, size2, start1, start2 = extract_hunk_headers(match)

                    if patch_extra_lines_before > 0 or patch_extra_lines_after > 0:
                        # closure over the current hunk's start/size values; returns the enlarged
                        # (start1, size1, start2, size2) for a given number of lines before the hunk
                        def _calc_context_limits(patch_lines_before):
                            extended_start1 = max(1, start1 - patch_lines_before)
                            extended_size1 = size1 + (start1 - extended_start1) + patch_extra_lines_after
                            extended_start2 = max(1, start2 - patch_lines_before)
                            extended_size2 = size2 + (start2 - extended_start2) + patch_extra_lines_after
                            if extended_start1 - 1 + extended_size1 > len_original_lines:
                                # we cannot extend beyond the original file
                                delta_cap = extended_start1 - 1 + extended_size1 - len_original_lines
                                # never shrink below the size of the hunk itself
                                extended_size1 = max(extended_size1 - delta_cap, size1)
                                extended_size2 = max(extended_size2 - delta_cap, size2)
                            return extended_start1, extended_size1, extended_start2, extended_size2

                        if allow_dynamic_context:
                            # first try the wider dynamic window and look for the section header in it
                            extended_start1, extended_size1, extended_start2, extended_size2 = \
                                _calc_context_limits(patch_extra_lines_before_dynamic)
                            lines_before = original_lines[extended_start1 - 1:start1 - 1]
                            found_header = False
                            # NOTE(review): this loop rebinds the outer `line`; harmless here because the
                            # header branch ends with `continue` before `line` is read again.
                            # NOTE(review): an empty section_header is contained in every string, so the
                            # first line of a non-empty window matches - confirm this is intended.
                            for i, line, in enumerate(lines_before):
                                if section_header in line:
                                    found_header = True
                                    # Update start and size in one line each
                                    extended_start1, extended_start2 = extended_start1 + i, extended_start2 + i
                                    extended_size1, extended_size2 = extended_size1 - i, extended_size2 - i
                                    get_logger().debug(f"Found section header in line {i} before the hunk")
                                    # the header text is now part of the context lines, so drop it from the '@@' line
                                    section_header = ''
                                    break
                            if not found_header:
                                get_logger().debug(f"Section header not found in the extra lines before the hunk")
                                # fall back to the regular (non-dynamic) window
                                extended_start1, extended_size1, extended_start2, extended_size2 = \
                                    _calc_context_limits(patch_extra_lines_before)
                        else:
                            extended_start1, extended_size1, extended_start2, extended_size2 = \
                                _calc_context_limits(patch_extra_lines_before)

                        # original lines between the extended start and the hunk's own start, as context lines
                        delta_lines = [f' {line}' for line in original_lines[extended_start1 - 1:start1 - 1]]

                        # logic to remove section header if its in the extra delta lines (in dynamic context, this is also done)
                        if section_header and not allow_dynamic_context:
                            for line in delta_lines:
                                if section_header in line:
                                    section_header = ''  # remove section header if it is in the extra delta lines
                                    break
                    else:
                        # no extension requested: keep the header values as parsed
                        extended_start1 = start1
                        extended_size1 = size1
                        extended_start2 = start2
                        extended_size2 = size2
                        delta_lines = []
                    # every rewritten hunk is preceded by an empty line
                    extended_patch_lines.append('')
                    extended_patch_lines.append(
                        f'@@ -{extended_start1},{extended_size1} '
                        f'+{extended_start2},{extended_size2} @@ {section_header}')
                    extended_patch_lines.extend(delta_lines)  # one to zero based
                    continue
            # any line that is not a matched hunk header is copied as-is
            extended_patch_lines.append(line)
    except Exception as e:
        get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
        return patch_str

    # finish processing last hunk
    if start1 != -1 and patch_extra_lines_after > 0:
        delta_lines = original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]
        # add space at the beginning of each extra line
        delta_lines = [f' {line}' for line in delta_lines]
        extended_patch_lines.extend(delta_lines)

    extended_patch_str = '\n'.join(extended_patch_lines)
    return extended_patch_str
def extract_hunk_headers(match):
    """
    Unpack a matched hunk header into its numeric fields and section header.

    Args:
        match: A successful regex match whose groups are, in order:
            old start, old size, new start, new size, section header.

    Returns:
        A tuple `(section_header, size1, size2, start1, start2)`.

    In the unified diff format a range written without a count (e.g. '@@ -3 +4 @@')
    covers exactly one line, so a missing size group is read as 1. Any other missing
    group is read as 0.
    """
    groups = list(match.groups())
    # groups 1 and 3 are the optional ",count" parts of the two ranges
    for size_idx in (1, 3):
        if size_idx < len(groups) and groups[size_idx] is None:
            groups[size_idx] = 1
    groups = [0 if value is None else value for value in groups]
    try:
        start1, size1, start2, size2 = map(int, groups[:4])
    except (TypeError, ValueError):  # '@@ -0,0 +1 @@' case
        # legacy fallback kept for compatibility; not expected to trigger once
        # missing groups have been given defaults above
        start1, size1, size2 = map(int, groups[:3])
        start2 = 0
    section_header = groups[4]
    return section_header, size1, size2, start1, start2
def omit_deletion_hunks(patch_lines) -> str:
    """
    Omit deletion hunks from the patch and return the modified patch.

    A hunk is kept only if at least one of its lines starts with '+'. The buffer
    of the current hunk is reset every time a new hunk header ends the previous
    hunk, so a hunk without additions can never be carried over into the next
    kept hunk.

    Args:
    - patch_lines: a list of strings representing the lines of the patch

    Returns:
    - A string representing the modified patch with deletion hunks omitted
    """
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))?\ @@[ ]?(.*)")
    kept_lines = []
    current_hunk = []
    has_addition = False
    inside_hunk = False

    for line in patch_lines:
        if line.startswith('@@'):
            # a '@@' line that is not a valid hunk header is dropped
            if RE_HUNK_HEADER.match(line):
                if inside_hunk:
                    # finish previous hunk: keep it only if it added something,
                    # and always start the next hunk from a clean buffer
                    if has_addition:
                        kept_lines.extend(current_hunk)
                    current_hunk = []
                    has_addition = False
                current_hunk.append(line)
                inside_hunk = True
        else:
            current_hunk.append(line)
            if line.startswith('+'):
                has_addition = True

    # finish last hunk
    if inside_hunk and has_addition:
        kept_lines.extend(current_hunk)

    return '\n'.join(kept_lines)
def handle_patch_deletions(patch: str, original_file_content_str: str,
                           new_file_content_str: str, file_name: str, edit_type: EDIT_TYPE = EDIT_TYPE.UNKNOWN) -> str:
    """
    Minimize a patch by dropping whole-file deletions and deletion-only hunks.

    Args:
        patch (str): The patch to be handled.
        original_file_content_str (str): The original content of the file (not read here).
        new_file_content_str (str): The new content of the file.
        file_name (str): The name of the file, used in log messages.
        edit_type (EDIT_TYPE): The kind of edit made to the file.

    Returns:
        None when the new content is empty and the edit type is DELETED or UNKNOWN
        (the file is treated as deleted); otherwise the patch with deletion hunks omitted.
    """
    treated_as_deleted = not new_file_content_str and edit_type in (EDIT_TYPE.DELETED, EDIT_TYPE.UNKNOWN)
    if treated_as_deleted:
        # deleted file: don't show a patch at all
        if get_settings().config.verbosity_level > 0:
            get_logger().info(f"Processing file: {file_name}, minimizing deletion file")
        return None

    filtered_patch = omit_deletion_hunks(patch.splitlines())
    if filtered_patch == patch:
        return patch

    if get_settings().config.verbosity_level > 0:
        get_logger().info(f"Processing file: {file_name}, hunks were deleted")
    return filtered_patch
def _append_hunk_sections(rendered: str, new_content_lines, old_content_lines, start2: int) -> str:
    """Append the '__new hunk__' / '__old hunk__' sections of one hunk to `rendered`."""
    # the new-side section is shown only if the hunk actually adds something
    if any(line.startswith('+') for line in new_content_lines):
        rendered = rendered.rstrip() + '\n__new hunk__\n'
        rendered += ''.join(f"{start2 + i} {line}\n" for i, line in enumerate(new_content_lines))
    # the old-side section is shown only if the hunk actually removes something
    if any(line.startswith('-') for line in old_content_lines):
        rendered = rendered.rstrip() + '\n__old hunk__\n'
        rendered += ''.join(f"{line}\n" for line in old_content_lines)
    return rendered


def convert_to_hunks_with_lines_numbers(patch: str, file) -> str:
    """
    Convert a patch into a string that shows, per hunk, the new content with line
    numbers and the old content without them.

    Args:
        patch (str): The patch string to be converted.
        file: An object with a `filename` attribute; if it also has an `edit_type`
            equal to EDIT_TYPE.DELETED, only a "was deleted" message is returned.

    Returns:
        str: The rendered hunks. Each hunk shows its '@@' header, then a
        '__new hunk__' section (only when the hunk has '+' lines) whose lines are
        numbered from the header's new-file start, then an '__old hunk__' section
        (only when the hunk has '-' lines).

    Notes:
        - Lines containing 'no newline at end of file' are ignored.
        - A line starting with '@@' that is not a valid hunk header is skipped
          instead of raising.
        - The final hunk is emitted only if it has at least one added or context line.

    example output:
        ## File: 'src/file.ts'

        @@ -881,6 +881,7 @@ section header
        __new hunk__
        881  line1
        882  line2
        883 +line3
        884  line4
        __old hunk__
         line1
         line2
        -line3_old
         line4
    """
    # if the file was deleted, return a message indicating that the file was deleted
    if hasattr(file, 'edit_type') and file.edit_type == EDIT_TYPE.DELETED:
        return f"\n\n## file '{file.filename.strip()}' was deleted\n"

    patch_with_lines_str = f"\n\n## File: '{file.filename.strip()}'\n"
    patch_lines = patch.splitlines()
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    new_content_lines = []
    old_content_lines = []
    header_line = ''      # header of the hunk currently being collected
    start2 = -1           # first new-file line number of that hunk
    seen_header = False

    for line_i, line in enumerate(patch_lines):
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            match = RE_HUNK_HEADER.match(line)
            if not match:
                # malformed header: no line numbers can be derived from it
                continue
            if new_content_lines or old_content_lines:  # found a new hunk, emit the previous one
                if header_line:
                    patch_with_lines_str += f'\n{header_line}\n'
                patch_with_lines_str = _append_hunk_sections(
                    patch_with_lines_str, new_content_lines, old_content_lines, start2)
                new_content_lines = []
                old_content_lines = []
            header_line = line
            start2 = int(match.group(3))
            seen_header = True
        elif line.startswith('+'):
            new_content_lines.append(line)
        elif line.startswith('-'):
            old_content_lines.append(line)
        else:
            if not line and line_i:
                # skip an empty line that directly precedes a hunk header or ends the patch
                if line_i + 1 < len(patch_lines) and patch_lines[line_i + 1].startswith('@@'):
                    continue
                elif line_i + 1 == len(patch_lines):
                    continue
            # context lines belong to both sides
            new_content_lines.append(line)
            old_content_lines.append(line)

    # finishing last hunk
    if seen_header and new_content_lines:
        patch_with_lines_str += f'\n{header_line}\n'
        patch_with_lines_str = _append_hunk_sections(
            patch_with_lines_str, new_content_lines, old_content_lines, start2)

    return patch_with_lines_str.rstrip()
def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side) -> tuple[str, str]:
    """
    Extract the hunk(s) containing `line_start` and the lines inside the requested range.

    Args:
        patch (str): The patch to scan.
        file_name: Name of the file, shown in the output header.
        line_start: First line of the requested range.
        line_end: Last line of the requested range.
        side: 'left' to compare against old-file line numbers, 'right' for new-file
            line numbers (case-insensitive). Any other value selects no lines.

    Returns:
        A tuple `(hunks_text, selected_lines)`: the file header followed by every hunk
        whose range contains `line_start`, and the patch lines whose computed line
        number falls within `[line_start, line_end]`. Both are right-stripped.

    Notes:
        - Lines containing 'no newline at end of file' are ignored.
        - A line starting with '@@' that is not a valid hunk header is not parsed; the
          lines that follow it are skipped until the next valid header.
        - Line numbers advance on every line that does not start with '-', so selection
          on the left side is approximate when the hunk contains added lines.
    """
    output_parts = [f"\n\n## File: '{file_name.strip()}'\n\n"]
    selected_parts = []
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    side = side.lower()
    start1, start2 = -1, -1
    skip_hunk = False
    selected_lines_num = 0

    for line in patch.splitlines():
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            selected_lines_num = 0
            match = RE_HUNK_HEADER.match(line)
            if not match:
                # cannot derive line numbers from a malformed header
                skip_hunk = True
                continue

            _, size1, size2, start1, start2 = extract_hunk_headers(match)

            # check if line range is in this hunk
            if side == 'left':
                skip_hunk = not (start1 <= line_start <= start1 + size1)
            elif side == 'right':
                skip_hunk = not (start2 <= line_start <= start2 + size2)
            else:
                skip_hunk = False
            if not skip_hunk:
                output_parts.append(f'\n{line}\n')
        elif not skip_hunk:
            if side == 'right':
                in_range = line_start <= start2 + selected_lines_num <= line_end
            elif side == 'left':
                # lower bound is line_start (not the hunk start), mirroring the right side
                in_range = line_start <= start1 + selected_lines_num <= line_end
            else:
                in_range = False
            if in_range:
                selected_parts.append(line + '\n')
            output_parts.append(line + '\n')
            if not line.startswith('-'):  # currently we don't support /ask line for deleted lines
                selected_lines_num += 1

    return ''.join(output_parts).rstrip(), ''.join(selected_parts).rstrip()