Files
pr-agent/pr_agent/algo/git_patch_processing.py
mrT23 5c4bc0a008 Add Bitbucket diff handling and improve error logging
- Implement `publish_file_comments` method placeholder
- Enhance `is_supported` method to include `publish_file_comments`
- Refactor diff splitting logic to handle Bitbucket-specific headers
- Improve error handling and logging for file content retrieval
- Add `get_pr_owner_id` method to retrieve PR owner ID
- Update `_get_pr_file_content` to fetch file content from remote link
- Fix variable name typo in `extend_patch` function in `git_patch_processing.py`
2024-08-12 09:48:26 +03:00

326 lines
13 KiB
Python

from __future__ import annotations
import re
from pr_agent.config_loader import get_settings
from pr_agent.algo.types import EDIT_TYPE, FilePatchInfo
from pr_agent.log import get_logger
def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0, patch_extra_lines_after=0) -> str:
    """
    Widen every hunk of a unified-diff patch with extra context lines taken from the original file.

    Args:
        original_file_str: Content of the file before the change (``str`` or UTF-8 encoded ``bytes``).
        patch_str: The unified-diff patch to extend.
        patch_extra_lines_before: Number of context lines to add in front of each hunk.
        patch_extra_lines_after: Number of context lines to add behind each hunk.

    Returns:
        The extended patch. ``patch_str`` is returned unchanged when it is empty, when no extension is
        requested, when the original file content is empty, or when processing fails. An empty string
        is returned when ``original_file_str`` is bytes that cannot be decoded as UTF-8.
    """
    if not patch_str or (patch_extra_lines_before == 0 and patch_extra_lines_after == 0) or not original_file_str:
        return patch_str

    if isinstance(original_file_str, bytes):
        try:
            original_file_str = original_file_str.decode('utf-8')
        except UnicodeDecodeError:
            return ""

    original_lines = original_file_str.splitlines()
    len_original_lines = len(original_lines)
    patch_lines = patch_str.splitlines()
    extended_patch_lines = []

    # -1 marks "no hunk seen yet"
    start1, size1, start2, size2 = -1, -1, -1, -1
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    try:
        for line in patch_lines:
            match = RE_HUNK_HEADER.match(line) if line.startswith('@@') else None
            if not match:
                # regular patch line (or an '@@' line that is not a valid header): copy as is
                extended_patch_lines.append(line)
                continue

            # finish the previous hunk: append the context lines that follow it in the original file
            if start1 != -1 and patch_extra_lines_after > 0:
                trailing = original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]
                extended_patch_lines.extend(f' {context_line}' for context_line in trailing)

            raw_start1, raw_size1, raw_start2, raw_size2, section_header = match.groups()
            start1, start2 = int(raw_start1), int(raw_start2)
            # unified diff omits the count when a hunk side spans exactly one line ('@@ -5 +5 @@')
            size1 = 1 if raw_size1 is None else int(raw_size1)
            size2 = 1 if raw_size2 is None else int(raw_size2)

            if patch_extra_lines_before > 0 or patch_extra_lines_after > 0:
                extended_start1 = max(1, start1 - patch_extra_lines_before)
                extended_size1 = size1 + (start1 - extended_start1) + patch_extra_lines_after
                extended_start2 = max(1, start2 - patch_extra_lines_before)
                extended_size2 = size2 + (start2 - extended_start2) + patch_extra_lines_after
                if extended_start1 - 1 + extended_size1 > len_original_lines:
                    # we cannot extend beyond the original file
                    delta_cap = extended_start1 - 1 + extended_size1 - len_original_lines
                    extended_size1 = max(extended_size1 - delta_cap, size1)
                    extended_size2 = max(extended_size2 - delta_cap, size2)
                # one-based hunk positions -> zero-based list slice
                leading = [f' {context_line}' for context_line in original_lines[extended_start1 - 1:start1 - 1]]
                if section_header and any(section_header in context_line for context_line in leading):
                    # the section header is already visible in the added context lines
                    section_header = ''
            else:
                extended_start1 = start1
                extended_size1 = size1
                extended_start2 = start2
                extended_size2 = size2
                leading = []

            extended_patch_lines.append('')
            extended_patch_lines.append(
                f'@@ -{extended_start1},{extended_size1} '
                f'+{extended_start2},{extended_size2} @@ {section_header}')
            extended_patch_lines.extend(leading)
    except Exception as e:
        # best effort: on any failure fall back to the untouched patch
        if get_settings().config.verbosity_level >= 2:
            get_logger().error(f"Failed to extend patch: {e}")
        return patch_str

    # finish the last hunk
    if start1 != -1 and patch_extra_lines_after > 0:
        trailing = original_lines[start1 + size1 - 1:start1 + size1 - 1 + patch_extra_lines_after]
        # add space at the beginning of each extra line
        extended_patch_lines.extend(f' {context_line}' for context_line in trailing)

    return '\n'.join(extended_patch_lines)
