import ast import json import os import re from typing import List import uvicorn from fastapi import APIRouter, FastAPI from fastapi.encoders import jsonable_encoder from fastapi.responses import RedirectResponse from starlette import status from starlette.background import BackgroundTasks from starlette.middleware import Middleware from starlette.requests import Request from starlette.responses import JSONResponse from starlette_context.middleware import RawContextMiddleware from pr_agent.agent.pr_agent import PRAgent from pr_agent.algo.utils import update_settings_from_args from pr_agent.config_loader import get_settings from pr_agent.git_providers.utils import apply_repo_settings from pr_agent.log import LoggingFormat, get_logger, setup_logger from pr_agent.servers.utils import verify_signature setup_logger(fmt=LoggingFormat.JSON, level=get_settings().get("CONFIG.LOG_LEVEL", "DEBUG")) router = APIRouter() def handle_request( background_tasks: BackgroundTasks, url: str, body: str, log_context: dict ): log_context["action"] = body log_context["api_url"] = url async def inner(): try: with get_logger().contextualize(**log_context): await PRAgent().handle_request(url, body) except Exception as e: get_logger().error(f"Failed to handle webhook: {e}") background_tasks.add_task(inner) def should_process_pr_logic(data) -> bool: try: pr_data = data.get("pullRequest", {}) title = pr_data.get("title", "") source_branch = pr_data.get("fromRef", {}).get("displayId", "") target_branch = pr_data.get("toRef", {}).get("displayId", "") sender = pr_data.get("author", {}).get("user", {}).get("name", "") project_key = pr_data.get("toRef", {}).get("repository", {}).get("project", {}).get("key", "") repo_slug = pr_data.get("toRef", {}).get("repository", {}).get("slug", "") repo_full_name = f"{project_key}/{repo_slug}" if project_key and repo_slug else "" pr_id = pr_data.get("id", None) # To ignore PRs from specific repositories ignore_repos = get_settings().get("CONFIG.IGNORE_REPOSITORIES", []) if repo_full_name and ignore_repos: if any(re.search(regex, repo_full_name) for regex in ignore_repos): get_logger().info(f"Ignoring PR from repository '{repo_full_name}' due to 'config.ignore_repositories' setting") return False # To ignore PRs from specific users ignore_pr_users = get_settings().get("CONFIG.IGNORE_PR_AUTHORS", []) if ignore_pr_users and sender: if sender in ignore_pr_users: get_logger().info(f"Ignoring PR from user '{sender}' due to 'config.ignore_pr_authors' setting") return False # To ignore PRs with specific titles if title: ignore_pr_title_re = get_settings().get("CONFIG.IGNORE_PR_TITLE", []) if not isinstance(ignore_pr_title_re, list): ignore_pr_title_re = [ignore_pr_title_re] if ignore_pr_title_re and any(re.search(regex, title) for regex in ignore_pr_title_re): get_logger().info(f"Ignoring PR with title '{title}' due to config.ignore_pr_title setting") return False ignore_pr_source_branches = get_settings().get("CONFIG.IGNORE_PR_SOURCE_BRANCHES", []) ignore_pr_target_branches = get_settings().get("CONFIG.IGNORE_PR_TARGET_BRANCHES", []) if (ignore_pr_source_branches or ignore_pr_target_branches): if any(re.search(regex, source_branch) for regex in ignore_pr_source_branches): get_logger().info( f"Ignoring PR with source branch '{source_branch}' due to config.ignore_pr_source_branches settings") return False if any(re.search(regex, target_branch) for regex in ignore_pr_target_branches): get_logger().info( f"Ignoring PR with target branch '{target_branch}' due to config.ignore_pr_target_branches settings") return False # allow_only_specific_folders allowed_folders = get_settings().config.get("allow_only_specific_folders", []) if allowed_folders and pr_id and project_key and repo_slug: try: from pr_agent.git_providers.bitbucket_server_provider import BitbucketServerProvider bitbucket_server_url = get_settings().get("BITBUCKET_SERVER.URL", "") pr_url = f"{bitbucket_server_url}/projects/{project_key}/repos/{repo_slug}/pull-requests/{pr_id}" provider = BitbucketServerProvider(pr_url=pr_url) changed_files = provider.get_files() if changed_files: for file_path in changed_files: if not any(file_path.startswith(folder) for folder in allowed_folders): get_logger().info(f"Ignoring PR because file '{file_path}' is not in allowed folders {allowed_folders}") return False except Exception as e: get_logger().error(f"Failed allow_only_specific_folders logic: {e}") return False except Exception as e: get_logger().error(f"Failed 'should_process_pr_logic': {e}") return False return True @router.post("/") async def redirect_to_webhook(): return RedirectResponse(url="/webhook") @router.post("/webhook") async def handle_webhook(background_tasks: BackgroundTasks, request: Request): log_context = {"server_type": "bitbucket_server"} data = await request.json() get_logger().info(json.dumps(data)) webhook_secret = get_settings().get("BITBUCKET_SERVER.WEBHOOK_SECRET", None) if webhook_secret: body_bytes = await request.body() if body_bytes.decode('utf-8') == '{"test": true}': return JSONResponse( status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "connection test successful"}) ) signature_header = request.headers.get("x-hub-signature", None) verify_signature(body_bytes, webhook_secret, signature_header) pr_id = data["pullRequest"]["id"] repository_name = data["pullRequest"]["toRef"]["repository"]["slug"] project_name = data["pullRequest"]["toRef"]["repository"]["project"]["key"] bitbucket_server = get_settings().get("BITBUCKET_SERVER.URL") pr_url = f"{bitbucket_server}/projects/{project_name}/repos/{repository_name}/pull-requests/{pr_id}" log_context["api_url"] = pr_url log_context["event"] = "pull_request" commands_to_run = [] if data["eventKey"] == "pr:opened": apply_repo_settings(pr_url) if not should_process_pr_logic(data): get_logger().info(f"PR ignored due to config settings", **log_context) return JSONResponse( status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "PR ignored by config"}) ) if get_settings().config.disable_auto_feedback: # auto commands for PR, and auto feedback is disabled get_logger().info(f"Auto feedback is disabled, skipping auto commands for PR {pr_url}", **log_context) return get_settings().set("config.is_auto_command", True) commands_to_run.extend(_get_commands_list_from_settings('BITBUCKET_SERVER.PR_COMMANDS')) elif data["eventKey"] == "pr:comment:added": commands_to_run.append(data["comment"]["text"]) else: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content=json.dumps({"message": "Unsupported event"}), ) async def inner(): try: await _run_commands_sequentially(commands_to_run, pr_url, log_context) except Exception as e: get_logger().error(f"Failed to handle webhook: {e}") background_tasks.add_task(inner) return JSONResponse( status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "success"}) ) async def _run_commands_sequentially(commands: List[str], url: str, log_context: dict): get_logger().info(f"Running commands sequentially: {commands}") if commands is None: return for command in commands: try: body = _process_command(command, url) log_context["action"] = body log_context["api_url"] = url with get_logger().contextualize(**log_context): await PRAgent().handle_request(url, body) except Exception as e: get_logger().error(f"Failed to handle command: {command} , error: {e}") def _process_command(command: str, url) -> str: # don't think we need this apply_repo_settings(url) # Process the command string split_command = command.split(" ") command = split_command[0] args = split_command[1:] # do I need this? if yes, shouldn't this be done in PRAgent? other_args = update_settings_from_args(args) new_command = ' '.join([command] + other_args) return new_command def _to_list(command_string: str) -> list: try: # Use ast.literal_eval to safely parse the string into a list commands = ast.literal_eval(command_string) # Check if the parsed object is a list of strings if isinstance(commands, list) and all(isinstance(cmd, str) for cmd in commands): return commands else: raise ValueError("Parsed data is not a list of strings.") except (SyntaxError, ValueError, TypeError) as e: raise ValueError(f"Invalid command string: {e}") def _get_commands_list_from_settings(setting_key:str ) -> list: try: return get_settings().get(setting_key, []) except ValueError as e: get_logger().error(f"Failed to get commands list from settings {setting_key}: {e}") @router.get("/") async def root(): return {"status": "ok"} def start(): app = FastAPI(middleware=[Middleware(RawContextMiddleware)]) app.include_router(router) uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "3000"))) if __name__ == "__main__": start()