from __future__ import annotations

import re
import traceback

from pr_agent.config_loader import get_settings
from pr_agent.algo.types import EDIT_TYPE, FilePatchInfo
from pr_agent.log import get_logger

# Unified-diff hunk header: "@@ -start1[,size1] +start2[,size2] @@ [section header]".
# Groups: start1, size1 (optional), start2, size2 (optional), section header text.
RE_HUNK_HEADER = re.compile(
    r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")


def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0,
                 patch_extra_lines_after=0, filename: str = "") -> str:
    """
    Add extra context lines, taken from the original file, around each hunk of a patch.

    Args:
        original_file_str: Content of the original file (str or bytes).
        patch_str: The unified-diff patch to extend.
        patch_extra_lines_before: Number of context lines to add before each hunk.
        patch_extra_lines_after: Number of context lines to add after each hunk.
        filename: Name of the patched file, checked against the configured skip extensions.

    Returns:
        The extended patch. `patch_str` is returned unchanged when it is empty, when no
        extra lines are requested, when the original content is empty or cannot be
        decoded, when the filename matches a skipped extension, or when extending fails.
    """
    no_extension_requested = patch_extra_lines_before == 0 and patch_extra_lines_after == 0
    if not patch_str or no_extension_requested or not original_file_str:
        return patch_str

    original_file_str = decode_if_bytes(original_file_str)
    if not original_file_str:
        return patch_str

    if should_skip_patch(filename):
        return patch_str

    try:
        extended_patch_str = process_patch_lines(patch_str, original_file_str,
                                                 patch_extra_lines_before, patch_extra_lines_after)
    except Exception as e:
        # Best effort: a failure to extend must never lose the original patch.
        get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
        return patch_str

    return extended_patch_str


def decode_if_bytes(original_file_str):
    """
    Decode `original_file_str` to text when it is a bytes object.

    UTF-8 is tried first, then a list of fallback encodings. Non-bytes input is
    returned unchanged. An empty string is returned if no encoding succeeds.
    """
    if isinstance(original_file_str, bytes):
        try:
            return original_file_str.decode('utf-8')
        except UnicodeDecodeError:
            # NOTE: 'iso-8859-1' maps every byte value, so it accepts any input; the
            # entries after it (and the final `return ""`) are effectively never reached.
            encodings_to_try = ['iso-8859-1', 'latin-1', 'ascii', 'utf-16']
            for encoding in encodings_to_try:
                try:
                    return original_file_str.decode(encoding)
                except UnicodeDecodeError:
                    continue
            return ""
    return original_file_str


def should_skip_patch(filename):
    """
    Tell whether `filename` ends with one of the extensions listed in the
    `config.patch_extension_skip_types` setting.

    Returns False when the setting or the filename is empty.
    """
    patch_extension_skip_types = get_settings().config.patch_extension_skip_types
    if patch_extension_skip_types and filename:
        return any(filename.endswith(skip_type) for skip_type in patch_extension_skip_types)
    return False


def _calc_context_limits(start1, size1, start2, size2, lines_before, lines_after, len_original_lines):
    """Compute the extended (start1, size1, start2, size2) of a hunk after adding context lines."""
    extended_start1 = max(1, start1 - lines_before)
    extended_size1 = size1 + (start1 - extended_start1) + lines_after
    extended_start2 = max(1, start2 - lines_before)
    extended_size2 = size2 + (start2 - extended_start2) + lines_after

    # We cannot extend beyond the end of the original file; never shrink below the hunk itself.
    delta_cap = extended_start1 - 1 + extended_size1 - len_original_lines
    if delta_cap > 0:
        extended_size1 = max(extended_size1 - delta_cap, size1)
        extended_size2 = max(extended_size2 - delta_cap, size2)
    return extended_start1, extended_size1, extended_start2, extended_size2


def _trailing_context(original_lines, start1, size1, lines_after):
    """Return up to `lines_after` original lines following a hunk, formatted as diff context lines."""
    first = start1 + size1 - 1  # one-based line number to zero-based index
    return [f' {text}' for text in original_lines[first:first + lines_after]]