def omit_deletion_hunks(patch_lines) -> str:
    """
    Omit deletion hunks from the patch and return the modified patch.

    A hunk is kept only when at least one of its lines starts with '+'. Lines that start with '@@'
    but are not valid hunk headers are dropped.

    Args:
    - patch_lines: a list of strings representing the lines of the patch
    Returns:
    - A string representing the modified patch with deletion hunks omitted
    """
    temp_hunk = []
    added_patched = []
    add_hunk = False
    inside_hunk = False
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))?\ @@[ ]?(.*)")

    for line in patch_lines:
        if line.startswith('@@'):
            if RE_HUNK_HEADER.match(line):
                if inside_hunk:
                    # finish previous hunk: keep it only if it added something, and always start a
                    # fresh buffer so a deletion-only hunk cannot leak into the next one
                    if add_hunk:
                        added_patched.extend(temp_hunk)
                    temp_hunk = []
                    add_hunk = False
                temp_hunk.append(line)
                inside_hunk = True
        else:
            temp_hunk.append(line)
            # startswith() is safe on empty lines, unlike line[0]
            if line.startswith('+'):
                add_hunk = True

    if inside_hunk and add_hunk:
        added_patched.extend(temp_hunk)

    return '\n'.join(added_patched)
def handle_patch_deletions(patch: str, original_file_content_str: str,
                           new_file_content_str: str, file_name: str, edit_type: EDIT_TYPE = EDIT_TYPE.UNKNOWN) -> str:
    """
    Reduce a patch so that deletions are not shown.

    When the new file content is empty and the edit type is DELETED or UNKNOWN, the file is treated
    as deleted and no patch is returned. Otherwise the patch is passed through
    `omit_deletion_hunks`.

    Args:
        patch (str): The patch to be handled.
        original_file_content_str (str): The original content of the file (not used by this function).
        new_file_content_str (str): The new content of the file.
        file_name (str): The name of the file, used in log messages only.
        edit_type (EDIT_TYPE): The kind of edit applied to the file.

    Returns:
        str: The patch with deletion hunks omitted, or None when the file was deleted.
    """
    file_was_deleted = not new_file_content_str and edit_type in (EDIT_TYPE.DELETED, EDIT_TYPE.UNKNOWN)
    if file_was_deleted:
        # deleted files get no patch at all - only the fact of the deletion is reported
        if get_settings().config.verbosity_level > 0:
            get_logger().info(f"Processing file: {file_name}, minimizing deletion file")
        return None

    reduced_patch = omit_deletion_hunks(patch.splitlines())
    if reduced_patch == patch:
        return patch

    if get_settings().config.verbosity_level > 0:
        get_logger().info(f"Processing file: {file_name}, hunks were deleted")
    return reduced_patch
def _append_numbered_hunk(rendered, header_line, new_content_lines, old_content_lines, new_start):
    """Append one hunk (header, numbered new side, raw old side) to ``rendered`` and return the result."""
    if header_line:
        rendered += f'\n{header_line}\n'
    if any(line.startswith('+') for line in new_content_lines):
        # rstrip() removes the newline left by the header so the marker sits directly below it
        rendered = rendered.rstrip() + '\n__new hunk__\n'
        rendered += ''.join(f"{new_start + i} {line}\n" for i, line in enumerate(new_content_lines))
    if any(line.startswith('-') for line in old_content_lines):
        rendered = rendered.rstrip() + '\n__old hunk__\n'
        rendered += ''.join(f"{line}\n" for line in old_content_lines)
    return rendered


