diff --git a/pr_agent/git_providers/__init__.py b/pr_agent/git_providers/__init__.py index 16547d90..51c6f624 100644 --- a/pr_agent/git_providers/__init__.py +++ b/pr_agent/git_providers/__init__.py @@ -11,6 +11,7 @@ from pr_agent.git_providers.git_provider import GitProvider from pr_agent.git_providers.github_provider import GithubProvider from pr_agent.git_providers.gitlab_provider import GitLabProvider from pr_agent.git_providers.local_git_provider import LocalGitProvider +from pr_agent.git_providers.gitea_provider import GiteaProvider _GIT_PROVIDERS = { 'github': GithubProvider, @@ -21,6 +22,7 @@ _GIT_PROVIDERS = { 'codecommit': CodeCommitProvider, 'local': LocalGitProvider, 'gerrit': GerritProvider, + 'gitea': GiteaProvider, } diff --git a/pr_agent/git_providers/gitea_provider.py b/pr_agent/git_providers/gitea_provider.py new file mode 100644 index 00000000..8fd12ff4 --- /dev/null +++ b/pr_agent/git_providers/gitea_provider.py @@ -0,0 +1,243 @@ +from typing import Optional, Tuple, List, Dict +from urllib.parse import urlparse +import requests +from pr_agent.git_providers.git_provider import GitProvider +from pr_agent.config_loader import get_settings +from pr_agent.log import get_logger +from pr_agent.algo.types import EDIT_TYPE, FilePatchInfo + + +class GiteaProvider(GitProvider): + """ + Implements GitProvider for Gitea/Forgejo API v1. + """ + + def __init__(self, pr_url: Optional[str] = None, incremental: Optional[bool] = False): + self.gitea_url = get_settings().get("GITEA.URL", None) + self.gitea_token = get_settings().get("GITEA.TOKEN", None) + if not self.gitea_url: + raise ValueError("GITEA.URL is not set in the config file") + if not self.gitea_token: + raise ValueError("GITEA.TOKEN is not set in the config file") + self.headers = { + 'Authorization': f'token {self.gitea_token}', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + self.owner = None + self.repo = None + self.pr_num = None + self.pr = None + self.pr_url = pr_url + self.incremental = incremental + if pr_url: + self.set_pr(pr_url) + + @staticmethod + def _parse_pr_url(pr_url: str) -> Tuple[str, str, str]: + """ + Parse Gitea PR URL to (owner, repo, pr_number) + """ + parsed_url = urlparse(pr_url) + path_parts = parsed_url.path.strip('/').split('/') + if len(path_parts) < 4 or path_parts[2] != 'pulls': + raise ValueError(f"Invalid PR URL format: {pr_url}") + return path_parts[0], path_parts[1], path_parts[3] + + def set_pr(self, pr_url: str): + self.owner, self.repo, self.pr_num = self._parse_pr_url(pr_url) + self.pr = self._get_pr() + + def _get_pr(self): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + + def is_supported(self, capability: str) -> bool: + # Gitea/Forgejo supports most capabilities + return True + + def get_files(self) -> List[str]: + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}/files" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return [file['filename'] for file in response.json()] + + def get_diff_files(self) -> List[FilePatchInfo]: + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}/files" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + diff_files = [] + for file in response.json(): + edit_type = EDIT_TYPE.MODIFIED + if file.get('status') == 'added': + edit_type = EDIT_TYPE.ADDED + elif file.get('status') == 'deleted': + edit_type = EDIT_TYPE.DELETED + elif file.get('status') == 'renamed': + edit_type = EDIT_TYPE.RENAMED + + diff_files.append( + FilePatchInfo( + file.get('previous_filename', ''), + file.get('filename', ''), + file.get('patch', ''), + file['filename'], + edit_type=edit_type, + old_filename=file.get('previous_filename') + ) + ) + return diff_files + + def publish_description(self, pr_title: str, pr_body: str): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}" + data = {'title': pr_title, 'body': pr_body} + response = requests.patch(url, headers=self.headers, json=data) + response.raise_for_status() + + def publish_comment(self, pr_comment: str, is_temporary: bool = False): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/{self.pr_num}/comments" + data = {'body': pr_comment} + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + def publish_inline_comment(self, body: str, relevant_file: str, relevant_line_in_file: str, + original_suggestion=None): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}/comments" + data = { + 'body': body, + 'path': relevant_file, + 'line': int(relevant_line_in_file) + } + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + def publish_inline_comments(self, comments: list[dict]): + for comment in comments: + self.publish_inline_comment( + comment['body'], + comment['relevant_file'], + comment['relevant_line_in_file'], + comment.get('original_suggestion') + ) + + def publish_code_suggestions(self, code_suggestions: list) -> bool: + for suggestion in code_suggestions: + self.publish_inline_comment( + suggestion['body'], + suggestion['relevant_file'], + suggestion['relevant_line_in_file'], + suggestion.get('original_suggestion') + ) + return True + + def publish_labels(self, labels): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/{self.pr_num}/labels" + data = {'labels': labels} + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + def get_pr_labels(self, update=False): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/{self.pr_num}/labels" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return [label['name'] for label in response.json()] + + def get_issue_comments(self): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/{self.pr_num}/comments" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + + def remove_initial_comment(self): + # Implementation depends on how you track the initial comment + pass + + def remove_comment(self, comment): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/comments/{comment['id']}" + response = requests.delete(url, headers=self.headers) + response.raise_for_status() + + def add_eyes_reaction(self, issue_comment_id: int, disable_eyes: bool = False) -> Optional[int]: + if disable_eyes: + return None + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/comments/{issue_comment_id}/reactions" + data = {'content': 'eyes'} + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + return response.json()['id'] + + def remove_reaction(self, issue_comment_id: int, reaction_id: int) -> bool: + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/issues/comments/{issue_comment_id}/reactions/{reaction_id}" + response = requests.delete(url, headers=self.headers) + return response.status_code == 204 + + def get_commit_messages(self): + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/pulls/{self.pr_num}/commits" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return [commit['commit']['message'] for commit in response.json()] + + def get_pr_branch(self): + return self.pr['head']['ref'] + + def get_user_id(self): + return self.pr['user']['id'] + + def get_pr_description_full(self) -> str: + return self.pr['body'] or '' + + def get_git_repo_url(self, issues_or_pr_url: str) -> str: + try: + parsed_url = urlparse(issues_or_pr_url) + path_parts = parsed_url.path.strip('/').split('/') + if len(path_parts) < 2: + raise ValueError(f"Invalid URL format: {issues_or_pr_url}") + return f"{parsed_url.scheme}://{parsed_url.netloc}/{path_parts[0]}/{path_parts[1]}.git" + except Exception as e: + get_logger().exception(f"Failed to get git repo URL from: {issues_or_pr_url}") + return "" + + def get_canonical_url_parts(self, repo_git_url: str, desired_branch: str) -> Tuple[str, str]: + try: + parsed_url = urlparse(repo_git_url) + path_parts = parsed_url.path.strip('/').split('/') + if len(path_parts) < 2: + raise ValueError(f"Invalid git repo URL format: {repo_git_url}") + + repo_name = path_parts[1] + if repo_name.endswith('.git'): + repo_name = repo_name[:-4] + + prefix = f"{parsed_url.scheme}://{parsed_url.netloc}/{path_parts[0]}/{repo_name}/src/branch/{desired_branch}" + suffix = "" + return prefix, suffix + except Exception as e: + get_logger().exception(f"Failed to get canonical URL parts from: {repo_git_url}") + return ("", "") + + def get_languages(self) -> Dict[str, float]: + """ + Get the languages used in the repository and their percentages. + Returns a dictionary mapping language names to their percentage of use. + """ + if not self.owner or not self.repo: + return {} + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}/languages" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + + def get_repo_settings(self) -> Dict: + """ + Get repository settings and configuration. + Returns a dictionary containing repository settings. + """ + if not self.owner or not self.repo: + return {} + url = f"{self.gitea_url}/api/v1/repos/{self.owner}/{self.repo}" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() diff --git a/tests/e2e_tests/test_gitea_app.py b/tests/e2e_tests/test_gitea_app.py new file mode 100644 index 00000000..7114f527 --- /dev/null +++ b/tests/e2e_tests/test_gitea_app.py @@ -0,0 +1,186 @@ +import os +import time +import requests +from datetime import datetime + +from pr_agent.config_loader import get_settings +from pr_agent.log import get_logger, setup_logger +from tests.e2e_tests.e2e_utils import (FILE_PATH, + IMPROVE_START_WITH_REGEX_PATTERN, + NEW_FILE_CONTENT, NUM_MINUTES, + PR_HEADER_START_WITH, REVIEW_START_WITH) + +log_level = os.environ.get("LOG_LEVEL", "INFO") +setup_logger(log_level) +logger = get_logger() + +def test_e2e_run_gitea_app(): + repo_name = 'pr-agent-tests' + owner = 'codiumai' + base_branch = "main" + new_branch = f"gitea_app_e2e_test-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" + get_settings().config.git_provider = "gitea" + + headers = None + pr_number = None + + try: + gitea_url = get_settings().get("GITEA.URL", None) + gitea_token = get_settings().get("GITEA.TOKEN", None) + + if not gitea_url: + logger.error("GITEA.URL is not set in the configuration") + logger.info("Please set GITEA.URL in .env file or environment variables") + assert False, "GITEA.URL is not set in the configuration" + + if not gitea_token: + logger.error("GITEA.TOKEN is not set in the configuration") + logger.info("Please set GITEA.TOKEN in .env file or environment variables") + assert False, "GITEA.TOKEN is not set in the configuration" + + headers = { + 'Authorization': f'token {gitea_token}', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + + logger.info(f"Creating a new branch {new_branch} from {base_branch}") + + response = requests.get( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/branches/{base_branch}", + headers=headers + ) + response.raise_for_status() + base_branch_data = response.json() + base_commit_sha = base_branch_data['commit']['id'] + + branch_data = { + 'ref': f"refs/heads/{new_branch}", + 'sha': base_commit_sha + } + response = requests.post( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/git/refs", + headers=headers, + json=branch_data + ) + response.raise_for_status() + + logger.info(f"Updating file {FILE_PATH} in branch {new_branch}") + + import base64 + file_content_encoded = base64.b64encode(NEW_FILE_CONTENT.encode()).decode() + + try: + response = requests.get( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/contents/{FILE_PATH}?ref={new_branch}", + headers=headers + ) + response.raise_for_status() + existing_file = response.json() + file_sha = existing_file.get('sha') + + file_data = { + 'message': 'Update cli_pip.py', + 'content': file_content_encoded, + 'sha': file_sha, + 'branch': new_branch + } + except: + file_data = { + 'message': 'Add cli_pip.py', + 'content': file_content_encoded, + 'branch': new_branch + } + + response = requests.put( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/contents/{FILE_PATH}", + headers=headers, + json=file_data + ) + response.raise_for_status() + + logger.info(f"Creating a pull request from {new_branch} to {base_branch}") + pr_data = { + 'title': f'Test PR from {new_branch}', + 'body': 'update cli_pip.py', + 'head': new_branch, + 'base': base_branch + } + response = requests.post( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/pulls", + headers=headers, + json=pr_data + ) + response.raise_for_status() + pr = response.json() + pr_number = pr['number'] + + for i in range(NUM_MINUTES): + logger.info(f"Waiting for the PR to get all the tool results...") + time.sleep(60) + + response = requests.get( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/issues/{pr_number}/comments", + headers=headers + ) + response.raise_for_status() + comments = response.json() + + if len(comments) >= 5: # заголовок, 3 предложения, 1 ревью + valid_review = False + for comment in comments: + if comment['body'].startswith('## PR Reviewer Guide 🔍'): + valid_review = True + break + if valid_review: + break + else: + logger.error("REVIEW feedback is invalid") + raise Exception("REVIEW feedback is invalid") + else: + logger.info(f"Waiting for the PR to get all the tool results. {i + 1} minute(s) passed") + else: + assert False, f"After {NUM_MINUTES} minutes, the PR did not get all the tool results" + + logger.info(f"Cleaning up: closing PR and deleting branch {new_branch}") + + close_data = {'state': 'closed'} + response = requests.patch( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/pulls/{pr_number}", + headers=headers, + json=close_data + ) + response.raise_for_status() + + # Удаляем ветку + response = requests.delete( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/git/refs/heads/{new_branch}", + headers=headers + ) + response.raise_for_status() + + logger.info(f"Succeeded in running e2e test for Gitea app on the PR") + except Exception as e: + logger.error(f"Failed to run e2e test for Gitea app: {e}") + raise + finally: + try: + if headers is None or gitea_url is None: + return + + if pr_number is not None: + requests.patch( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/pulls/{pr_number}", + headers=headers, + json={'state': 'closed'} + ) + + requests.delete( + f"{gitea_url}/api/v1/repos/{owner}/{repo_name}/git/refs/heads/{new_branch}", + headers=headers + ) + except Exception as cleanup_error: + logger.error(f"Failed to clean up after test: {cleanup_error}") + +if __name__ == '__main__': + test_e2e_run_gitea_app() \ No newline at end of file diff --git a/tests/unittest/test_gitea_provider.py b/tests/unittest/test_gitea_provider.py new file mode 100644 index 00000000..d88de0e0 --- /dev/null +++ b/tests/unittest/test_gitea_provider.py @@ -0,0 +1,126 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from pr_agent.algo.types import EDIT_TYPE +from pr_agent.git_providers.gitea_provider import GiteaProvider + + +class TestGiteaProvider: + """Unit-tests for GiteaProvider following project style (explicit object construction, minimal patching).""" + + def _provider(self): + """Create provider instance with patched settings and avoid real HTTP calls.""" + with patch('pr_agent.git_providers.gitea_provider.get_settings') as mock_get_settings, \ + patch('requests.get') as mock_get: + settings = MagicMock() + settings.get.side_effect = lambda k, d=None: { + 'GITEA.URL': 'https://gitea.example.com', + 'GITEA.TOKEN': 'test-token' + }.get(k, d) + mock_get_settings.return_value = settings + # Stub the PR fetch triggered during provider initialization + pr_resp = MagicMock() + pr_resp.json.return_value = { + 'title': 'stub', + 'body': 'stub', + 'head': {'ref': 'main'}, + 'user': {'id': 1} + } + pr_resp.raise_for_status = MagicMock() + mock_get.return_value = pr_resp + return GiteaProvider('https://gitea.example.com/owner/repo/pulls/123') + + # ---------------- URL parsing ---------------- + def test_parse_pr_url_valid(self): + owner, repo, pr_num = GiteaProvider._parse_pr_url('https://gitea.example.com/owner/repo/pulls/123') + assert (owner, repo, pr_num) == ('owner', 'repo', '123') + + def test_parse_pr_url_invalid(self): + with pytest.raises(ValueError): + GiteaProvider._parse_pr_url('https://gitea.example.com/owner/repo') + + # ---------------- simple getters ---------------- + def test_get_files(self): + provider = self._provider() + mock_resp = MagicMock() + mock_resp.json.return_value = [{'filename': 'a.txt'}, {'filename': 'b.txt'}] + mock_resp.raise_for_status = MagicMock() + with patch('requests.get', return_value=mock_resp) as mock_get: + assert provider.get_files() == ['a.txt', 'b.txt'] + mock_get.assert_called_once() + + def test_get_diff_files(self): + provider = self._provider() + mock_resp = MagicMock() + mock_resp.json.return_value = [ + {'filename': 'f1', 'previous_filename': 'old_f1', 'status': 'renamed', 'patch': ''}, + {'filename': 'f2', 'status': 'added', 'patch': ''}, + {'filename': 'f3', 'status': 'deleted', 'patch': ''}, + {'filename': 'f4', 'status': 'modified', 'patch': ''} + ] + mock_resp.raise_for_status = MagicMock() + with patch('requests.get', return_value=mock_resp): + res = provider.get_diff_files() + assert [f.edit_type for f in res] == [EDIT_TYPE.RENAMED, EDIT_TYPE.ADDED, EDIT_TYPE.DELETED, + EDIT_TYPE.MODIFIED] + + # ---------------- publishing methods ---------------- + def test_publish_description(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock() + with patch('requests.patch', return_value=mock_resp) as mock_patch: + provider.publish_description('t', 'b'); + mock_patch.assert_called_once() + + def test_publish_comment(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock() + with patch('requests.post', return_value=mock_resp) as mock_post: + provider.publish_comment('c'); + mock_post.assert_called_once() + + def test_publish_inline_comment(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock() + with patch('requests.post', return_value=mock_resp) as mock_post: + provider.publish_inline_comment('body', 'file', '10'); + mock_post.assert_called_once() + + # ---------------- labels & reactions ---------------- + def test_get_pr_labels(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock(); + mock_resp.json.return_value = [{'name': 'l1'}] + with patch('requests.get', return_value=mock_resp): + assert provider.get_pr_labels() == ['l1'] + + def test_add_eyes_reaction(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock(); + mock_resp.json.return_value = {'id': 7} + with patch('requests.post', return_value=mock_resp): + assert provider.add_eyes_reaction(1) == 7 + + # ---------------- commit messages & url helpers ---------------- + def test_get_commit_messages(self): + provider = self._provider() + mock_resp = MagicMock(); + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = [ + {'commit': {'message': 'm1'}}, {'commit': {'message': 'm2'}}] + with patch('requests.get', return_value=mock_resp): + assert provider.get_commit_messages() == ['m1', 'm2'] + + def test_git_url_helpers(self): + provider = self._provider() + issues_url = 'https://gitea.example.com/owner/repo/pulls/3' + assert provider.get_git_repo_url(issues_url) == 'https://gitea.example.com/owner/repo.git' + prefix, suffix = provider.get_canonical_url_parts('https://gitea.example.com/owner/repo.git', 'dev') + assert prefix == 'https://gitea.example.com/owner/repo/src/branch/dev' + assert suffix == ''