def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before, patch_extra_lines_after):
    """
    Rewrite every hunk of `patch_str` so that it carries extra context lines.

    For each hunk header, lines of the original file are inserted before the hunk and,
    once the hunk ends, after it; the header numbers are updated accordingly. When the
    `config.allow_dynamic_context` setting is on, up to
    `config.max_extra_lines_before_dynamic_context` preceding lines are searched for the
    hunk's section header, and the context starts at the line containing it.

    Args:
        patch_str: The unified-diff patch.
        original_file_str: Text of the original file.
        patch_extra_lines_before: Context lines to add before each hunk.
        patch_extra_lines_after: Context lines to add after each hunk.

    Returns:
        The extended patch, or `patch_str` unchanged if an error occurs while processing.
    """
    allow_dynamic_context = get_settings().config.allow_dynamic_context
    patch_extra_lines_before_dynamic = get_settings().config.max_extra_lines_before_dynamic_context

    original_lines = original_file_str.splitlines()
    len_original_lines = len(original_lines)
    patch_lines = patch_str.splitlines()
    extended_patch_lines = []

    start1, size1, start2, size2 = -1, -1, -1, -1  # -1 means "no hunk seen yet"
    try:
        for line in patch_lines:
            match = RE_HUNK_HEADER.match(line) if line.startswith('@@') else None
            if not match:
                # Hunk content (or an unparsable '@@' line) is copied through untouched.
                extended_patch_lines.append(line)
                continue

            # finish processing previous hunk
            if start1 != -1 and patch_extra_lines_after > 0:
                extended_patch_lines.extend(
                    _trailing_context(original_lines, start1, size1, patch_extra_lines_after))

            section_header, size1, size2, start1, start2 = extract_hunk_headers(match)

            if patch_extra_lines_before > 0 or patch_extra_lines_after > 0:
                default_limits = _calc_context_limits(
                    start1, size1, start2, size2,
                    patch_extra_lines_before, patch_extra_lines_after, len_original_lines)

                if allow_dynamic_context:
                    extended_start1, extended_size1, extended_start2, extended_size2 = \
                        _calc_context_limits(
                            start1, size1, start2, size2,
                            patch_extra_lines_before_dynamic, patch_extra_lines_after, len_original_lines)
                    lines_before = original_lines[extended_start1 - 1:start1 - 1]
                    found_header = False
                    # NOTE(review): an empty section header is contained in every line, so it
                    # is "found" at offset 0 and the full dynamic window is kept - confirm
                    # that this is intended.
                    for offset, candidate in enumerate(lines_before):
                        if section_header in candidate:
                            found_header = True
                            # start the context at the line holding the section header
                            extended_start1, extended_start2 = extended_start1 + offset, extended_start2 + offset
                            extended_size1, extended_size2 = extended_size1 - offset, extended_size2 - offset
                            section_header = ''  # already visible in the context lines
                            break
                    if not found_header:
                        extended_start1, extended_size1, extended_start2, extended_size2 = default_limits
                else:
                    extended_start1, extended_size1, extended_start2, extended_size2 = default_limits

                delta_lines = [f' {text}' for text in original_lines[extended_start1 - 1:start1 - 1]]

                # remove the section header if it already appears in the extra lines
                # (in dynamic context this was handled above)
                if section_header and not allow_dynamic_context:
                    if any(section_header in text for text in delta_lines):
                        section_header = ''
            else:
                extended_start1 = start1
                extended_size1 = size1
                extended_start2 = start2
                extended_size2 = size2
                delta_lines = []

            extended_patch_lines.append('')
            extended_patch_lines.append(
                f'@@ -{extended_start1},{extended_size1} '
                f'+{extended_start2},{extended_size2} @@ {section_header}')
            extended_patch_lines.extend(delta_lines)
    except Exception as e:
        get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
        return patch_str

    # finish processing last hunk
    if start1 != -1 and patch_extra_lines_after > 0:
        extended_patch_lines.extend(
            _trailing_context(original_lines, start1, size1, patch_extra_lines_after))

    extended_patch_str = '\n'.join(extended_patch_lines)
    return extended_patch_str


def extract_hunk_headers(match):
    """
    Extract the numbers and section header from a `RE_HUNK_HEADER` match.

    In the unified-diff format a range written without a count (e.g. '@@ -0,0 +1 @@')
    covers exactly one line, so an omitted size is reported as 1.

    Args:
        match: A successful match object of the hunk-header regular expression.

    Returns:
        A tuple (section_header, size1, size2, start1, start2).
    """
    start1, size1, start2, size2, section_header = match.groups()
    start1 = int(start1)
    start2 = int(start2)
    size1 = 1 if size1 is None else int(size1)
    size2 = 1 if size2 is None else int(size2)
    return section_header, size1, size2, start1, start2