def convert_to_hunks_with_lines_numbers(patch: str, file) -> str:
    """
    Convert a given patch string into a string with line numbers for each hunk, indicating the new and old content of
    the file.

    The new side of a hunk is emitted (numbered from the hunk's new-file start line) only when the hunk
    contains at least one '+' line; the old side is emitted (without numbers) only when the hunk contains
    at least one '-' line. Lines containing 'no newline at end of file' and '@@' lines that are not valid
    hunk headers are ignored.

    Args:
        patch (str): The patch string to be converted.
        file: An object containing the filename of the file being patched.

    Returns:
        str: A string with line numbers for each hunk, indicating the new and old content of the file.

    example output:
    ## file: 'src/file.ts'

    @@ -881,5 +881,6 @@ section
    __new hunk__
    881  line1
    882  line2
    883 +line3
    884 +line4
    885  line5
    ...
    __old hunk__
     line1
     line2
    -line3
     line5
    ...
    """
    patch_with_lines_str = f"\n\n## file: '{file.filename.strip()}'\n"
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    new_content_lines = []
    old_content_lines = []
    header_line = None  # header of the hunk currently being collected
    start2 = -1  # first new-file line number of the current hunk

    for line in patch.splitlines():
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            match = RE_HUNK_HEADER.match(line)
            if not match:
                # not a valid hunk header: skip it instead of failing on match.groups()
                continue
            if new_content_lines or old_content_lines:
                # found a new hunk, emit the lines collected for the previous one
                patch_with_lines_str = _append_numbered_hunk(
                    patch_with_lines_str, header_line, new_content_lines, old_content_lines, start2)
                new_content_lines = []
                old_content_lines = []
            header_line = line
            start2 = int(match.group(3))
        elif line.startswith('+'):
            new_content_lines.append(line)
        elif line.startswith('-'):
            old_content_lines.append(line)
        else:
            # context line: belongs to both sides
            new_content_lines.append(line)
            old_content_lines.append(line)

    # finishing last hunk
    # NOTE(review): unlike the in-loop flush, a trailing hunk that has only '-' lines (no context) is
    # not emitted here; kept as is to preserve existing output - confirm whether this is intended.
    if header_line is not None and new_content_lines:
        patch_with_lines_str = _append_numbered_hunk(
            patch_with_lines_str, header_line, new_content_lines, old_content_lines, start2)

    return patch_with_lines_str.rstrip()
def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side) -> tuple[str, str]:
    """
    Extract the hunk(s) of a patch that contain a given line range, plus the selected lines themselves.

    Args:
        patch (str): The unified-diff patch to search.
        file_name: Name of the patched file; used for the '## file:' heading of the first return value.
        line_start: First line number of the requested range.
        line_end: Last line number of the requested range.
        side: 'left' to interpret the range against old-file line numbers, 'right' for new-file line
            numbers (case-insensitive). For any other value no hunk is filtered out and no line is selected.

    Returns:
        tuple[str, str]: (the file heading followed by every hunk whose range contains ``line_start``,
        the patch lines whose position falls inside ``[line_start, line_end]``). Both are right-stripped.
    """
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    side_lower = side.lower()
    rendered_parts = [f"\n\n## file: '{file_name.strip()}'\n\n"]
    selected_parts = []
    start1, start2 = -1, -1
    skip_hunk = False
    selected_lines_num = 0  # offset of the current line from the hunk's start line

    for line in patch.splitlines():
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            selected_lines_num = 0
            match = RE_HUNK_HEADER.match(line)
            if not match:
                # not a valid hunk header: its lines cannot be numbered, so ignore them
                skip_hunk = True
                continue

            raw_start1, raw_size1, raw_start2, raw_size2 = match.groups()[:4]
            start1, start2 = int(raw_start1), int(raw_start2)
            # unified diff omits the count when a hunk side spans exactly one line ('@@ -5 +5 @@')
            size1 = 1 if raw_size1 is None else int(raw_size1)
            size2 = 1 if raw_size2 is None else int(raw_size2)

            # check if line range is in this hunk
            if side_lower == 'left':
                skip_hunk = not (start1 <= line_start <= start1 + size1)
            elif side_lower == 'right':
                skip_hunk = not (start2 <= line_start <= start2 + size2)
            else:
                skip_hunk = False
            if not skip_hunk:
                rendered_parts.append(f'\n{line}\n')

        elif not skip_hunk:
            if side_lower == 'right' and line_start <= start2 + selected_lines_num <= line_end:
                selected_parts.append(line)
            # the lower bound is line_start (comparing against start1 would always be true)
            if side_lower == 'left' and line_start <= start1 + selected_lines_num <= line_end:
                selected_parts.append(line)
            rendered_parts.append(line + '\n')
            # NOTE(review): the offset is advanced on every non-'-' line for both sides, so left-side
            # numbering drifts in hunks that mix additions and deletions - confirm before relying on it.
            if not line.startswith('-'):  # currently we don't support /ask line for deleted lines
                selected_lines_num += 1

    return ''.join(rendered_parts).rstrip(), '\n'.join(selected_parts).rstrip()