Merge pull request #1644 from qodo-ai/es/help_docs

Adding a new tool: /help_docs
This commit is contained in:
Tal
2025-03-25 08:17:37 +02:00
committed by GitHub
17 changed files with 898 additions and 16 deletions

View File

@ -0,0 +1,25 @@
## Overview
The `help_docs` tool answers a question based on a given relative path of documentation, either from the repository of this merge request or from a given one.
It can be invoked manually by commenting on any PR:
```
/help_docs "..."
```
## Example usage
![help_docs on the documentation of this repository](https://codium.ai/images/pr_agent/help_docs_comment.png){width=512}
![help_docs on the documentation of another repository](https://codium.ai/images/pr_agent/help_docs_comment_explicit_git.png){width=512}
![help_docs response](https://codium.ai/images/pr_agent/help_docs_response.png){width=512}
## Configuration options
Under the section `--pr_help_docs`, the [configuration file](https://github.com/Codium-ai/pr-agent/blob/main/pr_agent/settings/configuration.toml#L50) contains options to customize the 'help docs' tool:
- `repo_url`: If not overwritten, will use the repo from where the context came from (issue or PR), otherwise - use the given repo as context.
- `repo_default_branch`: The branch to use in case repo_url overwritten, otherwise - has no effect.
- `docs_path`: Relative path from root of repository (either the one this PR has been issued for, or above repo url).
- `exclude_root_readme`: Whether or not to exclude the root README file for querying the model.
- `supported_doc_exts` : Which file extensions should be included for the purpose of querying the model.

View File

@ -28,6 +28,7 @@ nav:
- Improve: 'tools/improve.md'
- Ask: 'tools/ask.md'
- Update Changelog: 'tools/update_changelog.md'
- Help Docs: 'tools/help_docs.md'
- Help: 'tools/help.md'
- 💎 Analyze: 'tools/analyze.md'
- 💎 Test: 'tools/test.md'

View File

@ -13,6 +13,7 @@ from pr_agent.tools.pr_code_suggestions import PRCodeSuggestions
from pr_agent.tools.pr_config import PRConfig
from pr_agent.tools.pr_description import PRDescription
from pr_agent.tools.pr_generate_labels import PRGenerateLabels
from pr_agent.tools.pr_help_docs import PRHelpDocs
from pr_agent.tools.pr_help_message import PRHelpMessage
from pr_agent.tools.pr_line_questions import PR_LineQuestions
from pr_agent.tools.pr_questions import PRQuestions
@ -39,6 +40,7 @@ command2class = {
"similar_issue": PRSimilarIssue,
"add_docs": PRAddDocs,
"generate_labels": PRGenerateLabels,
"help_docs": PRHelpDocs,
}
commands = list(command2class.keys())

View File

@ -1,6 +1,7 @@
from threading import Lock
from jinja2 import Environment, StrictUndefined
from math import ceil
from tiktoken import encoding_for_model, get_encoding
from pr_agent.config_loader import get_settings
@ -76,7 +77,35 @@ class TokenHandler:
get_logger().error(f"Error in _get_system_user_tokens: {e}")
return 0
def count_tokens(self, patch: str) -> int:
def calc_claude_tokens(self, patch):
try:
import anthropic
from pr_agent.algo import MAX_TOKENS
client = anthropic.Anthropic(api_key=get_settings(use_context=False).get('anthropic.key'))
MaxTokens = MAX_TOKENS[get_settings().config.model]
# Check if the content size is too large (9MB limit)
if len(patch.encode('utf-8')) > 9_000_000:
get_logger().warning(
"Content too large for Anthropic token counting API, falling back to local tokenizer"
)
return MaxTokens
response = client.messages.count_tokens(
model="claude-3-7-sonnet-20250219",
system="system",
messages=[{
"role": "user",
"content": patch
}],
)
return response.input_tokens
except Exception as e:
get_logger().error( f"Error in Anthropic token counting: {e}")
return MaxTokens
def count_tokens(self, patch: str, force_accurate=False) -> int:
"""
Counts the number of tokens in a given patch string.
@ -86,4 +115,22 @@ class TokenHandler:
Returns:
The number of tokens in the patch string.
"""
return len(self.encoder.encode(patch, disallowed_special=()))
encoder_estimate = len(self.encoder.encode(patch, disallowed_special=()))
if not force_accurate:
return encoder_estimate
#else, need to provide an accurate estimation:
model = get_settings().config.model.lower()
if force_accurate and 'claude' in model and get_settings(use_context=False).get('anthropic.key'):
return self.calc_claude_tokens(patch) # API call to Anthropic for accurate token counting for Claude models
#else: Non Anthropic provided model
import re
model_is_from_o_series = re.match(r"^o[1-9](-mini|-preview)?$", model)
if ('gpt' in get_settings().config.model.lower() or model_is_from_o_series) and get_settings(use_context=False).get('openai.key'):
return encoder_estimate
#else: Model is neither an OpenAI, nor an Anthropic model - therefore, cannot provide an accurate token count and instead, return a higher number as best effort.
elbow_factor = 1 + get_settings().get('config.model_token_count_estimate_factor', 0)
get_logger().warning(f"{model}'s expected token count cannot be accurately estimated. Using {elbow_factor} of encoder output as best effort estimate")
return ceil(elbow_factor * encoder_estimate)

View File

@ -22,6 +22,7 @@ def set_parser():
- cli.py --pr_url=... ask "write me a poem about this PR"
- cli.py --pr_url=... reflect
- cli.py --issue_url=... similar_issue
- cli.py --pr_url/--issue_url= help_docs [<asked question>]
Supported commands:
- review / review_pr - Add a review that includes a summary of the PR and specific suggestions for improvement.
@ -40,6 +41,8 @@ def set_parser():
- add_docs
- generate_labels
- help_docs - Ask a question, from either an issue or PR context, on a given repo (current context or a different one)
Configuration:

View File

@ -28,6 +28,7 @@ global_settings = Dynaconf(
"settings/pr_add_docs.toml",
"settings/custom_labels.toml",
"settings/pr_help_prompts.toml",
"settings/pr_help_docs_prompts.toml",
"settings/.secrets.toml",
"settings_prod/.secrets.toml",
]]

View File

@ -30,12 +30,15 @@ class BitbucketProvider(GitProvider):
):
s = requests.Session()
try:
bearer = context.get("bitbucket_bearer_token", None)
self.bearer_token = bearer = context.get("bitbucket_bearer_token", None)
if not bearer and get_settings().get("BITBUCKET.BEARER_TOKEN", None):
self.bearer_token = bearer = get_settings().get("BITBUCKET.BEARER_TOKEN", None)
s.headers["Authorization"] = f"Bearer {bearer}"
except Exception:
self.bearer_token = get_settings().get("BITBUCKET.BEARER_TOKEN", None)
s.headers[
"Authorization"
] = f'Bearer {get_settings().get("BITBUCKET.BEARER_TOKEN", None)}'
] = f'Bearer {self.bearer_token}'
s.headers["Content-Type"] = "application/json"
self.headers = s.headers
self.bitbucket_client = Cloud(session=s)
@ -67,6 +70,37 @@ class BitbucketProvider(GitProvider):
except Exception:
return ""
def get_git_repo_url(self, pr_url: str=None) -> str: #bitbucket does not support issue url, so ignore param
try:
parsed_url = urlparse(self.pr_url)
return f"{parsed_url.scheme}://{parsed_url.netloc}/{self.workspace_slug}/{self.repo_slug}.git"
except Exception as e:
get_logger().exception(f"url is not a valid merge requests url: {self.pr_url}")
return ""
# Given a git repo url, return prefix and suffix of the provider in order to view a given file belonging to that repo.
# Example: git clone git clone https://bitbucket.org/codiumai/pr-agent.git and branch: main -> prefix: "https://bitbucket.org/codiumai/pr-agent/src/main", suffix: ""
# In case git url is not provided, provider will use PR context (which includes branch) to determine the prefix and suffix.
def get_canonical_url_parts(self, repo_git_url:str=None, desired_branch:str=None) -> Tuple[str, str]:
scheme_and_netloc = None
if repo_git_url:
parsed_git_url = urlparse(repo_git_url)
scheme_and_netloc = parsed_git_url.scheme + "://" + parsed_git_url.netloc
repo_path = parsed_git_url.path.split('.git')[0][1:] #/<workspace>/<repo>.git -> <workspace>/<repo>
if repo_path.count('/') != 1:
get_logger().error(f"repo_git_url is not a valid git repo url: {repo_git_url}")
return ("", "")
workspace_name, project_name = repo_path.split('/')
else:
desired_branch = self.get_pr_branch()
parsed_pr_url = urlparse(self.pr_url)
scheme_and_netloc = parsed_pr_url.scheme + "://" + parsed_pr_url.netloc
workspace_name, project_name = (self.workspace_slug, self.repo_slug)
prefix = f"{scheme_and_netloc}/{workspace_name}/{project_name}/src/{desired_branch}"
suffix = "" #None
return (prefix, suffix)
def publish_code_suggestions(self, code_suggestions: list) -> bool:
"""
Publishes code suggestions as comments on the PR.
@ -457,7 +491,7 @@ class BitbucketProvider(GitProvider):
return True
@staticmethod
def _parse_pr_url(pr_url: str) -> Tuple[str, int]:
def _parse_pr_url(pr_url: str) -> Tuple[str, int, int]:
parsed_url = urlparse(pr_url)
if "bitbucket.org" not in parsed_url.netloc:
@ -559,3 +593,21 @@ class BitbucketProvider(GitProvider):
# bitbucket does not support labels
def get_pr_labels(self, update=False):
pass
#Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if "bitbucket.org" not in repo_url_to_clone:
get_logger().error("Repo URL is not a valid bitbucket URL.")
return None
bearer_token = self.bearer_token
if not bearer_token:
get_logger().error("No bearer token provided. Returning None")
return None
#For example: For repo: https://bitbucket.org/codiumai/pr-agent-tests.git
#clone url will be: https://x-token-auth:<token>@bitbucket.org/codiumai/pr-agent-tests.git
(scheme, base_url) = repo_url_to_clone.split("bitbucket.org")
if not all([scheme, base_url]):
get_logger().error(f"repo_url_to_clone: {repo_url_to_clone} is not a valid bitbucket URL.")
return None
clone_url = f"{scheme}x-token-auth:{bearer_token}@bitbucket.org{base_url}"
return clone_url

View File

@ -7,6 +7,8 @@ from urllib.parse import quote_plus, urlparse
from atlassian.bitbucket import Bitbucket
from requests.exceptions import HTTPError
import shlex
import subprocess
from ..algo.git_patch_processing import decode_if_bytes
from ..algo.language_handler import is_valid_file
@ -34,7 +36,7 @@ class BitbucketServerProvider(GitProvider):
self.incremental = incremental
self.diff_files = None
self.bitbucket_pull_request_api_url = pr_url
self.bearer_token = get_settings().get("BITBUCKET_SERVER.BEARER_TOKEN", None)
self.bitbucket_server_url = self._parse_bitbucket_server(url=pr_url)
self.bitbucket_client = bitbucket_client or Bitbucket(url=self.bitbucket_server_url,
token=get_settings().get("BITBUCKET_SERVER.BEARER_TOKEN",
@ -47,6 +49,35 @@ class BitbucketServerProvider(GitProvider):
if pr_url:
self.set_pr(pr_url)
def get_git_repo_url(self, pr_url: str=None) -> str: #bitbucket server does not support issue url, so ignore param
try:
parsed_url = urlparse(self.pr_url)
return f"{parsed_url.scheme}://{parsed_url.netloc}/scm/{self.workspace_slug.lower()}/{self.repo_slug.lower()}.git"
except Exception as e:
get_logger().exception(f"url is not a valid merge requests url: {self.pr_url}")
return ""
# Given a git repo url, return prefix and suffix of the provider in order to view a given file belonging to that repo.
# Example: https://bitbucket.dev.my_inc.com/scm/my_work/my_repo.git and branch: my_branch -> prefix: "https://bitbucket.dev.my_inc.com/projects/MY_WORK/repos/my_repo/browse/src", suffix: "?at=refs%2Fheads%2Fmy_branch"
# In case git url is not provided, provider will use PR context (which includes branch) to determine the prefix and suffix.
def get_canonical_url_parts(self, repo_git_url:str=None, desired_branch:str=None) -> Tuple[str, str]:
workspace_name = None
project_name = None
if not repo_git_url:
desired_branch = self.get_pr_branch()
workspace_name = self.workspace_slug
project_name = self.repo_slug
elif '.git' in repo_git_url and 'scm/' in repo_git_url:
repo_path = repo_git_url.split('.git')[0].split('scm/')[-1]
if repo_path.count('/') == 1: # Has to have the form <workspace>/<repo>
workspace_name, project_name = repo_path.split('/')
if not workspace_name or not project_name:
get_logger().error(f"workspace_name or project_name not found in context, either git url: {repo_git_url} or uninitialized workspace/project.")
return ("", "")
prefix = f"{self.bitbucket_server_url}/projects/{workspace_name}/repos/{project_name}/browse"
suffix = f"?at=refs%2Fheads%2F{desired_branch}"
return (prefix, suffix)
def get_repo_settings(self):
try:
content = self.bitbucket_client.get_content_of_file(self.workspace_slug, self.repo_slug, ".pr_agent.toml", self.get_pr_branch())
@ -481,3 +512,28 @@ class BitbucketServerProvider(GitProvider):
def _get_merge_base(self):
return f"rest/api/latest/projects/{self.workspace_slug}/repos/{self.repo_slug}/pull-requests/{self.pr_num}/merge-base"
# Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if 'bitbucket.' not in repo_url_to_clone:
get_logger().error("Repo URL is not a valid bitbucket URL.")
return None
bearer_token = self.bearer_token
if not bearer_token:
get_logger().error("No bearer token provided. Returning None")
return None
# Return unmodified URL as the token is passed via HTTP headers in _clone_inner, as seen below.
return repo_url_to_clone
#Overriding the shell command, since for some reason usage of x-token-auth doesn't work, as mentioned here:
# https://stackoverflow.com/questions/56760396/cloning-bitbucket-server-repo-with-access-tokens
def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_seconds: int=None):
bearer_token = self.bearer_token
if not bearer_token:
#Shouldn't happen since this is checked in _prepare_clone, therefore - throwing an exception.
raise RuntimeError(f"Bearer token is required!")
cli_args = shlex.split(f"git clone -c http.extraHeader='Authorization: Bearer {bearer_token}' "
f"--filter=blob:none --depth 1 {repo_url} {dest_folder}")
subprocess.run(cli_args, check=True, # check=True will raise an exception if the command fails
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=operation_timeout_in_seconds)

View File

@ -1,6 +1,9 @@
from abc import ABC, abstractmethod
# enum EDIT_TYPE (ADDED, DELETED, MODIFIED, RENAMED)
from typing import Optional
import os
import shutil
import subprocess
from typing import Optional, Tuple
from pr_agent.algo.types import FilePatchInfo
from pr_agent.algo.utils import Range, process_description
@ -14,6 +17,75 @@ class GitProvider(ABC):
def is_supported(self, capability: str) -> bool:
pass
#Given a url (issues or PR/MR) - get the .git repo url to which they belong. Needs to be implemented by the provider.
def get_git_repo_url(self, issues_or_pr_url: str) -> str:
get_logger().warning("Not implemented! Returning empty url")
return ""
# Given a git repo url, return prefix and suffix of the provider in order to view a given file belonging to that repo. Needs to be implemented by the provider.
# For example: For a git: https://git_provider.com/MY_PROJECT/MY_REPO.git and desired branch: <MY_BRANCH> then it should return ('https://git_provider.com/projects/MY_PROJECT/repos/MY_REPO/.../<MY_BRANCH>', '?=<SOME HEADER>')
# so that to properly view the file: docs/readme.md -> <PREFIX>/docs/readme.md<SUFFIX> -> https://git_provider.com/projects/MY_PROJECT/repos/MY_REPO/<MY_BRANCH>/docs/readme.md?=<SOME HEADER>)
def get_canonical_url_parts(self, repo_git_url:str, desired_branch:str) -> Tuple[str, str]:
get_logger().warning("Not implemented! Returning empty prefix and suffix")
return ("", "")
#Clone related API
#An object which ensures deletion of a cloned repo, once it becomes out of scope.
# Example usage:
# with TemporaryDirectory() as tmp_dir:
# returned_obj: GitProvider.ScopedClonedRepo = self.git_provider.clone(self.repo_url, tmp_dir, remove_dest_folder=False)
# print(returned_obj.path) #Use returned_obj.path.
# #From this point, returned_obj.path may be deleted at any point and therefore must not be used.
class ScopedClonedRepo(object):
def __init__(self, dest_folder):
self.path = dest_folder
def __del__(self):
if self.path and os.path.exists(self.path):
shutil.rmtree(self.path, ignore_errors=True)
#Method to allow implementors to manipulate the repo url to clone (such as embedding tokens in the url string). Needs to be implemented by the provider.
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
get_logger().warning("Not implemented! Returning None")
return None
# Does a shallow clone, using a forked process to support a timeout guard.
# In case operation has failed, it is expected to throw an exception as this method does not return a value.
def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_seconds: int=None) -> None:
#The following ought to be equivalent to:
# #Repo.clone_from(repo_url, dest_folder)
# , but with throwing an exception upon timeout.
# Note: This can only be used in context that supports using pipes.
subprocess.run([
"git", "clone",
"--filter=blob:none",
"--depth", "1",
repo_url, dest_folder
], check=True, # check=True will raise an exception if the command fails
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=operation_timeout_in_seconds)
CLONE_TIMEOUT_SEC = 20
# Clone a given url to a destination folder. If successful, returns an object that wraps the destination folder,
# deleting it once it is garbage collected. See: GitProvider.ScopedClonedRepo for more details.
def clone(self, repo_url_to_clone: str, dest_folder: str, remove_dest_folder: bool = True,
operation_timeout_in_seconds: int=CLONE_TIMEOUT_SEC) -> ScopedClonedRepo|None:
returned_obj = None
clone_url = self._prepare_clone_url_with_token(repo_url_to_clone)
if not clone_url:
get_logger().error("Clone failed: Unable to obtain url to clone.")
return returned_obj
try:
if remove_dest_folder and os.path.exists(dest_folder) and os.path.isdir(dest_folder):
shutil.rmtree(dest_folder)
self._clone_inner(clone_url, dest_folder, operation_timeout_in_seconds)
returned_obj = GitProvider.ScopedClonedRepo(dest_folder)
except Exception as e:
get_logger().exception(f"Clone failed: Could not clone url.",
artifact={"error": str(e), "url": clone_url, "dest_folder": dest_folder})
finally:
return returned_obj
@abstractmethod
def get_files(self) -> list:
pass

View File

@ -63,6 +63,60 @@ class GithubProvider(GitProvider):
def is_supported(self, capability: str) -> bool:
return True
def _get_owner_and_repo_path(self, given_url: str) -> str:
try:
repo_path = None
if 'issues' in given_url:
repo_path, _ = self._parse_issue_url(given_url)
elif 'pull' in given_url:
repo_path, _ = self._parse_pr_url(given_url)
elif given_url.endswith('.git'):
parsed_url = urlparse(given_url)
repo_path = (parsed_url.path.split('.git')[0])[1:] # /<owner>/<repo>.git -> <owner>/<repo>
if not repo_path:
get_logger().error(f"url is neither an issues url nor a pr url nor a valid git url: {given_url}. Returning empty result.")
return ""
return repo_path
except Exception as e:
get_logger().exception(f"unable to parse url: {given_url}. Returning empty result.")
return ""
def get_git_repo_url(self, issues_or_pr_url: str) -> str:
repo_path = self._get_owner_and_repo_path(issues_or_pr_url)
if not repo_path or repo_path not in issues_or_pr_url:
get_logger().error(f"Unable to retrieve owner/path from url: {issues_or_pr_url}")
return ""
return f"{issues_or_pr_url.split(repo_path)[0]}{repo_path}.git"
# Given a git repo url, return prefix and suffix of the provider in order to view a given file belonging to that repo.
# Example: https://github.com/qodo-ai/pr-agent.git and branch: v0.8 -> prefix: "https://github.com/qodo-ai/pr-agent/blob/v0.8", suffix: ""
# In case git url is not provided, provider will use PR context (which includes branch) to determine the prefix and suffix.
def get_canonical_url_parts(self, repo_git_url:str, desired_branch:str) -> Tuple[str, str]:
owner = None
repo = None
scheme_and_netloc = None
if repo_git_url: #If user provided an external git url, which may be different than what this provider was initialized with, we cannot use self.repo
repo_path = self._get_owner_and_repo_path(repo_git_url)
parsed_git_url = urlparse(repo_git_url)
scheme_and_netloc = parsed_git_url.scheme + "://" + parsed_git_url.netloc
if repo_path.count('/') == 1: #Has to have the form <owner>/<repo>
owner, repo = repo_path.split('/')
else:
get_logger().error(f"Invalid repo_path: {repo_path} from repo_git_url: {repo_git_url}")
return ("", "")
if (not owner or not repo) and self.repo: #"else" - User did not provide an external git url, use self.repo object:
owner, repo = self.repo.split('/')
scheme_and_netloc = self.base_url_html
desired_branch = self.get_pr_branch()
if not any([scheme_and_netloc, owner, repo]): #"else": Not invoked from a PR context,but no provided git url for context
get_logger().error(f"Unable to get canonical url parts since missing context (PR or explicit git url)")
return ("", "")
prefix = f"{scheme_and_netloc}/{owner}/{repo}/blob/{desired_branch}"
suffix = "" # github does not add a suffix
return (prefix, suffix)
def get_pr_url(self) -> str:
return self.pr.html_url
@ -703,9 +757,9 @@ class GithubProvider(GitProvider):
return repo_name, issue_number
def _get_github_client(self):
deployment_type = get_settings().get("GITHUB.DEPLOYMENT_TYPE", "user")
if deployment_type == 'app':
self.deployment_type = get_settings().get("GITHUB.DEPLOYMENT_TYPE", "user")
self.auth = None
if self.deployment_type == 'app':
try:
private_key = get_settings().github.private_key
app_id = get_settings().github.app_id
@ -715,16 +769,19 @@ class GithubProvider(GitProvider):
raise ValueError("GitHub app installation ID is required when using GitHub app deployment")
auth = AppAuthentication(app_id=app_id, private_key=private_key,
installation_id=self.installation_id)
return Github(app_auth=auth, base_url=self.base_url)
if deployment_type == 'user':
self.auth = auth
elif self.deployment_type == 'user':
try:
token = get_settings().github.user_token
except AttributeError as e:
raise ValueError(
"GitHub token is required when using user deployment. See: "
"https://github.com/Codium-ai/pr-agent#method-2-run-from-source") from e
return Github(auth=Auth.Token(token), base_url=self.base_url)
self.auth = Auth.Token(token)
if self.auth:
return Github(auth=self.auth, base_url=self.base_url)
else:
raise ValueError("Could not authenticate to GitHub")
def _get_repo(self):
if hasattr(self, 'repo_obj') and \
@ -1064,3 +1121,37 @@ class GithubProvider(GitProvider):
get_logger().error(f"Failed to process patch for committable comment, error: {e}")
return code_suggestions_copy
#Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
scheme = "https://"
#For example, to clone:
#https://github.com/Codium-ai/pr-agent-pro.git
#Need to embed inside the github token:
#https://<token>@github.com/Codium-ai/pr-agent-pro.git
github_token = self.auth.token
github_base_url = self.base_url_html
if not all([github_token, github_base_url]):
get_logger().error("Either missing auth token or missing base url")
return None
if scheme not in github_base_url:
get_logger().error(f"Base url: {github_base_url} is missing prefix: {scheme}")
return None
github_com = github_base_url.split(scheme)[1] # e.g. 'github.com' or github.<org>.com
if not github_com:
get_logger().error(f"Base url: {github_base_url} has an empty base url")
return None
if github_com not in repo_url_to_clone:
get_logger().error(f"url to clone: {repo_url_to_clone} does not contain {github_com}")
return None
repo_full_name = repo_url_to_clone.split(github_com)[-1]
if not repo_full_name:
get_logger().error(f"url to clone: {repo_url_to_clone} is malformed")
return None
clone_url = scheme
if self.deployment_type == 'app':
clone_url += "git:"
clone_url += f"{github_token}@{github_com}{repo_full_name}"
return clone_url

View File

@ -57,6 +57,43 @@ class GitLabProvider(GitProvider):
return False
return True
def _get_project_path_from_pr_or_issue_url(self, pr_or_issue_url: str) -> str:
repo_project_path = None
if 'issues' in pr_or_issue_url:
#replace 'issues' with 'merge_requests', since gitlab provider does not support issue urls, just to get the git repo url:
pr_or_issue_url = pr_or_issue_url.replace('issues', 'merge_requests')
if 'merge_requests' in pr_or_issue_url:
repo_project_path, _ = self._parse_merge_request_url(pr_or_issue_url)
if not repo_project_path:
get_logger().error(f"url is not a valid merge requests url: {pr_or_issue_url}")
return ""
return repo_project_path
def get_git_repo_url(self, issues_or_pr_url: str) -> str:
provider_url = issues_or_pr_url
repo_path = self._get_project_path_from_pr_or_issue_url(provider_url)
if not repo_path or repo_path not in issues_or_pr_url:
get_logger().error(f"Unable to retrieve project path from url: {issues_or_pr_url}")
return ""
return f"{issues_or_pr_url.split(repo_path)[0]}{repo_path}.git"
# Given a git repo url, return prefix and suffix of the provider in order to view a given file belonging to that repo.
# Example: https://gitlab.com/codiumai/pr-agent.git and branch: t1 -> prefix: "https://gitlab.com/codiumai/pr-agent/-/blob/t1", suffix: "?ref_type=heads"
# In case git url is not provided, provider will use PR context (which includes branch) to determine the prefix and suffix.
def get_canonical_url_parts(self, repo_git_url:str=None, desired_branch:str=None) -> Tuple[str, str]:
repo_path = ""
if not repo_git_url and not self.pr_url:
get_logger().error("Cannot get canonical URL parts: missing either context PR URL or a repo GIT URL")
return ("", "")
if not repo_git_url: #Use PR url as context
repo_path = self._get_project_path_from_pr_or_issue_url(self.pr_url)
desired_branch = self.get_pr_branch()
else: #Use repo git url
repo_path = repo_git_url.split('.git')[0].split('.com/')[-1]
prefix = f"{self.gitlab_url}/{repo_path}/-/blob/{desired_branch}"
suffix = "?ref_type=heads" # gitlab cloud adds this suffix. gitlab server does not, but it is harmless.
return (prefix, suffix)
@property
def pr(self):
'''The GitLab terminology is merge request (MR) instead of pull request (PR)'''
@ -597,3 +634,24 @@ class GitLabProvider(GitProvider):
get_logger().info(f"Failed adding line link, error: {e}")
return ""
#Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if "gitlab." not in repo_url_to_clone:
get_logger().error(f"Repo URL: {repo_url_to_clone} is not a valid gitlab URL.")
return None
(scheme, base_url) = repo_url_to_clone.split("gitlab.")
access_token = self.gl.oauth_token
if not all([scheme, access_token, base_url]):
get_logger().error(f"Either no access token found, or repo URL: {repo_url_to_clone} "
f"is missing prefix: {scheme} and/or base URL: {base_url}.")
return None
#Note that the ""official"" method found here:
# https://docs.gitlab.com/user/profile/personal_access_tokens/#clone-repository-using-personal-access-token
# requires a username, which may not be applicable.
# The following solution is taken from: https://stackoverflow.com/questions/25409700/using-gitlab-token-to-clone-without-authentication/35003812#35003812
# For example: For repo url: https://gitlab.codium-inc.com/qodo/autoscraper.git
# Then to clone one will issue: 'git clone https://oauth2:<access token>@gitlab.codium-inc.com/qodo/autoscraper.git'
clone_url = f"{scheme}oauth2:{access_token}@gitlab.{base_url}"
return clone_url

View File

@ -6,6 +6,7 @@ class HelpMessage:
"> - **/improve [--extended]**: Suggest code improvements. Extended mode provides a higher quality feedback. \n" \
"> - **/ask \\<QUESTION\\>**: Ask a question about the PR. \n" \
"> - **/update_changelog**: Update the changelog based on the PR's contents. \n" \
"> - **/help_docs \\<QUESTION\\>**: Given a path to documentation (either for this repository or for a given one), ask a question. \n" \
"> - **/add_docs** 💎: Generate docstring for new components introduced in the PR. \n" \
"> - **/generate_labels** 💎: Generate labels for the PR based on the PR's contents. \n" \
"> - **/analyze** 💎: Automatically analyzes the PR, and presents changes walkthrough for each component. \n\n" \
@ -201,3 +202,17 @@ some_config2=...
output += f"\n\nSee the improve [usage page](https://pr-agent-docs.codium.ai/tools/improve/) for a comprehensive guide on using this tool.\n\n"
return output
@staticmethod
def get_help_docs_usage_guide():
output = "**Overview:**\n"
output += """\
The help docs tool, named `help_docs`, answers a question based on a given relative path of documentation, either from the repository of this merge request or from a given one."
It can be invoked manually by commenting on any PR:
```
/help_docs "..."
```
"""
output += f"\n\nSee the [help_docs usage](https://pr-agent-docs.codium.ai/tools/help_docs/) page for a comprehensive guide on using this tool.\n\n"
return output

View File

@ -9,6 +9,7 @@
model="o3-mini"
fallback_models=["gpt-4o-2024-11-20"]
#model_weak="gpt-4o-mini-2024-07-18" # optional, a weaker model to use for some easier tasks
model_token_count_estimate_factor=0.3 # factor to increase the token count estimate, in order to reduce likelihood of model failure due to too many tokens.
# CLI
git_provider="github"
publish_output=true
@ -212,6 +213,14 @@ num_retrieved_snippets=5
[pr_config] # /config #
[pr_help_docs]
repo_url = "" #If not overwritten, will use the repo from where the context came from (issue or PR)
repo_default_branch = "main"
docs_path = "docs"
exclude_root_readme = false
supported_doc_exts = [".md", ".mdx", ".rst"]
enable_help_text=false
[github]
# The type of deployment to create. Valid values are 'app' or 'user'.
deployment_type = "user"

View File

@ -0,0 +1,77 @@
[pr_help_docs_prompts]
system="""You are Doc-helper, a language model designed to answer questions about a documentation website for a given repository.
You will receive a question, a repository url and the full documentation content for that repository (either as markdown or as restructred text).
Your goal is to provide the best answer to the question using the documentation provided.
Additional instructions:
- Be short and concise in your answers. Give examples if needed.
- Answer only questions that are related to the documentation website content. If the question is completely unrelated to the documentation, return an empty response.
The output must be a YAML object equivalent to type $DocHelper, according to the following Pydantic definitions:
=====
class relevant_section(BaseModel):
file_name: str = Field(description="The name of the relevant file")
relevant_section_header_string: str = Field(description="The exact text of the relevant markdown/restructured text section heading from the relevant file (starting with '#', '##', etc.). Return empty string if the entire file is the relevant section, or if the relevant section has no heading")
class DocHelper(BaseModel):
user_question: str = Field(description="The user's question")
response: str = Field(description="The response to the user's question")
relevant_sections: List[relevant_section] = Field(description="A list of the relevant markdown/restructured text sections in the documentation that answer the user's question, ordered by importance (most relevant first)")
question_is_relevant: int = Field(description="Return 1 if the question is somewhat relevant to documentation. 0 - otherwise")
=====
Example output:
```yaml
user_question: |
...
response: |
...
relevant_sections:
- file_name: "src/file1.py"
relevant_section_header_string: |
...
- ...
question_is_relevant: |
1
"""
user="""\
Documentation url: '{{ docs_url| trim }}'
-----
User's Question:
=====
{{ question|trim }}
=====
Documentation website content:
=====
{{ snippets|trim }}
=====
Reminder: The output must be a YAML object equivalent to type $DocHelper, similar to the following example output:
=====
Example output:
```yaml
user_question: |
...
response: |
...
relevant_sections:
- file_name: "src/file1.py"
relevant_section_header_string: |
...
- ...
question_is_relevant: |
1
=====
Response (should be a valid YAML, and nothing else).
```yaml
"""

View File

@ -0,0 +1,369 @@
import copy
from functools import partial
from jinja2 import Environment, StrictUndefined
import math
import os
import re
from tempfile import TemporaryDirectory
from typing import Dict, List, Optional, Tuple
from pr_agent.algo import MAX_TOKENS
from pr_agent.algo.ai_handlers.base_ai_handler import BaseAiHandler
from pr_agent.algo.ai_handlers.litellm_ai_handler import LiteLLMAIHandler
from pr_agent.algo.pr_processing import retry_with_fallback_models
from pr_agent.algo.token_handler import TokenHandler
from pr_agent.algo.utils import clip_tokens, get_max_tokens, load_yaml, ModelType
from pr_agent.config_loader import get_settings
from pr_agent.git_providers import get_git_provider_with_context
from pr_agent.log import get_logger
from pr_agent.servers.help import HelpMessage
#Common code that can be called from similar tools:
def modify_answer_section(ai_response: str) -> str | None:
# Gets the model's answer and relevant sources section, repacing the heading of the answer section with:
# :bulb: Auto-generated documentation-based answer:
"""
For example: The following input:
### Question: \nThe following general issue was asked by a user: Title: How does one request to re-review a PR? More Info: I cannot seem to find to do this.
### Answer:\nAccording to the documentation, one needs to invoke the command: /review
#### Relevant Sources...
Should become:
### :bulb: Auto-generated documentation-based answer:\n
According to the documentation, one needs to invoke the command: /review
#### Relevant Sources...
"""
model_answer_and_relevant_sections_in_response \
= extract_model_answer_and_relevant_sources(ai_response)
if model_answer_and_relevant_sections_in_response is not None:
cleaned_question_with_answer = "### :bulb: Auto-generated documentation-based answer:\n"
cleaned_question_with_answer += model_answer_and_relevant_sections_in_response
return cleaned_question_with_answer
get_logger().warning(f"Either no answer section found, or that section is malformed: {ai_response}")
return None
def extract_model_answer_and_relevant_sources(ai_response: str) -> str | None:
# It is assumed that the input contains several sections with leading "### ",
# where the answer is the last one of them having the format: "### Answer:\n"), since the model returns the answer
# AFTER the user question. By splitting using the string: "### Answer:\n" and grabbing the last part,
# the model answer is guaranteed to be in that last part, provided it is followed by a "#### Relevant Sources:\n\n".
# (for more details, see here: https://github.com/Codium-ai/pr-agent-pro/blob/main/pr_agent/tools/pr_help_message.py#L173)
"""
For example:
### Question: \nHow does one request to re-review a PR?\n\n
### Answer:\nAccording to the documentation, one needs to invoke the command: /review\n\n
#### Relevant Sources:\n\n...
The answer part is: "According to the documentation, one needs to invoke the command: /review\n\n"
followed by "Relevant Sources:\n\n".
"""
if "### Answer:\n" in ai_response:
model_answer_and_relevant_sources_sections_in_response = ai_response.split("### Answer:\n")[-1]
# Split such part by "Relevant Sources" section to contain only the model answer:
if "#### Relevant Sources:\n\n" in model_answer_and_relevant_sources_sections_in_response:
model_answer_section_in_response \
= model_answer_and_relevant_sources_sections_in_response.split("#### Relevant Sources:\n\n")[0]
get_logger().info(f"Found model answer: {model_answer_section_in_response}")
return model_answer_and_relevant_sources_sections_in_response \
if len(model_answer_section_in_response) > 0 else None
get_logger().warning(f"Either no answer section found, or that section is malformed: {ai_response}")
return None
def get_maximal_text_input_length_for_token_count_estimation():
model = get_settings().config.model
if 'claude-3-7-sonnet' in model.lower():
return 900000 #Claude API for token estimation allows maximal text input of 900K chars
return math.inf #Otherwise, no known limitation on input text just for token estimation
# Load documentation files to memory, decorating them with a header to mark where each file begins,
# as to help the LLM to give a better answer.
def aggregate_documentation_files_for_prompt_contents(base_path: str, doc_files: List[str]) -> Optional[str]:
docs_prompt = ""
for file in doc_files:
try:
with open(file, 'r', encoding='utf-8') as f:
content = f.read()
# Skip files with no text content
if not re.search(r'[a-zA-Z]', content):
continue
file_path = str(file).replace(str(base_path), '')
docs_prompt += f"\n==file name==\n\n{file_path}\n\n==file content==\n\n{content.strip()}\n=========\n\n"
except Exception as e:
get_logger().warning(f"Error while reading the file {file}: {e}")
continue
if not docs_prompt:
get_logger().error("Couldn't find any usable documentation files. Returning None.")
return None
return docs_prompt
def format_markdown_q_and_a_response(question_str: str, response_str: str, relevant_sections: List[Dict[str, str]],
supported_suffixes: List[str], base_url_prefix: str, base_url_suffix: str="") -> str:
answer_str = ""
answer_str += f"### Question: \n{question_str}\n\n"
answer_str += f"### Answer:\n{response_str.strip()}\n\n"
answer_str += f"#### Relevant Sources:\n\n"
for section in relevant_sections:
file = section.get('file_name').strip()
ext = [suffix for suffix in supported_suffixes if file.endswith(suffix)]
if not ext:
get_logger().warning(f"Unsupported file extension: {file}")
continue
if str(section['relevant_section_header_string']).strip():
markdown_header = format_markdown_header(section['relevant_section_header_string'])
if base_url_prefix:
answer_str += f"> - {base_url_prefix}{file}{base_url_suffix}#{markdown_header}\n"
else:
answer_str += f"> - {base_url_prefix}{file}{base_url_suffix}\n"
return answer_str
def format_markdown_header(header: str) -> str:
try:
# First, strip common characters from both ends
cleaned = header.strip('# 💎\n')
# Define all characters to be removed/replaced in a single pass
replacements = {
"'": '',
"`": '',
'(': '',
')': '',
',': '',
'.': '',
'?': '',
'!': '',
' ': '-'
}
# Compile regex pattern for characters to remove
pattern = re.compile('|'.join(map(re.escape, replacements.keys())))
# Perform replacements in a single pass and convert to lowercase
return pattern.sub(lambda m: replacements[m.group()], cleaned).lower()
except Exception:
get_logger().exception(f"Error while formatting markdown header", artifacts={'header': header})
return ""
def clean_markdown_content(content: str) -> str:
"""
Remove hidden comments and unnecessary elements from markdown content to reduce size.
Args:
content: The original markdown content
Returns:
Cleaned markdown content
"""
# Remove HTML comments
content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
# Remove frontmatter (YAML between --- or +++ delimiters)
content = re.sub(r'^---\s*\n.*?\n---\s*\n', '', content, flags=re.DOTALL)
content = re.sub(r'^\+\+\+\s*\n.*?\n\+\+\+\s*\n', '', content, flags=re.DOTALL)
# Remove excessive blank lines (more than 2 consecutive)
content = re.sub(r'\n{3,}', '\n\n', content)
# Remove HTML tags that are often used for styling only
content = re.sub(r'<div.*?>|</div>|<span.*?>|</span>', '', content, flags=re.DOTALL)
# Remove image alt text which can be verbose
content = re.sub(r'!\[(.*?)\]', '![]', content)
# Remove images completely
content = re.sub(r'!\[.*?\]\(.*?\)', '', content)
# Remove simple HTML tags but preserve content between them
content = re.sub(r'<(?!table|tr|td|th|thead|tbody)([a-zA-Z][a-zA-Z0-9]*)[^>]*>(.*?)</\1>',
r'\2', content, flags=re.DOTALL)
return content.strip()
class PredictionPreparator:
def __init__(self, ai_handler, vars, system_prompt, user_prompt):
self.ai_handler = ai_handler
variables = copy.deepcopy(vars)
environment = Environment(undefined=StrictUndefined)
self.system_prompt = environment.from_string(system_prompt).render(variables)
self.user_prompt = environment.from_string(user_prompt).render(variables)
async def __call__(self, model: str) -> str:
try:
response, finish_reason = await self.ai_handler.chat_completion(
model=model, temperature=get_settings().config.temperature, system=self.system_prompt, user=self.user_prompt)
return response
except Exception as e:
get_logger().error(f"Error while preparing prediction: {e}")
return ""
class PRHelpDocs(object):
def __init__(self, ctx_url, ai_handler:partial[BaseAiHandler,] = LiteLLMAIHandler, args: Tuple[str]=None, return_as_string: bool=False):
self.ctx_url = ctx_url
self.question = args[0] if args else None
self.return_as_string = return_as_string
self.repo_url_given_explicitly = True
self.repo_url = get_settings().get('PR_HELP_DOCS.REPO_URL', '')
self.repo_desired_branch = get_settings().get('PR_HELP_DOCS.REPO_DEFAULT_BRANCH', 'main') #Ignored if self.repo_url is empty
self.include_root_readme_file = not(get_settings()['PR_HELP_DOCS.EXCLUDE_ROOT_README'])
self.supported_doc_exts = get_settings()['PR_HELP_DOCS.SUPPORTED_DOC_EXTS']
self.docs_path = get_settings()['PR_HELP_DOCS.DOCS_PATH']
retrieved_settings = [self.include_root_readme_file, self.supported_doc_exts, self.docs_path]
if any([setting is None for setting in retrieved_settings]):
raise Exception(f"One of the settings is invalid: {retrieved_settings}")
self.git_provider = get_git_provider_with_context(ctx_url)
if not self.git_provider:
raise Exception(f"No git provider found at {ctx_url}")
if not self.repo_url:
self.repo_url_given_explicitly = False
get_logger().debug(f"No explicit repo url provided, deducing it from type: {self.git_provider.__class__.__name__} "
f"context url: {self.ctx_url}")
self.repo_url = self.git_provider.get_git_repo_url(self.ctx_url)
if not self.repo_url:
raise Exception(f"Unable to deduce repo url from type: {self.git_provider.__class__.__name__} url: {self.ctx_url}")
get_logger().debug(f"deduced repo url: {self.repo_url}")
self.repo_desired_branch = None #Inferred from the repo provider.
self.ai_handler = ai_handler()
self.vars = {
"docs_url": self.repo_url,
"question": self.question,
"snippets": "",
}
self.token_handler = TokenHandler(None,
self.vars,
get_settings().pr_help_docs_prompts.system,
get_settings().pr_help_docs_prompts.user)
async def run(self):
if not self.question:
get_logger().warning('No question provided. Will do nothing.')
return None
try:
# Clone the repository and gather relevant documentation files.
docs_prompt = None
with TemporaryDirectory() as tmp_dir:
get_logger().debug(f"About to clone repository: {self.repo_url} to temporary directory: {tmp_dir}...")
returned_cloned_repo_root = self.git_provider.clone(self.repo_url, tmp_dir, remove_dest_folder=False)
if not returned_cloned_repo_root:
raise Exception(f"Failed to clone {self.repo_url} to {tmp_dir}")
get_logger().debug(f"About to gather relevant documentation files...")
doc_files = []
if self.include_root_readme_file:
for root, _, files in os.walk(returned_cloned_repo_root.path):
# Only look at files in the root directory, not subdirectories
if root == returned_cloned_repo_root.path:
for file in files:
if file.lower().startswith("readme."):
doc_files.append(os.path.join(root, file))
abs_docs_path = os.path.join(returned_cloned_repo_root.path, self.docs_path)
if os.path.exists(abs_docs_path):
doc_files.extend(self._find_all_document_files_matching_exts(abs_docs_path,
ignore_readme=(self.docs_path=='.')))
if not doc_files:
get_logger().warning(f"No documentation files found matching file extensions: "
f"{self.supported_doc_exts} under repo: {self.repo_url} path: {self.docs_path}")
return None
get_logger().info(f'Answering a question inside context {self.ctx_url} for repo: {self.repo_url}'
f' using the following documentation files: ', artifacts={'doc_files': doc_files})
docs_prompt = aggregate_documentation_files_for_prompt_contents(returned_cloned_repo_root.path, doc_files)
if not docs_prompt:
get_logger().warning(f"Error reading one of the documentation files. Returning with no result...")
return None
docs_prompt_to_send_to_model = docs_prompt
# Estimate how many tokens will be needed. Trim in case of exceeding limit.
# Firstly, check if text needs to be trimmed, as some models fail to return the estimated token count if the input text is too long.
max_allowed_txt_input = get_maximal_text_input_length_for_token_count_estimation()
if len(docs_prompt_to_send_to_model) >= max_allowed_txt_input:
get_logger().warning(f"Text length: {len(docs_prompt_to_send_to_model)} exceeds the current returned limit of {max_allowed_txt_input} just for token count estimation. Trimming the text...")
docs_prompt_to_send_to_model = docs_prompt_to_send_to_model[:max_allowed_txt_input]
# Then, count the tokens in the prompt. If the count exceeds the limit, trim the text.
token_count = self.token_handler.count_tokens(docs_prompt_to_send_to_model, force_accurate=True)
get_logger().debug(f"Estimated token count of documentation to send to model: {token_count}")
model = get_settings().config.model
if model in MAX_TOKENS:
max_tokens_full = MAX_TOKENS[model] # note - here we take the actual max tokens, without any reductions. we do aim to get the full documentation website in the prompt
else:
max_tokens_full = get_max_tokens(model)
delta_output = 5000 #Elbow room to reduce chance of exceeding token limit or model paying less attention to prompt guidelines.
if token_count > max_tokens_full - delta_output:
docs_prompt_to_send_to_model = clean_markdown_content(docs_prompt_to_send_to_model) #Reduce unnecessary text/images/etc.
get_logger().info(f"Token count {token_count} exceeds the limit {max_tokens_full - delta_output}. Attempting to clip text to fit within the limit...")
docs_prompt_to_send_to_model = clip_tokens(docs_prompt_to_send_to_model, max_tokens_full - delta_output,
num_input_tokens=token_count)
self.vars['snippets'] = docs_prompt_to_send_to_model.strip()
# Run the AI model and extract sections from its response
response = await retry_with_fallback_models(PredictionPreparator(self.ai_handler, self.vars,
get_settings().pr_help_docs_prompts.system,
get_settings().pr_help_docs_prompts.user),
model_type=ModelType.REGULAR)
response_yaml = load_yaml(response)
if not response_yaml:
get_logger().exception("Failed to parse the AI response.", artifacts={'response': response})
raise Exception(f"Failed to parse the AI response.")
response_str = response_yaml.get('response')
relevant_sections = response_yaml.get('relevant_sections')
if not response_str or not relevant_sections:
get_logger().exception("Failed to extract response/relevant sections.",
artifacts={'response_str': response_str, 'relevant_sections': relevant_sections})
raise Exception(f"Failed to extract response/relevant sections.")
# Format the response as markdown
canonical_url_prefix, canonical_url_suffix = self.git_provider.get_canonical_url_parts(repo_git_url=self.repo_url if self.repo_url_given_explicitly else None,
desired_branch=self.repo_desired_branch)
answer_str = format_markdown_q_and_a_response(self.question, response_str, relevant_sections, self.supported_doc_exts, canonical_url_prefix, canonical_url_suffix)
if answer_str:
#Remove the question phrase and replace with light bulb and a heading mentioning this is an automated answer:
answer_str = modify_answer_section(answer_str)
# For PR help docs, we return the answer string instead of publishing it
if answer_str and self.return_as_string:
if int(response_yaml.get('question_is_relevant', '1')) == 0:
get_logger().warning(f"Chat help docs answer would be ignored due to an invalid question.",
artifacts={'answer_str': answer_str})
return ""
get_logger().info(f"Chat help docs answer", artifacts={'answer_str': answer_str})
return answer_str
# Publish the answer
if not answer_str or int(response_yaml.get('question_is_relevant', '1')) == 0:
get_logger().info(f"No answer found")
return ""
if self.git_provider.is_supported("gfm_markdown") and get_settings().pr_help_docs.enable_help_text:
answer_str += "<hr>\n\n<details> <summary><strong>💡 Tool usage guide:</strong></summary><hr> \n\n"
answer_str += HelpMessage.get_help_docs_usage_guide()
answer_str += "\n</details>\n"
if get_settings().config.publish_output:
self.git_provider.publish_comment(answer_str)
else:
get_logger().info("Answer:", artifacts={'answer_str': answer_str})
except:
get_logger().exception('failed to provide answer to given user question as a result of a thrown exception (see above)')
def _find_all_document_files_matching_exts(self, abs_docs_path: str, ignore_readme=False) -> List[str]:
matching_files = []
# Ensure extensions don't have leading dots and are lowercase
dotless_extensions = [ext.lower().lstrip('.') for ext in self.supported_doc_exts]
# Walk through directory and subdirectories
for root, _, files in os.walk(abs_docs_path):
for file in files:
if ignore_readme and root == abs_docs_path and file.lower() in [f"readme.{ext}" for ext in dotless_extensions]:
continue
# Check if file has one of the specified extensions
if any(file.lower().endswith(f'.{ext}') for ext in dotless_extensions):
matching_files.append(os.path.join(root, file))
return matching_files