def omit_deletion_hunks(patch_lines) -> str:
    """
    Omit deletion hunks from the patch and return the modified patch.

    A hunk is kept only if at least one of its lines starts with '+'. Lines that
    precede the first hunk header are emitted together with the first hunk, if that
    hunk is kept. Lines starting with '@@' that are not valid hunk headers are dropped.

    Args:
    - patch_lines: a list of strings representing the lines of the patch

    Returns:
    - A string representing the modified patch with deletion hunks omitted
    """
    temp_hunk = []
    added_patched = []
    add_hunk = False
    inside_hunk = False

    for line in patch_lines:
        if line.startswith('@@'):
            if RE_HUNK_HEADER.match(line):
                # finish previous hunk
                if inside_hunk:
                    if add_hunk:
                        added_patched.extend(temp_hunk)
                    # Always start the new hunk from a clean buffer; otherwise a discarded
                    # deletion-only hunk would be emitted together with a later kept hunk.
                    temp_hunk = []
                    add_hunk = False
                temp_hunk.append(line)
                inside_hunk = True
        else:
            temp_hunk.append(line)
            if line.startswith('+'):
                add_hunk = True

    if inside_hunk and add_hunk:
        added_patched.extend(temp_hunk)

    return '\n'.join(added_patched)


def handle_patch_deletions(patch: str, original_file_content_str: str,
                           new_file_content_str: str, file_name: str,
                           edit_type: EDIT_TYPE = EDIT_TYPE.UNKNOWN) -> str:
    """
    Handle entire file or deletion patches.

    This function takes a patch, original file content, new file content, and file name as input.
    It handles entire file or deletion patches and returns the modified patch with deletion hunks omitted.

    Args:
        patch (str): The patch to be handled.
        original_file_content_str (str): The original content of the file.
        new_file_content_str (str): The new content of the file.
        file_name (str): The name of the file.
        edit_type (EDIT_TYPE): The kind of change made to the file.

    Returns:
        str: The modified patch with deletion hunks omitted. None when the file was
        deleted (no new content and edit type DELETED or UNKNOWN). A missing or empty
        patch is returned unchanged.
    """
    if not new_file_content_str and edit_type in (EDIT_TYPE.DELETED, EDIT_TYPE.UNKNOWN):
        # logic for handling deleted files - don't show patch, just show that the file was deleted
        if get_settings().config.verbosity_level > 0:
            get_logger().info(f"Processing file: {file_name}, minimizing deletion file")
        patch = None  # file was deleted
    elif patch:
        patch_new = omit_deletion_hunks(patch.splitlines())
        if patch != patch_new:
            if get_settings().config.verbosity_level > 0:
                get_logger().info(f"Processing file: {file_name}, hunks were deleted")
            patch = patch_new
    return patch


def _append_numbered_hunk(rendered, header_line, new_content_lines, old_content_lines, start2):
    """Append one hunk (header, numbered '__new hunk__', optional '__old hunk__') to `rendered`."""
    if header_line:
        rendered += f'\n{header_line}\n'

    is_plus_lines = any(text.startswith('+') for text in new_content_lines)
    is_minus_lines = any(text.startswith('-') for text in old_content_lines)

    # The new hunk is presented whenever the hunk changes anything (even pure deletions),
    # otherwise the LLM gets confused.
    if is_plus_lines or is_minus_lines:
        rendered = rendered.rstrip() + '\n__new hunk__\n'
        for i, line_new in enumerate(new_content_lines):
            rendered += f"{start2 + i} {line_new}\n"
    if is_minus_lines:
        rendered = rendered.rstrip() + '\n__old hunk__\n'
        for line_old in old_content_lines:
            rendered += f"{line_old}\n"
    return rendered


