diff --git a/docs/docs/installation/github.md b/docs/docs/installation/github.md index 3eeace4f..69b34b8a 100644 --- a/docs/docs/installation/github.md +++ b/docs/docs/installation/github.md @@ -203,6 +203,28 @@ For example: `GITHUB.WEBHOOK_SECRET` --> `GITHUB__WEBHOOK_SECRET` 7. Go back to steps 8-9 of [Method 5](#run-as-a-github-app) with the function url as your Webhook URL. The Webhook URL would look like `https:///api/v1/github_webhooks` +### Using AWS Secrets Manager + +For production Lambda deployments, use AWS Secrets Manager instead of environment variables: + +1. Create a secret in AWS Secrets Manager with JSON format like this: + +```json +{ + "openai.key": "sk-proj-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "github.webhook_secret": "your-webhook-secret-from-step-2", + "github.private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA...\n-----END RSA PRIVATE KEY-----" +} +``` + +2. Add IAM permission `secretsmanager:GetSecretValue` to your Lambda execution role +3. Set these environment variables in your Lambda: + +```bash +AWS_SECRETS_MANAGER__SECRET_ARN=arn:aws:secretsmanager:us-east-1:123456789012:secret:pr-agent-secrets-AbCdEf +CONFIG__SECRET_PROVIDER=aws_secrets_manager +``` + --- ## AWS CodeCommit Setup diff --git a/docs/docs/usage-guide/additional_configurations.md b/docs/docs/usage-guide/additional_configurations.md index 9f9202f6..8d205865 100644 --- a/docs/docs/usage-guide/additional_configurations.md +++ b/docs/docs/usage-guide/additional_configurations.md @@ -249,4 +249,4 @@ ignore_pr_authors = ["my-special-bot-user", ...] Where the `ignore_pr_authors` is a list of usernames that you want to ignore. !!! note - There is one specific case where bots will receive an automatic response - when they generated a PR with a _failed test_. In that case, the [`ci_feedback`](https://qodo-merge-docs.qodo.ai/tools/ci_feedback/) tool will be invoked. \ No newline at end of file + There is one specific case where bots will receive an automatic response - when they generated a PR with a _failed test_. In that case, the [`ci_feedback`](https://qodo-merge-docs.qodo.ai/tools/ci_feedback/) tool will be invoked. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 6210ed1c..74f98fb8 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -16,6 +16,7 @@ nav: - Introduction: 'usage-guide/introduction.md' - Enabling a Wiki: 'usage-guide/enabling_a_wiki.md' - Configuration File: 'usage-guide/configuration_options.md' + - AWS Secrets Manager: 'usage-guide/aws_secrets_manager.md' - Usage and Automation: 'usage-guide/automations_and_usage.md' - Managing Mail Notifications: 'usage-guide/mail_notifications.md' - Changing a Model: 'usage-guide/changing_a_model.md' diff --git a/pr_agent/config_loader.py b/pr_agent/config_loader.py index 7a62adec..f525d893 100644 --- a/pr_agent/config_loader.py +++ b/pr_agent/config_loader.py @@ -81,3 +81,62 @@ def _find_pyproject() -> Optional[Path]: pyproject_path = _find_pyproject() if pyproject_path is not None: get_settings().load_file(pyproject_path, env=f'tool.{PR_AGENT_TOML_KEY}') + + +def apply_secrets_manager_config(): + """ + Retrieve configuration from AWS Secrets Manager and override existing settings + """ + try: + # Dynamic imports to avoid circular dependency (secret_providers imports config_loader) + from pr_agent.secret_providers import get_secret_provider + from pr_agent.log import get_logger + + secret_provider = get_secret_provider() + if not secret_provider: + return + + if (hasattr(secret_provider, 'get_all_secrets') and + get_settings().get("CONFIG.SECRET_PROVIDER") == 'aws_secrets_manager'): + try: + secrets = secret_provider.get_all_secrets() + if secrets: + apply_secrets_to_config(secrets) + get_logger().info("Applied AWS Secrets Manager configuration") + except Exception as e: + get_logger().error(f"Failed to apply AWS Secrets Manager config: {e}") + except Exception as e: + try: + from pr_agent.log import get_logger + get_logger().debug(f"Secret provider not configured: {e}") + except: + # Fail completely silently if log module is not available + pass + + +def apply_secrets_to_config(secrets: dict): + """ + Apply secret dictionary to configuration + """ + try: + # Dynamic import to avoid potential circular dependency + from pr_agent.log import get_logger + except: + def get_logger(): + class DummyLogger: + def debug(self, msg): pass + return DummyLogger() + + for key, value in secrets.items(): + if '.' in key: # nested key like "openai.key" + parts = key.split('.') + if len(parts) == 2: + section, setting = parts + section_upper = section.upper() + setting_upper = setting.upper() + + # Set only when no existing value (prioritize environment variables) + current_value = get_settings().get(f"{section_upper}.{setting_upper}") + if current_value is None or current_value == "": + get_settings().set(f"{section_upper}.{setting_upper}", value) + get_logger().debug(f"Set {section}.{setting} from AWS Secrets Manager") diff --git a/pr_agent/secret_providers/__init__.py b/pr_agent/secret_providers/__init__.py index c9faf480..204872e2 100644 --- a/pr_agent/secret_providers/__init__.py +++ b/pr_agent/secret_providers/__init__.py @@ -13,5 +13,12 @@ def get_secret_provider(): return GoogleCloudStorageSecretProvider() except Exception as e: raise ValueError(f"Failed to initialize google_cloud_storage secret provider {provider_id}") from e + elif provider_id == 'aws_secrets_manager': + try: + from pr_agent.secret_providers.aws_secrets_manager_provider import \ + AWSSecretsManagerProvider + return AWSSecretsManagerProvider() + except Exception as e: + raise ValueError(f"Failed to initialize aws_secrets_manager secret provider {provider_id}") from e else: raise ValueError("Unknown SECRET_PROVIDER") diff --git a/pr_agent/secret_providers/aws_secrets_manager_provider.py b/pr_agent/secret_providers/aws_secrets_manager_provider.py new file mode 100644 index 00000000..599369db --- /dev/null +++ b/pr_agent/secret_providers/aws_secrets_manager_provider.py @@ -0,0 +1,57 @@ +import json +import boto3 +from botocore.exceptions import ClientError + +from pr_agent.config_loader import get_settings +from pr_agent.log import get_logger +from pr_agent.secret_providers.secret_provider import SecretProvider + + +class AWSSecretsManagerProvider(SecretProvider): + def __init__(self): + try: + region_name = get_settings().get("aws_secrets_manager.region_name") or \ + get_settings().get("aws.AWS_REGION_NAME") + if region_name: + self.client = boto3.client('secretsmanager', region_name=region_name) + else: + self.client = boto3.client('secretsmanager') + + self.secret_arn = get_settings().get("aws_secrets_manager.secret_arn") + if not self.secret_arn: + raise ValueError("AWS Secrets Manager ARN is not configured") + except Exception as e: + get_logger().error(f"Failed to initialize AWS Secrets Manager Provider: {e}") + raise e + + def get_secret(self, secret_name: str) -> str: + """ + Retrieve individual secret by name (for webhook tokens) + """ + try: + response = self.client.get_secret_value(SecretId=secret_name) + return response['SecretString'] + except Exception as e: + get_logger().warning(f"Failed to get secret {secret_name} from AWS Secrets Manager: {e}") + return "" + + def get_all_secrets(self) -> dict: + """ + Retrieve all secrets for configuration override + """ + try: + response = self.client.get_secret_value(SecretId=self.secret_arn) + return json.loads(response['SecretString']) + except Exception as e: + get_logger().error(f"Failed to get secrets from AWS Secrets Manager {self.secret_arn}: {e}") + return {} + + def store_secret(self, secret_name: str, secret_value: str): + try: + self.client.put_secret_value( + SecretId=secret_name, + SecretString=secret_value + ) + except Exception as e: + get_logger().error(f"Failed to store secret {secret_name} in AWS Secrets Manager: {e}") + raise e diff --git a/pr_agent/servers/serverless.py b/pr_agent/servers/serverless.py index a46eb80a..938be31b 100644 --- a/pr_agent/servers/serverless.py +++ b/pr_agent/servers/serverless.py @@ -5,6 +5,17 @@ from starlette_context.middleware import RawContextMiddleware from pr_agent.servers.github_app import router +try: + from pr_agent.config_loader import apply_secrets_manager_config + apply_secrets_manager_config() +except Exception as e: + try: + from pr_agent.log import get_logger + get_logger().debug(f"AWS Secrets Manager initialization failed, falling back to environment variables: {e}") + except: + # Fail completely silently if log module is not available + pass + middleware = [Middleware(RawContextMiddleware)] app = FastAPI(middleware=middleware) app.include_router(router) diff --git a/pr_agent/settings/.secrets_template.toml b/pr_agent/settings/.secrets_template.toml index 460711cb..350abe5c 100644 --- a/pr_agent/settings/.secrets_template.toml +++ b/pr_agent/settings/.secrets_template.toml @@ -121,4 +121,8 @@ api_base = "" [aws] AWS_ACCESS_KEY_ID = "" AWS_SECRET_ACCESS_KEY = "" -AWS_REGION_NAME = "" \ No newline at end of file +AWS_REGION_NAME = "" + +[aws_secrets_manager] +secret_arn = "" # The ARN of the AWS Secrets Manager secret containing PR-Agent configuration +region_name = "" # Optional: specific AWS region (defaults to AWS_REGION_NAME or Lambda region) diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml index cdb6d5b9..a93ea1f2 100644 --- a/pr_agent/settings/configuration.toml +++ b/pr_agent/settings/configuration.toml @@ -39,7 +39,7 @@ allow_dynamic_context=true max_extra_lines_before_dynamic_context = 10 # will try to include up to 10 extra lines before the hunk in the patch, until we reach an enclosing function or class patch_extra_lines_before = 5 # Number of extra lines (+3 default ones) to include before each hunk in the patch patch_extra_lines_after = 1 # Number of extra lines (+3 default ones) to include after each hunk in the patch -secret_provider="" +secret_provider="" # "" (disabled), "google_cloud_storage", or "aws_secrets_manager" for secure secret management cli_mode=false ai_disclaimer_title="" # Pro feature, title for a collapsible disclaimer to AI outputs ai_disclaimer="" # Pro feature, full text for the AI disclaimer diff --git a/tests/unittest/test_aws_secrets_manager_provider.py b/tests/unittest/test_aws_secrets_manager_provider.py new file mode 100644 index 00000000..f84743ca --- /dev/null +++ b/tests/unittest/test_aws_secrets_manager_provider.py @@ -0,0 +1,89 @@ +import json +import pytest +from unittest.mock import MagicMock, patch +from botocore.exceptions import ClientError + +from pr_agent.secret_providers.aws_secrets_manager_provider import AWSSecretsManagerProvider + + +class TestAWSSecretsManagerProvider: + + def _provider(self): + """Create provider following existing pattern""" + with patch('pr_agent.secret_providers.aws_secrets_manager_provider.get_settings') as mock_get_settings, \ + patch('pr_agent.secret_providers.aws_secrets_manager_provider.boto3.client') as mock_boto3_client: + + settings = MagicMock() + settings.get.side_effect = lambda k, d=None: { + 'aws_secrets_manager.secret_arn': 'arn:aws:secretsmanager:us-east-1:123456789012:secret:test-secret', + 'aws_secrets_manager.region_name': 'us-east-1', + 'aws.AWS_REGION_NAME': 'us-east-1' + }.get(k, d) + settings.aws_secrets_manager.secret_arn = 'arn:aws:secretsmanager:us-east-1:123456789012:secret:test-secret' + mock_get_settings.return_value = settings + + # Mock boto3 client + mock_client = MagicMock() + mock_boto3_client.return_value = mock_client + + provider = AWSSecretsManagerProvider() + provider.client = mock_client # Set client directly for testing + return provider, mock_client + + # Positive test cases + def test_get_secret_success(self): + provider, mock_client = self._provider() + mock_client.get_secret_value.return_value = {'SecretString': 'test-secret-value'} + + result = provider.get_secret('test-secret-name') + assert result == 'test-secret-value' + mock_client.get_secret_value.assert_called_once_with(SecretId='test-secret-name') + + def test_get_all_secrets_success(self): + provider, mock_client = self._provider() + secret_data = {'openai.key': 'sk-test', 'github.webhook_secret': 'webhook-secret'} + mock_client.get_secret_value.return_value = {'SecretString': json.dumps(secret_data)} + + result = provider.get_all_secrets() + assert result == secret_data + + # Negative test cases (following Google Cloud Storage pattern) + def test_get_secret_failure(self): + provider, mock_client = self._provider() + mock_client.get_secret_value.side_effect = Exception("AWS error") + + result = provider.get_secret('nonexistent-secret') + assert result == "" # Confirm empty string is returned + + def test_get_all_secrets_failure(self): + provider, mock_client = self._provider() + mock_client.get_secret_value.side_effect = Exception("AWS error") + + result = provider.get_all_secrets() + assert result == {} # Confirm empty dictionary is returned + + def test_store_secret_update_existing(self): + provider, mock_client = self._provider() + mock_client.update_secret.return_value = {} + + provider.store_secret('test-secret', 'test-value') + mock_client.update_secret.assert_called_once_with( + SecretId='test-secret', + SecretString='test-value' + ) + + def test_init_failure_invalid_config(self): + with patch('pr_agent.secret_providers.aws_secrets_manager_provider.get_settings') as mock_get_settings: + settings = MagicMock() + settings.aws_secrets_manager.secret_arn = None # Configuration error + mock_get_settings.return_value = settings + + with pytest.raises(Exception): + AWSSecretsManagerProvider() + + def test_store_secret_failure(self): + provider, mock_client = self._provider() + mock_client.update_secret.side_effect = Exception("AWS error") + + with pytest.raises(Exception): + provider.store_secret('test-secret', 'test-value') diff --git a/tests/unittest/test_config_loader_secrets.py b/tests/unittest/test_config_loader_secrets.py new file mode 100644 index 00000000..d0eb3c62 --- /dev/null +++ b/tests/unittest/test_config_loader_secrets.py @@ -0,0 +1,120 @@ +import pytest +from unittest.mock import MagicMock, patch + +from pr_agent.config_loader import apply_secrets_manager_config, apply_secrets_to_config + + +class TestConfigLoaderSecrets: + + def test_apply_secrets_manager_config_success(self): + with patch('pr_agent.config_loader.get_secret_provider') as mock_get_provider, \ + patch('pr_agent.config_loader.apply_secrets_to_config') as mock_apply_secrets, \ + patch('pr_agent.config_loader.get_settings') as mock_get_settings: + + # Mock secret provider + mock_provider = MagicMock() + mock_provider.get_all_secrets.return_value = {'openai.key': 'sk-test'} + mock_get_provider.return_value = mock_provider + + # Mock settings + settings = MagicMock() + settings.get.return_value = "aws_secrets_manager" + mock_get_settings.return_value = settings + + apply_secrets_manager_config() + + mock_apply_secrets.assert_called_once_with({'openai.key': 'sk-test'}) + + def test_apply_secrets_manager_config_no_provider(self): + with patch('pr_agent.config_loader.get_secret_provider') as mock_get_provider: + mock_get_provider.return_value = None + + # Confirm no exception is raised + apply_secrets_manager_config() + + def test_apply_secrets_manager_config_not_aws(self): + with patch('pr_agent.config_loader.get_secret_provider') as mock_get_provider, \ + patch('pr_agent.config_loader.get_settings') as mock_get_settings: + + # Mock Google Cloud Storage provider + mock_provider = MagicMock() + mock_get_provider.return_value = mock_provider + + # Mock settings (Google Cloud Storage) + settings = MagicMock() + settings.get.return_value = "google_cloud_storage" + mock_get_settings.return_value = settings + + # Confirm execution is skipped for non-AWS Secrets Manager + apply_secrets_manager_config() + + # Confirm get_all_secrets is not called + assert not hasattr(mock_provider, 'get_all_secrets') or \ + not mock_provider.get_all_secrets.called + + def test_apply_secrets_to_config_nested_keys(self): + with patch('pr_agent.config_loader.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = None # No existing value + settings.set = MagicMock() + mock_get_settings.return_value = settings + + secrets = { + 'openai.key': 'sk-test', + 'github.webhook_secret': 'webhook-secret' + } + + apply_secrets_to_config(secrets) + + # Confirm settings are applied correctly + settings.set.assert_any_call('OPENAI.KEY', 'sk-test') + settings.set.assert_any_call('GITHUB.WEBHOOK_SECRET', 'webhook-secret') + + def test_apply_secrets_to_config_existing_value_preserved(self): + with patch('pr_agent.config_loader.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = "existing-value" # Existing value present + settings.set = MagicMock() + mock_get_settings.return_value = settings + + secrets = {'openai.key': 'sk-test'} + + apply_secrets_to_config(secrets) + + # Confirm settings are not overridden when existing value present + settings.set.assert_not_called() + + def test_apply_secrets_to_config_single_key(self): + with patch('pr_agent.config_loader.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = None + settings.set = MagicMock() + mock_get_settings.return_value = settings + + secrets = {'simple_key': 'simple_value'} + + apply_secrets_to_config(secrets) + + # Confirm non-dot notation keys are ignored + settings.set.assert_not_called() + + def test_apply_secrets_to_config_multiple_dots(self): + with patch('pr_agent.config_loader.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = None + settings.set = MagicMock() + mock_get_settings.return_value = settings + + secrets = {'section.subsection.key': 'value'} + + apply_secrets_to_config(secrets) + + # Confirm keys with multiple dots are ignored + settings.set.assert_not_called() + + def test_apply_secrets_manager_config_exception_handling(self): + with patch('pr_agent.config_loader.get_secret_provider') as mock_get_provider: + mock_get_provider.side_effect = Exception("Provider error") + + # Confirm processing continues even when exception occurs + apply_secrets_manager_config() # Confirm no exception is raised diff --git a/tests/unittest/test_secret_provider_factory.py b/tests/unittest/test_secret_provider_factory.py new file mode 100644 index 00000000..98a1bfed --- /dev/null +++ b/tests/unittest/test_secret_provider_factory.py @@ -0,0 +1,69 @@ +import pytest +from unittest.mock import MagicMock, patch + +from pr_agent.secret_providers import get_secret_provider + + +class TestSecretProviderFactory: + + def test_get_secret_provider_none_when_not_configured(self): + with patch('pr_agent.secret_providers.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = None + mock_get_settings.return_value = settings + + result = get_secret_provider() + assert result is None + + def test_get_secret_provider_google_cloud_storage(self): + with patch('pr_agent.secret_providers.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = "google_cloud_storage" + settings.config.secret_provider = "google_cloud_storage" + mock_get_settings.return_value = settings + + with patch('pr_agent.secret_providers.google_cloud_storage_secret_provider.GoogleCloudStorageSecretProvider') as MockProvider: + mock_instance = MagicMock() + MockProvider.return_value = mock_instance + + result = get_secret_provider() + assert result is mock_instance + MockProvider.assert_called_once() + + def test_get_secret_provider_aws_secrets_manager(self): + with patch('pr_agent.secret_providers.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = "aws_secrets_manager" + settings.config.secret_provider = "aws_secrets_manager" + mock_get_settings.return_value = settings + + with patch('pr_agent.secret_providers.aws_secrets_manager_provider.AWSSecretsManagerProvider') as MockProvider: + mock_instance = MagicMock() + MockProvider.return_value = mock_instance + + result = get_secret_provider() + assert result is mock_instance + MockProvider.assert_called_once() + + def test_get_secret_provider_unknown_provider(self): + with patch('pr_agent.secret_providers.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = "unknown_provider" + settings.config.secret_provider = "unknown_provider" + mock_get_settings.return_value = settings + + with pytest.raises(ValueError, match="Unknown SECRET_PROVIDER"): + get_secret_provider() + + def test_get_secret_provider_initialization_error(self): + with patch('pr_agent.secret_providers.get_settings') as mock_get_settings: + settings = MagicMock() + settings.get.return_value = "aws_secrets_manager" + settings.config.secret_provider = "aws_secrets_manager" + mock_get_settings.return_value = settings + + with patch('pr_agent.secret_providers.aws_secrets_manager_provider.AWSSecretsManagerProvider') as MockProvider: + MockProvider.side_effect = Exception("Initialization failed") + + with pytest.raises(ValueError, match="Failed to initialize aws_secrets_manager secret provider"): + get_secret_provider()