mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-12-12 10:55:17 +00:00
- Simplified logic for handling new and old hunks to ensure consistent presentation of changes. - Updated documentation in TOML files to reflect changes in hunk section handling and line number references.
391 lines
No EOL
17 KiB
Python
391 lines
No EOL
17 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import traceback
|
|
|
|
from pr_agent.config_loader import get_settings
|
|
from pr_agent.algo.types import EDIT_TYPE, FilePatchInfo
|
|
from pr_agent.log import get_logger
|
|
|
|
|
|
def extend_patch(original_file_str, patch_str, patch_extra_lines_before=0,
                 patch_extra_lines_after=0, filename: str = "") -> str:
    """
    Return `patch_str` enriched with extra context lines taken from the file content.

    The patch is handed back untouched when there is nothing to do (empty patch, empty file
    content, or both extra-line counts equal to zero), when the file content cannot be decoded,
    when `should_skip_patch` rejects the file name, or when the extension step raises.

    Args:
        original_file_str: content of the file the patch refers to (str or bytes).
        patch_str: the patch text to extend.
        patch_extra_lines_before: number of context lines requested before each hunk.
        patch_extra_lines_after: number of context lines requested after each hunk.
        filename: name of the patched file; only used for the skip-by-extension check.

    Returns:
        The extended patch, or the unmodified `patch_str` in the cases listed above.
    """
    nothing_requested = patch_extra_lines_before == 0 and patch_extra_lines_after == 0
    if not patch_str or nothing_requested or not original_file_str:
        return patch_str

    file_text = decode_if_bytes(original_file_str)
    if not file_text or should_skip_patch(filename):
        return patch_str

    try:
        return process_patch_lines(patch_str, file_text,
                                   patch_extra_lines_before, patch_extra_lines_after)
    except Exception as err:
        # best effort: a failure to extend must never lose the original patch
        get_logger().warning(f"Failed to extend patch: {err}", artifact={"traceback": traceback.format_exc()})
        return patch_str
|
|
|
|
|
|
def decode_if_bytes(original_file_str):
    """
    Decode `original_file_str` to text when it is a bytes object.

    Non-bytes input is returned as is. Bytes are decoded with utf-8 first and then with the
    fallback codecs below, in order; an empty string is returned if every codec fails.
    """
    if not isinstance(original_file_str, bytes):
        return original_file_str

    # NOTE: iso-8859-1 maps every byte value, so the codecs listed after it act only as a
    # safety net.
    for codec in ('utf-8', 'iso-8859-1', 'latin-1', 'ascii', 'utf-16'):
        try:
            return original_file_str.decode(codec)
        except UnicodeDecodeError:
            continue
    return ""
|
|
|
|
|
|
def should_skip_patch(filename):
    """
    Tell whether `filename` ends with one of the configured `patch_extension_skip_types`.

    Returns False when either the configured list or the file name is empty.
    """
    skip_types = get_settings().config.patch_extension_skip_types
    if not skip_types or not filename:
        return False
    for suffix in skip_types:
        if filename.endswith(suffix):
            return True
    return False
