Files
pr-agent/pr_agent/servers/bitbucket_app.py

217 lines
9.6 KiB
Python

import base64
import copy
import hashlib
import json
import os
import time
import jwt
import requests
import uvicorn
from fastapi import APIRouter, FastAPI, Request, Response
from starlette.background import BackgroundTasks
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
from starlette_context import context
from starlette_context.middleware import RawContextMiddleware
from pr_agent.agent.pr_agent import PRAgent
from pr_agent.algo.utils import update_settings_from_args
from pr_agent.config_loader import get_settings, global_settings
from pr_agent.git_providers.utils import apply_repo_settings
from pr_agent.identity_providers import get_identity_provider
from pr_agent.identity_providers.identity_provider import Eligibility
from pr_agent.log import LoggingFormat, get_logger, setup_logger
from pr_agent.secret_providers import get_secret_provider
from pr_agent.servers.github_action_runner import get_setting_or_env, is_true
from pr_agent.tools.pr_code_suggestions import PRCodeSuggestions
from pr_agent.tools.pr_description import PRDescription
from pr_agent.tools.pr_reviewer import PRReviewer
setup_logger(fmt=LoggingFormat.JSON, level="DEBUG")
router = APIRouter()
secret_provider = get_secret_provider() if get_settings().get("CONFIG.SECRET_PROVIDER") else None
async def get_bearer_token(shared_secret: str, client_key: str):
try:
now = int(time.time())
url = "https://bitbucket.org/site/oauth2/access_token"
canonical_url = "GET&/site/oauth2/access_token&"
qsh = hashlib.sha256(canonical_url.encode("utf-8")).hexdigest()
app_key = get_settings().bitbucket.app_key
payload = {
"iss": app_key,
"iat": now,
"exp": now + 240,
"qsh": qsh,
"sub": client_key,
}
token = jwt.encode(payload, shared_secret, algorithm="HS256")
payload = 'grant_type=urn%3Abitbucket%3Aoauth2%3Ajwt'
headers = {
'Authorization': f'JWT {token}',
'Content-Type': 'application/x-www-form-urlencoded'
}
response = requests.request("POST", url, headers=headers, data=payload)
bearer_token = response.json()["access_token"]
return bearer_token
except Exception as e:
get_logger().error(f"Failed to get bearer token: {e}")
raise e
@router.get("/")
async def handle_manifest(request: Request, response: Response):
cur_dir = os.path.dirname(os.path.abspath(__file__))
manifest = open(os.path.join(cur_dir, "atlassian-connect.json"), "rt").read()
try:
manifest = manifest.replace("app_key", get_settings().bitbucket.app_key)
manifest = manifest.replace("base_url", get_settings().bitbucket.base_url)
except:
get_logger().error("Failed to replace api_key in Bitbucket manifest, trying to continue")
manifest_obj = json.loads(manifest)
return JSONResponse(manifest_obj)
async def _perform_commands_bitbucket(commands_conf: str, agent: PRAgent, api_url: str, log_context: dict):
apply_repo_settings(api_url)
commands = get_settings().get(f"bitbucket_app.{commands_conf}", {})
for command in commands:
try:
split_command = command.split(" ")
command = split_command[0]
args = split_command[1:]
other_args = update_settings_from_args(args)
new_command = ' '.join([command] + other_args)
get_logger().info(f"Performing command: {new_command}")
with get_logger().contextualize(**log_context):
await agent.handle_request(api_url, new_command)
except Exception as e:
get_logger().error(f"Failed to perform command {command}: {e}")
@router.post("/webhook")
async def handle_github_webhooks(background_tasks: BackgroundTasks, request: Request):
app_name = get_settings().get("CONFIG.APP_NAME", "Unknown")
log_context = {"server_type": "bitbucket_app", "app_name": app_name}
get_logger().debug(request.headers)
jwt_header = request.headers.get("authorization", None)
if jwt_header:
input_jwt = jwt_header.split(" ")[1]
data = await request.json()
get_logger().debug(data)
async def inner():
try:
try:
if data["data"]["actor"]["type"] != "user":
return "OK"
except KeyError:
get_logger().error("Failed to get actor type, check previous logs, this shouldn't happen.")
# Get the username of the sender
try:
username = data["data"]["actor"]["username"]
except KeyError:
try:
username = data["data"]["actor"]["display_name"]
except KeyError:
username = data["data"]["actor"]["nickname"]
log_context["sender"] = username
sender_id = data["data"]["actor"]["account_id"]
log_context["sender_id"] = sender_id
jwt_parts = input_jwt.split(".")
claim_part = jwt_parts[1]
claim_part += "=" * (-len(claim_part) % 4)
decoded_claims = base64.urlsafe_b64decode(claim_part)
claims = json.loads(decoded_claims)
client_key = claims["iss"]
secrets = json.loads(secret_provider.get_secret(client_key))
shared_secret = secrets["shared_secret"]
jwt.decode(input_jwt, shared_secret, audience=client_key, algorithms=["HS256"])
bearer_token = await get_bearer_token(shared_secret, client_key)
context['bitbucket_bearer_token'] = bearer_token
context["settings"] = copy.deepcopy(global_settings)
event = data["event"]
agent = PRAgent()
if event == "pullrequest:created":
pr_url = data["data"]["pullrequest"]["links"]["html"]["href"]
log_context["api_url"] = pr_url
log_context["event"] = "pull_request"
if pr_url:
with get_logger().contextualize(**log_context):
apply_repo_settings(pr_url)
if get_identity_provider().verify_eligibility("bitbucket",
sender_id, pr_url) is not Eligibility.NOT_ELIGIBLE:
if get_settings().get("bitbucket_app.pr_commands"):
await _perform_commands_bitbucket("pr_commands", PRAgent(), pr_url, log_context)
else: # backwards compatibility
auto_review = get_setting_or_env("BITBUCKET_APP.AUTO_REVIEW", None)
if is_true(auto_review): # by default, auto review is disabled
await PRReviewer(pr_url).run()
auto_improve = get_setting_or_env("BITBUCKET_APP.AUTO_IMPROVE", None)
if is_true(auto_improve): # by default, auto improve is disabled
await PRCodeSuggestions(pr_url).run()
auto_describe = get_setting_or_env("BITBUCKET_APP.AUTO_DESCRIBE", None)
if is_true(auto_describe): # by default, auto describe is disabled
await PRDescription(pr_url).run()
elif event == "pullrequest:comment_created":
pr_url = data["data"]["pullrequest"]["links"]["html"]["href"]
log_context["api_url"] = pr_url
log_context["event"] = "comment"
comment_body = data["data"]["comment"]["content"]["raw"]
with get_logger().contextualize(**log_context):
if get_identity_provider().verify_eligibility("bitbucket",
sender_id, pr_url) is not Eligibility.NOT_ELIGIBLE:
await agent.handle_request(pr_url, comment_body)
except Exception as e:
get_logger().error(f"Failed to handle webhook: {e}")
background_tasks.add_task(inner)
return "OK"
@router.get("/webhook")
async def handle_github_webhooks(request: Request, response: Response):
return "Webhook server online!"
@router.post("/installed")
async def handle_installed_webhooks(request: Request, response: Response):
try:
get_logger().info("handle_installed_webhooks")
get_logger().info(request.headers)
data = await request.json()
get_logger().info(data)
shared_secret = data["sharedSecret"]
client_key = data["clientKey"]
username = data["principal"]["username"]
secrets = {
"shared_secret": shared_secret,
"client_key": client_key
}
secret_provider.store_secret(username, json.dumps(secrets))
except Exception as e:
get_logger().error(f"Failed to register user: {e}")
return JSONResponse({"error": "Unable to register user"}, status_code=500)
@router.post("/uninstalled")
async def handle_uninstalled_webhooks(request: Request, response: Response):
get_logger().info("handle_uninstalled_webhooks")
data = await request.json()
get_logger().info(data)
def start():
get_settings().set("CONFIG.PUBLISH_OUTPUT_PROGRESS", False)
get_settings().set("CONFIG.GIT_PROVIDER", "bitbucket")
get_settings().set("PR_DESCRIPTION.PUBLISH_DESCRIPTION_AS_COMMENT", True)
middleware = [Middleware(RawContextMiddleware)]
app = FastAPI(middleware=middleware)
app.include_router(router)
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "3000")))
if __name__ == '__main__':
start()