def convert_to_hunks_with_lines_numbers(patch: str, file) -> str:
    """
    Convert a given patch string into a string with line numbers for each hunk, indicating the new and old content of
    the file.

    Args:
        patch (str): The patch string to be converted.
        file: An object containing the filename of the file being patched.

    Returns:
        str: A string with line numbers for each hunk, indicating the new and old content of the file.

    example output:
## src/file.ts
__new hunk__
881        line1
882        line2
883        line3
887 +      line4
888 +      line5
889        line6
890        line7
...
__old hunk__
        line1
        line2
-       line3
-       line4
        line5
        line6
           ...
    """
    # if the file was deleted, return a message indicating that the file was deleted
    if hasattr(file, 'edit_type') and file.edit_type == EDIT_TYPE.DELETED:
        return f"\n\n## file '{file.filename.strip()}' was deleted\n"

    patch_with_lines_str = f"\n\n## File: '{file.filename.strip()}'\n"
    patch_lines = patch.splitlines()
    new_content_lines = []
    old_content_lines = []
    match = None
    start2 = -1
    prev_header_line = ''
    header_line = ''
    for line_i, line in enumerate(patch_lines):
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            header_line = line
            match = RE_HUNK_HEADER.match(line)
            if match:
                if new_content_lines or old_content_lines:
                    # found a new hunk, flush the lines collected for the previous one
                    patch_with_lines_str = _append_numbered_hunk(
                        patch_with_lines_str, prev_header_line,
                        new_content_lines, old_content_lines, start2)
                    new_content_lines = []
                    old_content_lines = []
                prev_header_line = header_line
                _, _, _, _, start2 = extract_hunk_headers(match)
        elif line.startswith('+'):
            new_content_lines.append(line)
        elif line.startswith('-'):
            old_content_lines.append(line)
        else:
            if not line and line_i:
                # skip an empty line that directly precedes a hunk header or ends the patch
                if line_i + 1 < len(patch_lines) and patch_lines[line_i + 1].startswith('@@'):
                    continue
                elif line_i + 1 == len(patch_lines):
                    continue
            # context line: belongs to both the new and the old content
            new_content_lines.append(line)
            old_content_lines.append(line)

    # finishing last hunk
    if match and new_content_lines:
        patch_with_lines_str = _append_numbered_hunk(
            patch_with_lines_str, header_line,
            new_content_lines, old_content_lines, start2)

    return patch_with_lines_str.rstrip()


def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side) -> tuple[str, str]:
    """
    Extract from a patch the hunk that contains `line_start`, and the selected lines in it.

    Args:
        patch (str): The unified-diff patch.
        file_name: Name of the patched file, used for the output title.
        line_start: First selected line number.
        line_end: Last selected line number.
        side: 'left' to match against the old-file numbers of the hunk header,
            'right' to match against the new-file numbers (case-insensitive).

    Returns:
        A tuple (patch_with_lines_str, selected_lines): the file title followed by the
        hunks whose header range contains `line_start`, and the text of the selected lines.
        Hunks whose header cannot be parsed are skipped.
    """
    rendered_parts = [f"\n\n## File: '{file_name.strip()}'\n\n"]
    selected_parts = []
    side_lc = side.lower()
    start1, size1, start2, size2 = -1, -1, -1, -1
    skip_hunk = False
    selected_lines_num = 0
    for line in patch.splitlines():
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            skip_hunk = False
            selected_lines_num = 0

            match = RE_HUNK_HEADER.match(line)
            if not match:
                # Not a valid hunk header: its lines cannot be positioned, so skip them.
                skip_hunk = True
                continue
            _, size1, size2, start1, start2 = extract_hunk_headers(match)

            # check if line range is in this hunk
            if side_lc == 'left':
                if not (start1 <= line_start <= start1 + size1):
                    skip_hunk = True
                    continue
            elif side_lc == 'right':
                if not (start2 <= line_start <= start2 + size2):
                    skip_hunk = True
                    continue
            rendered_parts.append(f'\n{line}\n')

        elif not skip_hunk:
            if side_lc == 'right' and line_start <= start2 + selected_lines_num <= line_end:
                selected_parts.append(line + '\n')
            # NOTE(review): the lower bound below is always true, so on the left side every
            # line from the hunk start up to `line_end` is selected - `line_start` may have
            # been intended; confirm before changing.
            if side_lc == 'left' and start1 <= selected_lines_num + start1 <= line_end:
                selected_parts.append(line + '\n')
            rendered_parts.append(line + '\n')
            if not line.startswith('-'):  # currently we don't support /ask line for deleted lines
                selected_lines_num += 1

    return ''.join(rendered_parts).rstrip(), ''.join(selected_parts).rstrip()