|
|
|
|
|
|
def process_patch_lines(patch_str, original_file_str, patch_extra_lines_before, patch_extra_lines_after):
    """
    Rewrite every hunk of `patch_str` so that it carries extra context lines.

    For each hunk header the header is re-emitted with enlarged start/size values, preceded by
    an empty line and followed by the extra lines that come before the hunk. The extra lines
    after a hunk are appended when the next header is met, or at the end for the last hunk.
    All extra lines are read from `original_file_str` and prefixed with a single space.

    When `allow_dynamic_context` is enabled in the settings, up to
    `max_extra_lines_before_dynamic_context` lines before the hunk are scanned for the hunk's
    section header; if found, the context starts at that line and the section header is dropped
    from the emitted hunk header. Otherwise `patch_extra_lines_before` lines are used.

    Args:
        patch_str: the patch text.
        original_file_str: decoded content of the file the hunks' '-' ranges refer to.
        patch_extra_lines_before: context lines to add before each hunk.
        patch_extra_lines_after: context lines to add after each hunk.

    Returns:
        The extended patch, or `patch_str` unchanged if an exception is raised while
        processing the patch lines.
    """
    allow_dynamic_context = get_settings().config.allow_dynamic_context
    patch_extra_lines_before_dynamic = get_settings().config.max_extra_lines_before_dynamic_context

    original_lines = original_file_str.splitlines()
    len_original_lines = len(original_lines)
    extended_patch_lines = []

    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")

    def _context_after(hunk_start, hunk_size):
        # lines of the file that directly follow the hunk, formatted as diff context
        first = hunk_start + hunk_size - 1
        return [f' {text}' for text in original_lines[first:first + patch_extra_lines_after]]

    def _context_limits(lines_before, old_start, old_size, new_start, new_size):
        # enlarged (start, size) pairs for both sides of the hunk
        ext_old_start = max(1, old_start - lines_before)
        ext_old_size = old_size + (old_start - ext_old_start) + patch_extra_lines_after
        ext_new_start = max(1, new_start - lines_before)
        ext_new_size = new_size + (new_start - ext_new_start) + patch_extra_lines_after
        overflow = ext_old_start - 1 + ext_old_size - len_original_lines
        if overflow > 0:
            # we cannot extend beyond the original file
            ext_old_size = max(ext_old_size - overflow, old_size)
            ext_new_size = max(ext_new_size - overflow, new_size)
        return ext_old_start, ext_old_size, ext_new_start, ext_new_size

    start1, size1, start2, size2 = -1, -1, -1, -1
    try:
        for patch_line in patch_str.splitlines():
            header_match = RE_HUNK_HEADER.match(patch_line) if patch_line.startswith('@@') else None
            if not header_match:
                # regular patch line (or an '@@' line that is not a parsable header): keep as is
                extended_patch_lines.append(patch_line)
                continue

            # finish processing previous hunk
            if start1 != -1 and patch_extra_lines_after > 0:
                extended_patch_lines.extend(_context_after(start1, size1))

            section_header, size1, size2, start1, start2 = extract_hunk_headers(header_match)

            if patch_extra_lines_before > 0 or patch_extra_lines_after > 0:
                header_located = False
                if allow_dynamic_context:
                    extended_start1, extended_size1, extended_start2, extended_size2 = \
                        _context_limits(patch_extra_lines_before_dynamic, start1, size1, start2, size2)
                    candidates = original_lines[extended_start1 - 1:start1 - 1]
                    for offset, candidate in enumerate(candidates):
                        if section_header in candidate:
                            header_located = True
                            # start the context at the line holding the section header
                            extended_start1 += offset
                            extended_start2 += offset
                            extended_size1 -= offset
                            extended_size2 -= offset
                            section_header = ''
                            break
                if not header_located:
                    # static context: either dynamic context is off or the header was not found
                    extended_start1, extended_size1, extended_start2, extended_size2 = \
                        _context_limits(patch_extra_lines_before, start1, size1, start2, size2)

                delta_lines = [f' {text}' for text in original_lines[extended_start1 - 1:start1 - 1]]

                # remove the section header if it already appears in the extra lines
                # (in dynamic context this was handled above)
                if section_header and not allow_dynamic_context:
                    if any(section_header in text for text in delta_lines):
                        section_header = ''
            else:
                extended_start1, extended_size1 = start1, size1
                extended_start2, extended_size2 = start2, size2
                delta_lines = []

            extended_patch_lines.append('')
            extended_patch_lines.append(
                f'@@ -{extended_start1},{extended_size1} '
                f'+{extended_start2},{extended_size2} @@ {section_header}')
            extended_patch_lines.extend(delta_lines)
    except Exception as e:
        get_logger().warning(f"Failed to extend patch: {e}", artifact={"traceback": traceback.format_exc()})
        return patch_str

    # finish processing last hunk
    if start1 != -1 and patch_extra_lines_after > 0:
        extended_patch_lines.extend(_context_after(start1, size1))

    return '\n'.join(extended_patch_lines)