View File

@ -35,7 +35,6 @@ class PRHelpMessage:
self.ai_handler = ai_handler()
self.question_str = self.parse_args(args)
self.return_as_string = return_as_string
self.num_retrieved_snippets = get_settings().get('pr_help.num_retrieved_snippets', 5)
if self.question_str:
self.vars = {
"question": self.question_str,
@ -209,6 +208,7 @@ class PRHelpMessage:
tool_names.append(f"[REVIEW]({base_path}/review/)")
tool_names.append(f"[IMPROVE]({base_path}/improve/)")
tool_names.append(f"[UPDATE CHANGELOG]({base_path}/update_changelog/)")
tool_names.append(f"[HELP DOCS]({base_path}/help_docs/)")
tool_names.append(f"[ADD DOCS]({base_path}/documentation/) 💎")
tool_names.append(f"[TEST]({base_path}/test/) 💎")
tool_names.append(f"[IMPROVE COMPONENT]({base_path}/improve_component/) 💎")
@ -224,6 +224,7 @@ class PRHelpMessage:
descriptions.append("Adjustable feedback about the PR, possible issues, security concerns, review effort and more")
descriptions.append("Code suggestions for improving the PR")
descriptions.append("Automatically updates the changelog")
descriptions.append("Answers a question regarding this repository, or a given one, based on given documentation path")
descriptions.append("Generates documentation to methods/functions/classes that changed in the PR")
descriptions.append("Generates unit tests for a specific component, based on the PR code change")
descriptions.append("Code suggestions for a specific component that changed in the PR")
@ -240,6 +241,7 @@ class PRHelpMessage:
commands.append("`/review`")
commands.append("`/improve`")
commands.append("`/update_changelog`")
commands.append("`/help_docs`")
commands.append("`/add_docs`")
commands.append("`/test`")
commands.append("`/improve_component`")
@ -255,6 +257,7 @@ class PRHelpMessage:
checkbox_list.append(" - [ ] Run <!-- /review -->")
checkbox_list.append(" - [ ] Run <!-- /improve -->")
checkbox_list.append(" - [ ] Run <!-- /update_changelog -->")
checkbox_list.append(" - [ ] Run <!-- /help_docs -->")
checkbox_list.append(" - [ ] Run <!-- /add_docs -->")
checkbox_list.append(" - [ ] Run <!-- /test -->")
checkbox_list.append(" - [ ] Run <!-- /improve_component -->")

View File

@ -1,5 +1,6 @@
aiohttp==3.9.5
anthropic[vertex]==0.47.1
anthropic>=0.48
#anthropic[vertex]==0.47.1
atlassian-python-api==3.41.4
azure-devops==7.1.0b3
azure-identity==1.15.0