From 5e82d0a3168e066fd054ab2f4e78aa6c240e2365 Mon Sep 17 00:00:00 2001 From: Makonike Date: Tue, 8 Jul 2025 11:51:30 +0800 Subject: [PATCH 1/7] feat: add streaming support for openai/qwq-plus model --- pr_agent/algo/__init__.py | 6 ++ .../algo/ai_handlers/litellm_ai_handler.py | 65 +++++++++++++++---- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/pr_agent/algo/__init__.py b/pr_agent/algo/__init__.py index ed9edddc..0de83548 100644 --- a/pr_agent/algo/__init__.py +++ b/pr_agent/algo/__init__.py @@ -45,6 +45,7 @@ MAX_TOKENS = { 'command-nightly': 4096, 'deepseek/deepseek-chat': 128000, # 128K, but may be limited by config.max_model_tokens 'deepseek/deepseek-reasoner': 64000, # 64K, but may be limited by config.max_model_tokens + 'openai/qwq-plus': 131072, # 131K context length, but may be limited by config.max_model_tokens 'replicate/llama-2-70b-chat:2c1608e18606fad2812020dc541930f2d0495ce32eee50074220b87300bc16e1': 4096, 'meta-llama/Llama-2-7b-chat-hf': 4096, 'vertex_ai/codechat-bison': 6144, @@ -193,3 +194,8 @@ CLAUDE_EXTENDED_THINKING_MODELS = [ "anthropic/claude-3-7-sonnet-20250219", "claude-3-7-sonnet-20250219" ] + +# Models that require streaming mode +STREAMING_REQUIRED_MODELS = [ + "openai/qwq-plus" +] diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index ec96d952..59a00045 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -5,7 +5,7 @@ import requests from litellm import acompletion from tenacity import retry, retry_if_exception_type, retry_if_not_exception_type, stop_after_attempt -from pr_agent.algo import CLAUDE_EXTENDED_THINKING_MODELS, NO_SUPPORT_TEMPERATURE_MODELS, SUPPORT_REASONING_EFFORT_MODELS, USER_MESSAGE_ONLY_MODELS +from pr_agent.algo import CLAUDE_EXTENDED_THINKING_MODELS, NO_SUPPORT_TEMPERATURE_MODELS, SUPPORT_REASONING_EFFORT_MODELS, USER_MESSAGE_ONLY_MODELS, STREAMING_REQUIRED_MODELS from pr_agent.algo.ai_handlers.base_ai_handler import BaseAiHandler from pr_agent.algo.utils import ReasoningEffort, get_version from pr_agent.config_loader import get_settings @@ -143,6 +143,9 @@ class LiteLLMAIHandler(BaseAiHandler): # Models that support extended thinking self.claude_extended_thinking_models = CLAUDE_EXTENDED_THINKING_MODELS + # Models that require streaming + self.streaming_required_models = STREAMING_REQUIRED_MODELS + def _get_azure_ad_token(self): """ Generates an access token using Azure AD credentials from settings. 
@@ -370,7 +373,21 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().info(f"\nSystem prompt:\n{system}") get_logger().info(f"\nUser prompt:\n{user}") - response = await acompletion(**kwargs) + # Check if model requires streaming + if model in self.streaming_required_models: + kwargs["stream"] = True + get_logger().info(f"Using streaming mode for model {model}") + response = await acompletion(**kwargs) + # Handle streaming response + resp, finish_reason = await self._handle_streaming_response(response) + else: + response = await acompletion(**kwargs) + # Handle non-streaming response + if response is None or len(response["choices"]) == 0: + raise openai.APIError + resp = response["choices"][0]['message']['content'] + finish_reason = response["choices"][0]["finish_reason"] + except openai.RateLimitError as e: get_logger().error(f"Rate limit error during LLM inference: {e}") raise @@ -380,19 +397,43 @@ class LiteLLMAIHandler(BaseAiHandler): except Exception as e: get_logger().warning(f"Unknown error during LLM inference: {e}") raise openai.APIError from e - if response is None or len(response["choices"]) == 0: - raise openai.APIError - else: - resp = response["choices"][0]['message']['content'] - finish_reason = response["choices"][0]["finish_reason"] - get_logger().debug(f"\nAI response:\n{resp}") - # log the full response for debugging + get_logger().debug(f"\nAI response:\n{resp}") + + # log the full response for debugging + if not (model in self.streaming_required_models): response_log = self.prepare_logs(response, system, user, resp, finish_reason) get_logger().debug("Full_response", artifact=response_log) - # for CLI debugging - if get_settings().config.verbosity_level >= 2: - get_logger().info(f"\nAI response:\n{resp}") + # for CLI debugging + if get_settings().config.verbosity_level >= 2: + get_logger().info(f"\nAI response:\n{resp}") return resp, finish_reason + + async def _handle_streaming_response(self, response): + """ + Handle streaming response from acompletion and collect the full response. 
+ + Args: + response: The streaming response object from acompletion + + Returns: + tuple: (full_response_content, finish_reason) + """ + full_response = "" + finish_reason = None + + try: + async for chunk in response: + if chunk.choices and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if hasattr(delta, 'content') and delta.content: + full_response += delta.content + if chunk.choices[0].finish_reason: + finish_reason = chunk.choices[0].finish_reason + except Exception as e: + get_logger().error(f"Error handling streaming response: {e}") + raise + + return full_response, finish_reason From 2d8bee0d6d3984fe2b234ec65068e377112c28bc Mon Sep 17 00:00:00 2001 From: Makonike Date: Wed, 9 Jul 2025 15:04:18 +0800 Subject: [PATCH 2/7] feat: add validation for empty streaming responses in LiteLLM handler --- pr_agent/algo/ai_handlers/litellm_ai_handler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 59a00045..2b914c9f 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -436,4 +436,8 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().error(f"Error handling streaming response: {e}") raise + if not full_response: + get_logger().warning("Streaming response resulted in empty content") + raise openai.APIError("Empty streaming response received") + return full_response, finish_reason From 85e1e2d4ee1b806cff25e6309355194b593e530a Mon Sep 17 00:00:00 2001 From: Makonike Date: Wed, 9 Jul 2025 15:29:03 +0800 Subject: [PATCH 3/7] feat: add debug logging support for streaming models --- .../algo/ai_handlers/litellm_ai_handler.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 2b914c9f..787575c9 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -15,6 +15,23 @@ import json OPENAI_RETRIES = 5 +class MockResponse: + """Mock response object for streaming models to enable consistent logging.""" + + def __init__(self, resp, finish_reason): + self._data = { + "choices": [ + { + "message": {"content": resp}, + "finish_reason": finish_reason + } + ] + } + + def dict(self): + return self._data + + class LiteLLMAIHandler(BaseAiHandler): """ This class handles interactions with the OpenAI API for chat completions. 
@@ -401,9 +418,13 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().debug(f"\nAI response:\n{resp}") # log the full response for debugging - if not (model in self.streaming_required_models): + if model in self.streaming_required_models: + # for streaming, we don't have the full response object, so we create a mock one + mock_response = MockResponse(resp, finish_reason) + response_log = self.prepare_logs(mock_response, system, user, resp, finish_reason) + else: response_log = self.prepare_logs(response, system, user, resp, finish_reason) - get_logger().debug("Full_response", artifact=response_log) + get_logger().debug("Full_response", artifact=response_log) # for CLI debugging if get_settings().config.verbosity_level >= 2: From 31e25a596577615c998342950282186c3bfb149d Mon Sep 17 00:00:00 2001 From: Makonike Date: Wed, 9 Jul 2025 15:39:15 +0800 Subject: [PATCH 4/7] refactor(ai_handler): improve streaming response handling robustness --- pr_agent/algo/ai_handlers/litellm_ai_handler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 787575c9..32d1420c 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -448,11 +448,13 @@ class LiteLLMAIHandler(BaseAiHandler): try: async for chunk in response: if chunk.choices and len(chunk.choices) > 0: - delta = chunk.choices[0].delta - if hasattr(delta, 'content') and delta.content: - full_response += delta.content - if chunk.choices[0].finish_reason: - finish_reason = chunk.choices[0].finish_reason + choice = chunk.choices[0] + delta = choice.delta + content = getattr(delta, 'content', None) + if content: + full_response += content + if choice.finish_reason: + finish_reason = choice.finish_reason except Exception as e: get_logger().error(f"Error handling streaming response: {e}") raise From 74df3f8bd58d2c52c287f36b5717e5067868aabc Mon Sep 17 00:00:00 2001 From: Makonike Date: Thu, 10 Jul 2025 15:14:25 +0800 Subject: [PATCH 5/7] fix(ai_handler): improve empty streaming response validation logic --- pr_agent/algo/ai_handlers/litellm_ai_handler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 32d1420c..596ff7f9 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -459,8 +459,10 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().error(f"Error handling streaming response: {e}") raise - if not full_response: - get_logger().warning("Streaming response resulted in empty content") - raise openai.APIError("Empty streaming response received") + if not full_response and finish_reason is None: + get_logger().warning("Streaming response resulted in empty content with no finish reason") + raise openai.APIError("Empty streaming response received without proper completion") + elif not full_response and finish_reason: + get_logger().debug(f"Streaming response resulted in empty content but completed with finish_reason: {finish_reason}") return full_response, finish_reason From 11fb6ccc7e03b6060d312f7aa73add2af0fba3a8 Mon Sep 17 00:00:00 2001 From: Makonike Date: Sun, 13 Jul 2025 22:37:14 +0800 Subject: [PATCH 6/7] refactor(ai_handler): compact streaming path to reduce main flow impact --- .../algo/ai_handlers/litellm_ai_handler.py | 43 ++++++++++--------- 1 file changed, 23 
insertions(+), 20 deletions(-) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 596ff7f9..380a7893 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -390,20 +390,8 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().info(f"\nSystem prompt:\n{system}") get_logger().info(f"\nUser prompt:\n{user}") - # Check if model requires streaming - if model in self.streaming_required_models: - kwargs["stream"] = True - get_logger().info(f"Using streaming mode for model {model}") - response = await acompletion(**kwargs) - # Handle streaming response - resp, finish_reason = await self._handle_streaming_response(response) - else: - response = await acompletion(**kwargs) - # Handle non-streaming response - if response is None or len(response["choices"]) == 0: - raise openai.APIError - resp = response["choices"][0]['message']['content'] - finish_reason = response["choices"][0]["finish_reason"] + # Get completion with automatic streaming detection + resp, finish_reason, response_obj = await self._get_completion(model, **kwargs) except openai.RateLimitError as e: get_logger().error(f"Rate limit error during LLM inference: {e}") @@ -418,12 +406,7 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().debug(f"\nAI response:\n{resp}") # log the full response for debugging - if model in self.streaming_required_models: - # for streaming, we don't have the full response object, so we create a mock one - mock_response = MockResponse(resp, finish_reason) - response_log = self.prepare_logs(mock_response, system, user, resp, finish_reason) - else: - response_log = self.prepare_logs(response, system, user, resp, finish_reason) + response_log = self.prepare_logs(response_obj, system, user, resp, finish_reason) get_logger().debug("Full_response", artifact=response_log) # for CLI debugging @@ -466,3 +449,23 @@ class LiteLLMAIHandler(BaseAiHandler): get_logger().debug(f"Streaming response resulted in empty content but completed with finish_reason: {finish_reason}") return full_response, finish_reason + + async def _get_completion(self, model, **kwargs): + """ + Wrapper that automatically handles streaming for required models. 
+ """ + if model in self.streaming_required_models: + kwargs["stream"] = True + get_logger().info(f"Using streaming mode for model {model}") + response = await acompletion(**kwargs) + resp, finish_reason = await self._handle_streaming_response(response) + # Create MockResponse for streaming since we don't have the full response object + mock_response = MockResponse(resp, finish_reason) + return resp, finish_reason, mock_response + else: + response = await acompletion(**kwargs) + if response is None or len(response["choices"]) == 0: + raise openai.APIError + return (response["choices"][0]['message']['content'], + response["choices"][0]["finish_reason"], + response) From 8c7680d85ddef4bf77620acd0c77a2d373ce243e Mon Sep 17 00:00:00 2001 From: Makonike Date: Sun, 13 Jul 2025 22:57:43 +0800 Subject: [PATCH 7/7] refactor(ai_handler): add a return statement or raise an exception in the elif branch --- pr_agent/algo/ai_handlers/litellm_ai_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pr_agent/algo/ai_handlers/litellm_ai_handler.py b/pr_agent/algo/ai_handlers/litellm_ai_handler.py index 380a7893..ad9747be 100644 --- a/pr_agent/algo/ai_handlers/litellm_ai_handler.py +++ b/pr_agent/algo/ai_handlers/litellm_ai_handler.py @@ -447,7 +447,7 @@ class LiteLLMAIHandler(BaseAiHandler): raise openai.APIError("Empty streaming response received without proper completion") elif not full_response and finish_reason: get_logger().debug(f"Streaming response resulted in empty content but completed with finish_reason: {finish_reason}") - + raise openai.APIError(f"Streaming response completed with finish_reason '{finish_reason}' but no content received") return full_response, finish_reason async def _get_completion(self, model, **kwargs):
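
For reference, a minimal, self-contained sketch of the streaming pattern the series converges on in _handle_streaming_response and _get_completion: accumulate choices[0].delta.content across chunks, capture choices[0].finish_reason, and hand the rest of chat_completion a plain (content, finish_reason) pair so the main completion flow stays unchanged (the "compact streaming path" of PATCH 6/7). The fake chunks below are illustrative stand-ins shaped like litellm's OpenAI-style streaming deltas, and RuntimeError stands in for the handler's openai.APIError so the snippet runs without litellm, credentials, or the openai package.

import asyncio
from types import SimpleNamespace

async def collect_stream(response):
    # Mirrors the accumulation loop in _handle_streaming_response.
    full_response, finish_reason = "", None
    async for chunk in response:
        if chunk.choices and len(chunk.choices) > 0:
            choice = chunk.choices[0]
            content = getattr(choice.delta, "content", None)
            if content:
                full_response += content
            if choice.finish_reason:
                finish_reason = choice.finish_reason
    if not full_response and finish_reason is None:
        # The handler raises openai.APIError here; RuntimeError keeps the sketch dependency-free.
        raise RuntimeError("Empty streaming response received without proper completion")
    return full_response, finish_reason

async def fake_stream():
    # Stand-in for the object returned by acompletion(..., stream=True):
    # each chunk carries a delta with optional content and an optional finish_reason.
    for text, reason in [("Hello", None), (" world", None), (None, "stop")]:
        yield SimpleNamespace(choices=[SimpleNamespace(
            delta=SimpleNamespace(content=text),
            finish_reason=reason)])

print(asyncio.run(collect_stream(fake_stream())))  # ('Hello world', 'stop')

Because the collected text and finish_reason come back in the same shape as the non-streaming branch, the logging path only needs the MockResponse wrapper from PATCH 3/7 to keep prepare_logs working for both cases.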