|
|
|
|
|
|
def extract_hunk_headers(match):
    """
    Convert a hunk-header regex match into numeric ranges and the section header.

    `match` must come from the hunk-header pattern used in this module, whose groups are
    (old start, old size, new start, new size, section header). The numeric groups only match
    digits, so the integer conversion cannot fail for such a match.

    In unified diff format a range written without a count (e.g. '@@ -3 +3 @@' or
    '@@ -0,0 +1 @@') covers exactly one line, so a missing size is reported as 1.

    Args:
        match: a successful `re.Match` of the hunk-header pattern.

    Returns:
        A tuple (section_header, size1, size2, start1, start2).
    """
    groups = match.groups()
    raw_start1, raw_size1, raw_start2, raw_size2 = groups[:4]

    start1 = int(raw_start1) if raw_start1 is not None else 0
    start2 = int(raw_start2) if raw_start2 is not None else 0
    # an omitted count means a single-line range
    size1 = int(raw_size1) if raw_size1 is not None else 1
    size2 = int(raw_size2) if raw_size2 is not None else 1

    section_header = groups[4] if groups[4] is not None else ''
    return section_header, size1, size2, start1, start2
|
|
|
|
|
|
def omit_deletion_hunks(patch_lines) -> str:
    """
    Omit deletion hunks from the patch and return the modified patch.

    A hunk is kept only if at least one of its lines starts with '+'. Lines starting with '@@'
    that do not match the hunk-header pattern are dropped.

    Args:
    - patch_lines: a list of strings representing the lines of the patch
    Returns:
    - A string representing the modified patch with deletion hunks omitted
    """

    temp_hunk = []
    added_patched = []
    add_hunk = False
    inside_hunk = False
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))?\ @@[ ]?(.*)")

    for line in patch_lines:
        if line.startswith('@@'):
            if not RE_HUNK_HEADER.match(line):
                continue
            if inside_hunk:
                # finish previous hunk: emit it only if it contained additions
                if add_hunk:
                    added_patched.extend(temp_hunk)
                # always start the new hunk from a clean buffer, otherwise a deletion-only
                # hunk would be emitted together with the next hunk that has additions
                temp_hunk = []
                add_hunk = False
            temp_hunk.append(line)
            inside_hunk = True
        else:
            temp_hunk.append(line)
            if line.startswith('+'):
                add_hunk = True

    if inside_hunk and add_hunk:
        added_patched.extend(temp_hunk)

    return '\n'.join(added_patched)
|
|
|
|
|
|
def handle_patch_deletions(patch: str, original_file_content_str: str,
                           new_file_content_str: str, file_name: str, edit_type: EDIT_TYPE = EDIT_TYPE.UNKNOWN) -> str:
    """
    Reduce a patch by hiding deletions.

    When the new file content is empty and the edit type is DELETED or UNKNOWN, the file is
    treated as deleted and no patch is returned. Otherwise the deletion-only hunks are
    removed from the patch.

    Args:
        patch (str): The patch to be handled.
        original_file_content_str (str): The original content of the file (not used here).
        new_file_content_str (str): The new content of the file.
        file_name (str): The name of the file, used in log messages.
        edit_type (EDIT_TYPE): The kind of edit applied to the file.

    Returns:
        str: The patch with deletion hunks omitted, or None when the file was deleted.
    """
    if not new_file_content_str and (edit_type == EDIT_TYPE.DELETED or edit_type == EDIT_TYPE.UNKNOWN):
        # deleted file - don't show a patch, the caller only reports that the file was deleted
        if get_settings().config.verbosity_level > 0:
            get_logger().info(f"Processing file: {file_name}, minimizing deletion file")
        return None

    reduced_patch = omit_deletion_hunks(patch.splitlines())
    if reduced_patch == patch:
        return patch

    if get_settings().config.verbosity_level > 0:
        get_logger().info(f"Processing file: {file_name}, hunks were deleted")
    return reduced_patch
