feat: enhance GitHub polling with synchronous comment processing and improved logging and bug fixing

This commit is contained in:
mrT23
2024-09-05 07:42:50 +03:00
parent 65c0bc414f
commit b8d2b263b9

View File

@ -1,7 +1,10 @@
import asyncio import asyncio
import multiprocessing
from collections import deque
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
import time
import requests
import aiohttp import aiohttp
from pr_agent.agent.pr_agent import PRAgent from pr_agent.agent.pr_agent import PRAgent
@ -24,6 +27,106 @@ def now() -> str:
now_utc = now_utc.replace("+00:00", "Z") now_utc = now_utc.replace("+00:00", "Z")
return now_utc return now_utc
async def async_handle_request(pr_url, rest_of_comment, comment_id, git_provider):
agent = PRAgent()
success = await agent.handle_request(
pr_url,
rest_of_comment,
notify=lambda: git_provider.add_eyes_reaction(comment_id)
)
return success
def run_handle_request(pr_url, rest_of_comment, comment_id, git_provider):
return asyncio.run(async_handle_request(pr_url, rest_of_comment, comment_id, git_provider))
def process_comment_sync(pr_url, rest_of_comment, comment_id):
try:
# Run the async handle_request in a separate function
git_provider = get_git_provider()(pr_url=pr_url)
success = run_handle_request(pr_url, rest_of_comment, comment_id, git_provider)
except Exception as e:
get_logger().error(f"Error processing comment: {e}", artifact={"traceback": traceback.format_exc()})
async def process_comment(pr_url, rest_of_comment, comment_id):
try:
git_provider = get_git_provider()(pr_url=pr_url)
git_provider.set_pr(pr_url)
agent = PRAgent()
success = await agent.handle_request(
pr_url,
rest_of_comment,
notify=lambda: git_provider.add_eyes_reaction(comment_id)
)
get_logger().info(f"Finished processing comment for PR: {pr_url}")
except Exception as e:
get_logger().error(f"Error processing comment: {e}", artifact={"traceback": traceback.format_exc()})
async def is_valid_notification(notification, headers, handled_ids, session, user_id):
try:
if 'reason' in notification and notification['reason'] == 'mention':
if 'subject' in notification and notification['subject']['type'] == 'PullRequest':
pr_url = notification['subject']['url']
latest_comment = notification['subject']['latest_comment_url']
if not latest_comment or not isinstance(latest_comment, str):
get_logger().debug(f"not latest_comment, but its ok")
# continue
async with session.get(latest_comment, headers=headers) as comment_response:
check_prev_comments = False
if comment_response.status == 200:
comment = await comment_response.json()
if 'id' in comment:
if comment['id'] in handled_ids:
get_logger().debug(f"comment['id'] in handled_ids")
return False, handled_ids
else:
handled_ids.add(comment['id'])
if 'user' in comment and 'login' in comment['user']:
if comment['user']['login'] == user_id:
get_logger().debug(f"comment['user']['login'] == user_id")
check_prev_comments = True
comment_body = comment.get('body', '')
if not comment_body:
get_logger().debug(f"no comment_body")
check_prev_comments = True
commenter_github_user = comment['user']['login'] \
if 'user' in comment else ''
get_logger().info(f"Polling, pr_url: {pr_url}",
artifact={"comment": comment_body})
user_tag = "@" + user_id
if user_tag not in comment_body:
get_logger().debug(f"user_tag not in comment_body")
check_prev_comments = True
if not check_prev_comments:
return True, handled_ids, comment, comment_body, pr_url, user_tag
else: # we could not find the user tag in the latest comment. Check previous comments
# get all comments in the PR
requests_url = f"{pr_url}/comments".replace("pulls", "issues")
comments_response = requests.get(requests_url, headers=headers)
comments = comments_response.json()[::-1]
max_comment_to_scan = 4
for comment in comments[:max_comment_to_scan]:
if 'user' in comment and 'login' in comment['user']:
if comment['user']['login'] == user_id:
continue
comment_body = comment.get('body', '')
if not comment_body:
continue
if user_tag in comment_body:
get_logger().info("found user tag in previous comments")
return True, handled_ids, comment, comment_body, pr_url, user_tag
get_logger().error(f"Failed to fetch comments for PR: {pr_url}")
return False, handled_ids
return False, handled_ids
except Exception as e:
get_logger().error(f"Error processing notification: {e}", artifact={"traceback": traceback.format_exc()})
return False, handled_ids
async def polling_loop(): async def polling_loop():
""" """
@ -34,7 +137,6 @@ async def polling_loop():
last_modified = [None] last_modified = [None]
git_provider = get_git_provider()() git_provider = get_git_provider()()
user_id = git_provider.get_user_id() user_id = git_provider.get_user_id()
agent = PRAgent()
get_settings().set("CONFIG.PUBLISH_OUTPUT_PROGRESS", False) get_settings().set("CONFIG.PUBLISH_OUTPUT_PROGRESS", False)
get_settings().set("pr_description.publish_description_as_comment", True) get_settings().set("pr_description.publish_description_as_comment", True)
@ -53,7 +155,8 @@ async def polling_loop():
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
while True: while True:
try: try:
await asyncio.sleep(5) await asyncio.sleep(3)
get_logger().info("Polling for notifications")
headers = { headers = {
"Accept": "application/vnd.github.v3+json", "Accept": "application/vnd.github.v3+json",
"Authorization": f"Bearer {token}" "Authorization": f"Bearer {token}"
@ -74,43 +177,38 @@ async def polling_loop():
notifications = await response.json() notifications = await response.json()
if not notifications: if not notifications:
continue continue
get_logger().info(f"Received {len(notifications)} notifications")
task_queue = deque()
for notification in notifications: for notification in notifications:
# mark notification as read
await mark_notification_as_read(headers, notification, session)
handled_ids.add(notification['id']) handled_ids.add(notification['id'])
if 'reason' in notification and notification['reason'] == 'mention': output = await is_valid_notification(notification, headers, handled_ids, session, user_id)
if 'subject' in notification and notification['subject']['type'] == 'PullRequest': if output[0]:
pr_url = notification['subject']['url'] _, handled_ids, comment, comment_body, pr_url, user_tag = output
latest_comment = notification['subject']['latest_comment_url'] rest_of_comment = comment_body.split(user_tag)[1].strip()
if not latest_comment or not isinstance(latest_comment, str): comment_id = comment['id']
continue
async with session.get(latest_comment, headers=headers) as comment_response: # Add to the task queue
if comment_response.status == 200: get_logger().info(
comment = await comment_response.json() f"Adding comment processing to task queue for PR, {pr_url}, comment_body: {comment_body}")
if 'id' in comment: task_queue.append((process_comment_sync, (pr_url, rest_of_comment, comment_id)))
if comment['id'] in handled_ids: get_logger().info(f"Queued comment processing for PR: {pr_url}")
continue else:
else: get_logger().debug(f"Skipping comment processing for PR: {pr_url}")
handled_ids.add(comment['id'])
if 'user' in comment and 'login' in comment['user']: if task_queue:
if comment['user']['login'] == user_id: processes = []
continue for func, args in task_queue: # Create parallel tasks
comment_body = comment.get('body', '') p = multiprocessing.Process(target=func, args=args)
if not comment_body: processes.append(p)
continue p.start()
commenter_github_user = comment['user']['login'] \ task_queue.clear()
if 'user' in comment else ''
get_logger().info(f"Polling, pr_url: {pr_url}", # Dont wait for all processes to complete. Move on to the next iteration
artifact={"comment": comment_body}) # for p in processes:
user_tag = "@" + user_id # p.join()
if user_tag not in comment_body:
continue
rest_of_comment = comment_body.split(user_tag)[1].strip()
comment_id = comment['id']
git_provider.set_pr(pr_url)
success = await agent.handle_request(pr_url, rest_of_comment,
notify=lambda: git_provider.add_eyes_reaction(
comment_id)) # noqa E501
if not success:
git_provider.set_pr(pr_url)
elif response.status != 304: elif response.status != 304:
print(f"Failed to fetch notifications. Status code: {response.status}") print(f"Failed to fetch notifications. Status code: {response.status}")
@ -120,5 +218,18 @@ async def polling_loop():
artifact={"traceback": traceback.format_exc()}) artifact={"traceback": traceback.format_exc()})
async def mark_notification_as_read(headers, notification, session):
async with session.patch(
f"https://api.github.com/notifications/threads/{notification['id']}",
headers=headers) as mark_read_response:
if mark_read_response.status != 205:
get_logger().error(
f"Failed to mark notification as read. Status code: {mark_read_response.status}")
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(polling_loop()) asyncio.run(polling_loop())
# # Example usage
# task_queue = deque([lambda: print("Task executed1"), lambda: print("Task executed2")])
# asyncio.run(background_task_manager(task_queue))