mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-07-03 12:20:38 +08:00
Compare commits
1 Commits
v0.28
...
test-almog
Author | SHA1 | Date | |
---|---|---|---|
94826f81c8 |
250
pr_agent/tools/index.py
Normal file
250
pr_agent/tools/index.py
Normal file
@ -0,0 +1,250 @@
|
||||
|
||||
import copy
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
import uvicorn
|
||||
from fastapi import APIRouter, FastAPI, Request, status
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.responses import JSONResponse
|
||||
from starlette.background import BackgroundTasks
|
||||
from starlette.middleware import Middleware
|
||||
from starlette_context import context
|
||||
from starlette_context.middleware import RawContextMiddleware
|
||||
|
||||
from pr_agent.agent.pr_agent import PRAgent
|
||||
from pr_agent.algo.utils import update_settings_from_args
|
||||
from pr_agent.config_loader import get_settings, global_settings
|
||||
from pr_agent.git_providers.utils import apply_repo_settings
|
||||
from pr_agent.log import LoggingFormat, get_logger, setup_logger
|
||||
from pr_agent.secret_providers import get_secret_provider
|
||||
|
||||
setup_logger(fmt=LoggingFormat.JSON, level="DEBUG")
|
||||
router = APIRouter()
|
||||
|
||||
secret_provider = get_secret_provider() if get_settings().get("CONFIG.SECRET_PROVIDER") else None
|
||||
|
||||
|
||||
async def handle_request(api_url: str, body: str, log_context: dict, sender_id: str):
|
||||
log_context["action"] = body
|
||||
log_context["event"] = "pull_request" if body == "/review" else "comment"
|
||||
log_context["api_url"] = api_url
|
||||
log_context["app_name"] = get_settings().get("CONFIG.APP_NAME", "Unknown")
|
||||
|
||||
with get_logger().contextualize(**log_context):
|
||||
await PRAgent().handle_request(api_url, body)
|
||||
|
||||
|
||||
async def _perform_commands_gitlab(commands_conf: str, agent: PRAgent, api_url: str,
|
||||
log_context: dict, data: dict):
|
||||
apply_repo_settings(api_url)
|
||||
if commands_conf == "pr_commands" and get_settings().config.disable_auto_feedback: # auto commands for PR, and auto feedback is disabled
|
||||
get_logger().info(f"Auto feedback is disabled, skipping auto commands for PR {api_url=}", **log_context)
|
||||
return
|
||||
if not should_process_pr_logic(data): # Here we already updated the configurations
|
||||
return
|
||||
commands = get_settings().get(f"gitlab.{commands_conf}", {})
|
||||
get_settings().set("config.is_auto_command", True)
|
||||
for command in commands:
|
||||
try:
|
||||
split_command = command.split(" ")
|
||||
command = split_command[0]
|
||||
args = split_command[1:]
|
||||
other_args = update_settings_from_args(args)
|
||||
new_command = ' '.join([command] + other_args)
|
||||
get_logger().info(f"Performing command: {new_command}")
|
||||
with get_logger().contextualize(**log_context):
|
||||
await agent.handle_request(api_url, new_command)
|
||||
except Exception as e:
|
||||
get_logger().error(f"Failed to perform command {command}: {e}")
|
||||
|
||||
|
||||
def is_bot_user(data) -> bool:
|
||||
try:
|
||||
# logic to ignore bot users (unlike Github, no direct flag for bot users in gitlab)
|
||||
sender_name = data.get("user", {}).get("name", "unknown").lower()
|
||||
bot_indicators = ['codium', 'bot_', 'bot-', '_bot', '-bot']
|
||||
if any(indicator in sender_name for indicator in bot_indicators):
|
||||
get_logger().info(f"Skipping GitLab bot user: {sender_name}")
|
||||
return True
|
||||
except Exception as e:
|
||||
get_logger().error(f"Failed 'is_bot_user' logic: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def should_process_pr_logic(data) -> bool:
|
||||
try:
|
||||
if not data.get('object_attributes', {}):
|
||||
return False
|
||||
title = data['object_attributes'].get('title')
|
||||
sender = data.get("user", {}).get("username", "")
|
||||
|
||||
# logic to ignore PRs from specific users
|
||||
ignore_pr_users = get_settings().get("CONFIG.IGNORE_PR_AUTHORS", [])
|
||||
if ignore_pr_users and sender:
|
||||
if sender in ignore_pr_users:
|
||||
get_logger().info(f"Ignoring PR from user '{sender}' due to 'config.ignore_pr_authors' settings")
|
||||
return False
|
||||
|
||||
# logic to ignore MRs for titles, labels and source, target branches.
|
||||
ignore_mr_title = get_settings().get("CONFIG.IGNORE_PR_TITLE", [])
|
||||
ignore_mr_labels = get_settings().get("CONFIG.IGNORE_PR_LABELS", [])
|
||||
ignore_mr_source_branches = get_settings().get("CONFIG.IGNORE_PR_SOURCE_BRANCHES", [])
|
||||
ignore_mr_target_branches = get_settings().get("CONFIG.IGNORE_PR_TARGET_BRANCHES", [])
|
||||
|
||||
#
|
||||
if ignore_mr_source_branches:
|
||||
source_branch = data['object_attributes'].get('source_branch')
|
||||
if any(re.search(regex, source_branch) for regex in ignore_mr_source_branches):
|
||||
get_logger().info(
|
||||
f"Ignoring MR with source branch '{source_branch}' due to gitlab.ignore_mr_source_branches settings")
|
||||
return False
|
||||
|
||||
if ignore_mr_target_branches:
|
||||
target_branch = data['object_attributes'].get('target_branch')
|
||||
if any(re.search(regex, target_branch) for regex in ignore_mr_target_branches):
|
||||
get_logger().info(
|
||||
f"Ignoring MR with target branch '{target_branch}' due to gitlab.ignore_mr_target_branches settings")
|
||||
return False
|
||||
|
||||
if ignore_mr_labels:
|
||||
labels = [label['title'] for label in data['object_attributes'].get('labels', [])]
|
||||
if any(label in ignore_mr_labels for label in labels):
|
||||
labels_str = ", ".join(labels)
|
||||
get_logger().info(f"Ignoring MR with labels '{labels_str}' due to gitlab.ignore_mr_labels settings")
|
||||
return False
|
||||
|
||||
if ignore_mr_title:
|
||||
if any(re.search(regex, title) for regex in ignore_mr_title):
|
||||
get_logger().info(f"Ignoring MR with title '{title}' due to gitlab.ignore_mr_title settings")
|
||||
return False
|
||||
except Exception as e:
|
||||
get_logger().error(f"Failed 'should_process_pr_logic': {e}")
|
||||
return True
|
||||
|
||||
|
||||
@router.post("/webhook")
|
||||
async def gitlab_webhook(background_tasks: BackgroundTasks, request: Request):
|
||||
start_time = datetime.now()
|
||||
request_json = await request.json()
|
||||
context["settings"] = copy.deepcopy(global_settings)
|
||||
|
||||
async def inner(data: dict):
|
||||
log_context = {"server_type": "gitlab_app"}
|
||||
get_logger().debug("Received a GitLab webhook")
|
||||
if request.headers.get("X-Gitlab-Token") and secret_provider:
|
||||
request_token = request.headers.get("X-Gitlab-Token")
|
||||
secret = secret_provider.get_secret(request_token)
|
||||
if not secret:
|
||||
get_logger().warning(f"Empty secret retrieved, request_token: {request_token}")
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
content=jsonable_encoder({"message": "unauthorized"}))
|
||||
try:
|
||||
secret_dict = json.loads(secret)
|
||||
gitlab_token = secret_dict["gitlab_token"]
|
||||
log_context["token_id"] = secret_dict.get("token_name", secret_dict.get("id", "unknown"))
|
||||
context["settings"].gitlab.personal_access_token = gitlab_token
|
||||
except Exception as e:
|
||||
get_logger().error(f"Failed to validate secret {request_token}: {e}")
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content=jsonable_encoder({"message": "unauthorized"}))
|
||||
elif get_settings().get("GITLAB.SHARED_SECRET"):
|
||||
secret = get_settings().get("GITLAB.SHARED_SECRET")
|
||||
if not request.headers.get("X-Gitlab-Token") == secret:
|
||||
get_logger().error("Failed to validate secret")
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content=jsonable_encoder({"message": "unauthorized"}))
|
||||
else:
|
||||
get_logger().error("Failed to validate secret")
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content=jsonable_encoder({"message": "unauthorized"}))
|
||||
gitlab_token = get_settings().get("GITLAB.PERSONAL_ACCESS_TOKEN", None)
|
||||
if not gitlab_token:
|
||||
get_logger().error("No gitlab token found")
|
||||
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content=jsonable_encoder({"message": "unauthorized"}))
|
||||
|
||||
get_logger().info("GitLab data", artifact=data)
|
||||
sender = data.get("user", {}).get("username", "unknown")
|
||||
sender_id = data.get("user", {}).get("id", "unknown")
|
||||
|
||||
# ignore bot users
|
||||
if is_bot_user(data):
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "success"}))
|
||||
|
||||
log_context["sender"] = sender
|
||||
if data.get('object_kind') == 'merge_request':
|
||||
# ignore MRs based on title, labels, source and target branches
|
||||
if not should_process_pr_logic(data):
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "success"}))
|
||||
|
||||
if data['object_attributes'].get('action') in ['open', 'reopen']:
|
||||
url = data['object_attributes'].get('url')
|
||||
draft = data['object_attributes'].get('draft')
|
||||
get_logger().info(f"New merge request: {url}")
|
||||
if draft:
|
||||
get_logger().info(f"Skipping draft MR: {url}")
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "success"}))
|
||||
|
||||
await _perform_commands_gitlab("pr_commands", PRAgent(), url, log_context, data)
|
||||
|
||||
# Handle the Draft to Ready transition
|
||||
elif data['object_attributes'].get('action') == 'update':
|
||||
url = data['object_attributes'].get('url')
|
||||
old_draft_status = data['changes']['draft']['previous']
|
||||
new_draft_status = data['object_attributes'].get('draft')
|
||||
|
||||
# Check if the merge request transitioned from Draft to Ready
|
||||
if old_draft_status and not new_draft_status:
|
||||
get_logger().info(f"Merge Request transitioned from Draft to Ready: {url}")
|
||||
await _perform_commands_gitlab("pr_draft_ready_commands", PRAgent(), url, log_context, data)
|
||||
|
||||
elif data.get('object_kind') == 'note' and data.get('event_type') == 'note': # comment on MR
|
||||
if 'merge_request' in data:
|
||||
mr = data['merge_request']
|
||||
url = mr.get('url')
|
||||
|
||||
get_logger().info(f"A comment has been added to a merge request: {url}")
|
||||
body = data.get('object_attributes', {}).get('note')
|
||||
if data.get('object_attributes', {}).get('type') == 'DiffNote' and '/ask' in body: # /ask_line
|
||||
body = handle_ask_line(body, data)
|
||||
|
||||
await handle_request(url, body, log_context, sender_id)
|
||||
|
||||
background_tasks.add_task(inner, request_json)
|
||||
end_time = datetime.now()
|
||||
get_logger().info(f"Processing time: {end_time - start_time}", request=request_json)
|
||||
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder({"message": "success"}))
|
||||
|
||||
|
||||
def handle_ask_line(body, data):
|
||||
try:
|
||||
line_range_ = data['object_attributes']['position']['line_range']
|
||||
start_line = line_range_['start']['new_line']
|
||||
end_line = line_range_['end']['new_line']
|
||||
question = body.replace('/ask', '').strip()
|
||||
path = data['object_attributes']['position']['new_path']
|
||||
side = 'RIGHT'
|
||||
comment_id = data['object_attributes']["discussion_id"]
|
||||
get_logger().info("Handling line comment")
|
||||
body = f"/ask_line --line_start={start_line} --line_end={end_line} --side={side} --file_name={path} --comment_id={comment_id} {question}"
|
||||
except Exception as e:
|
||||
get_logger().error(f"Failed to handle ask line comment: {e}")
|
||||
return body
|
||||
|
||||
@router.get("/")
|
||||
async def root():
|
||||
return {"status": "ok"}
|
||||
|
||||
gitlab_url = get_settings().get("GITLAB.URL", None)
|
||||
if not gitlab_url:
|
||||
raise ValueError("GITLAB.URL is not set")
|
||||
get_settings().config.git_provider = "gitlab"
|
||||
middleware = [Middleware(RawContextMiddleware)]
|
||||
app = FastAPI(middleware=middleware)
|
||||
app.include_router(router)
|
||||
|
||||
|
||||
def start():
|
||||
uvicorn.run(app, host="0.0.0.0", port=3000)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
start()
|
Reference in New Issue
Block a user