|
|
|
|
|
|
def convert_to_hunks_with_lines_numbers(patch: str, file) -> str:
|
|
"""
|
|
Convert a given patch string into a string with line numbers for each hunk, indicating the new and old content of
|
|
the file.
|
|
|
|
Args:
|
|
patch (str): The patch string to be converted.
|
|
file: An object containing the filename of the file being patched.
|
|
|
|
Returns:
|
|
str: A string with line numbers for each hunk, indicating the new and old content of the file.
|
|
|
|
example output:
|
|
## src/file.ts
|
|
__new hunk__
|
|
881 line1
|
|
882 line2
|
|
883 line3
|
|
887 + line4
|
|
888 + line5
|
|
889 line6
|
|
890 line7
|
|
...
|
|
__old hunk__
|
|
line1
|
|
line2
|
|
- line3
|
|
- line4
|
|
line5
|
|
line6
|
|
...
|
|
"""
|
|
# if the file was deleted, return a message indicating that the file was deleted
|
|
if hasattr(file, 'edit_type') and file.edit_type == EDIT_TYPE.DELETED:
|
|
return f"\n\n## file '{file.filename.strip()}' was deleted\n"
|
|
|
|
patch_with_lines_str = f"\n\n## File: '{file.filename.strip()}'\n"
|
|
patch_lines = patch.splitlines()
|
|
RE_HUNK_HEADER = re.compile(
|
|
r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
|
|
new_content_lines = []
|
|
old_content_lines = []
|
|
match = None
|
|
start1, size1, start2, size2 = -1, -1, -1, -1
|
|
prev_header_line = []
|
|
header_line = []
|
|
for line_i, line in enumerate(patch_lines):
|
|
if 'no newline at end of file' in line.lower():
|
|
continue
|
|
|
|
if line.startswith('@@'):
|
|
header_line = line
|
|
match = RE_HUNK_HEADER.match(line)
|
|
if match and (new_content_lines or old_content_lines): # found a new hunk, split the previous lines
|
|
if prev_header_line:
|
|
patch_with_lines_str += f'\n{prev_header_line}\n'
|
|
is_plus_lines = is_minus_lines = False
|
|
if new_content_lines:
|
|
is_plus_lines = any([line.startswith('+') for line in new_content_lines])
|
|
if old_content_lines:
|
|
is_minus_lines = any([line.startswith('-') for line in old_content_lines])
|
|
if is_plus_lines or is_minus_lines: # notice 'True' here - we always present __new hunk__ for section, otherwise LLM gets confused
|
|
patch_with_lines_str = patch_with_lines_str.rstrip() + '\n__new hunk__\n'
|
|
for i, line_new in enumerate(new_content_lines):
|
|
patch_with_lines_str += f"{start2 + i} {line_new}\n"
|
|
if is_minus_lines:
|
|
patch_with_lines_str = patch_with_lines_str.rstrip() + '\n__old hunk__\n'
|
|
for line_old in old_content_lines:
|
|
patch_with_lines_str += f"{line_old}\n"
|
|
new_content_lines = []
|
|
old_content_lines = []
|
|
if match:
|
|
prev_header_line = header_line
|
|
|
|
section_header, size1, size2, start1, start2 = extract_hunk_headers(match)
|
|
|
|
elif line.startswith('+'):
|
|
new_content_lines.append(line)
|
|
elif line.startswith('-'):
|
|
old_content_lines.append(line)
|
|
else:
|
|
if not line and line_i: # if this line is empty and the next line is a hunk header, skip it
|
|
if line_i + 1 < len(patch_lines) and patch_lines[line_i + 1].startswith('@@'):
|
|
continue
|
|
elif line_i + 1 == len(patch_lines):
|
|
continue
|
|
new_content_lines.append(line)
|
|
old_content_lines.append(line)
|
|
|
|
# finishing last hunk
|
|
if match and new_content_lines:
|
|
patch_with_lines_str += f'\n{header_line}\n'
|
|
is_plus_lines = is_minus_lines = False
|
|
if new_content_lines:
|
|
is_plus_lines = any([line.startswith('+') for line in new_content_lines])
|
|
if old_content_lines:
|
|
is_minus_lines = any([line.startswith('-') for line in old_content_lines])
|
|
if is_plus_lines or is_minus_lines: # notice 'True' here - we always present __new hunk__ for section, otherwise LLM gets confused
|
|
patch_with_lines_str = patch_with_lines_str.rstrip() + '\n__new hunk__\n'
|
|
for i, line_new in enumerate(new_content_lines):
|
|
patch_with_lines_str += f"{start2 + i} {line_new}\n"
|
|
if is_minus_lines:
|
|
patch_with_lines_str = patch_with_lines_str.rstrip() + '\n__old hunk__\n'
|
|
for line_old in old_content_lines:
|
|
patch_with_lines_str += f"{line_old}\n"
|
|
|
|
return patch_with_lines_str.rstrip()
|
|
|
|
|
|
def extract_hunk_lines_from_patch(patch: str, file_name, line_start, line_end, side) -> tuple[str, str]:
    """
    Extract from `patch` the hunk that contains `line_start` and the lines selected in it.

    Hunks whose range on the requested side does not contain `line_start` are left out.
    Lines starting with '@@' that are not parsable hunk headers (for example combined-diff
    '@@@ ... @@@' headers) cause the lines up to the next parsable header to be skipped.

    Args:
        patch (str): The patch text of the file.
        file_name: Name of the file; used for the '## File:' title line.
        line_start: First selected line number.
        line_end: Last selected line number.
        side: 'left' to match against the old ('-') ranges, 'right' to match against the new
            ('+') ranges (case-insensitive). Any other value keeps every hunk and selects nothing.

    Returns:
        tuple[str, str]: (the title line followed by the kept hunks, the selected lines),
        both stripped of trailing whitespace.
    """
    patch_with_lines_str = f"\n\n## File: '{file_name.strip()}'\n\n"
    selected_lines = ""
    RE_HUNK_HEADER = re.compile(
        r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@[ ]?(.*)")
    requested_side = str(side).lower()
    start1, start2 = -1, -1
    skip_hunk = False
    selected_lines_num = 0
    for line in patch.splitlines():
        if 'no newline at end of file' in line.lower():
            continue

        if line.startswith('@@'):
            selected_lines_num = 0
            match = RE_HUNK_HEADER.match(line)
            if not match:
                # no line numbers can be derived for this hunk, so it cannot be matched
                skip_hunk = True
                continue

            _, size1, size2, start1, start2 = extract_hunk_headers(match)

            # check if line range is in this hunk
            if requested_side == 'left':
                skip_hunk = not (start1 <= line_start <= start1 + size1)
            elif requested_side == 'right':
                skip_hunk = not (start2 <= line_start <= start2 + size2)
            else:
                skip_hunk = False
            if not skip_hunk:
                patch_with_lines_str += f'\n{line}\n'

        elif not skip_hunk:
            if requested_side == 'right' and line_start <= start2 + selected_lines_num <= line_end:
                selected_lines += line + '\n'
            # the lower bound must be line_start (as on the right side); comparing against
            # start1 is always true and would select everything from the hunk start
            if requested_side == 'left' and line_start <= start1 + selected_lines_num <= line_end:
                selected_lines += line + '\n'
            patch_with_lines_str += line + '\n'
            if not line.startswith('-'):  # currently we don't support /ask line for deleted lines
                selected_lines_num += 1

    return patch_with_lines_str.rstrip(), selected_lines.rstrip()