From d141e4ff9d9b8bedc1c5be0a94ea9f8f46e3f7ce Mon Sep 17 00:00:00 2001 From: sharoneyal Date: Thu, 30 Oct 2025 11:56:46 +0200 Subject: [PATCH] Set a custom toml loader for Dynaconf (#2087) --- pr_agent/config_loader.py | 11 ++- pr_agent/custom_merge_loader.py | 167 ++++++++++++++++++++++++++++++++ pr_agent/git_providers/utils.py | 9 +- pr_agent/tools/pr_config.py | 20 +++- 4 files changed, 202 insertions(+), 5 deletions(-) create mode 100644 pr_agent/custom_merge_loader.py diff --git a/pr_agent/config_loader.py b/pr_agent/config_loader.py index f19f1067..ac7343f2 100644 --- a/pr_agent/config_loader.py +++ b/pr_agent/config_loader.py @@ -8,9 +8,15 @@ from starlette_context import context PR_AGENT_TOML_KEY = 'pr-agent' current_dir = dirname(abspath(__file__)) + +dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once. + 'loaders': ['pr_agent.custom_merge_loader', 'dynaconf.loaders.env_loader'], # Use a custom loader to merge sections, but overwrite their overlapping values. Also support ENV variables to take precedence. + 'root_path': join(current_dir, "settings"), #Used for Dynaconf.find_file() - So that root path points to settings folder, since we disabled all core loaders. + 'merge_enabled': True # In case more than one file is sent, merge them. Must be set to True, otherwise, a .toml file with section [XYZ] overwrites the entire section of a previous .toml file's [XYZ] and we want it to only overwrite the overlapping fields under such section + } global_settings = Dynaconf( envvar_prefix=False, - merge_enabled=True, + load_dotenv=False, # Security: Don't load .env files settings_files=[join(current_dir, f) for f in [ "settings/configuration.toml", "settings/ignore.toml", @@ -33,7 +39,8 @@ global_settings = Dynaconf( "settings/pr_help_docs_headings_prompts.toml", "settings/.secrets.toml", "settings_prod/.secrets.toml", - ]] + ]], + **dynconf_kwargs ) diff --git a/pr_agent/custom_merge_loader.py b/pr_agent/custom_merge_loader.py new file mode 100644 index 00000000..8aa29e8d --- /dev/null +++ b/pr_agent/custom_merge_loader.py @@ -0,0 +1,167 @@ +from pathlib import Path +import tomllib #tomllib should be used instead of Py toml for Python 3.11+ + +from jinja2.exceptions import SecurityError + +from pr_agent.log import get_logger + +def load(obj, env=None, silent=True, key=None, filename=None): + """ + Load and merge TOML configuration files into a Dynaconf settings object using a secure, in-house loader. + This loader: + - Replaces list and dict fields instead of appending/updating (non-default Dynaconf behavior). + - Enforces several security checks (e.g., disallows includes/preloads and enforces .toml files). + - Supports optional single-key loading. + Args: + obj: The Dynaconf settings instance to update. + env: The current environment name (upper case). Defaults to 'DEVELOPMENT'. Note: currently unused. + silent (bool): If True, suppress exceptions and log warnings/errors instead. + key (str | None): Load only this top-level key (section) if provided; otherwise, load all keys from the files. + filename (str | None): Custom filename for tests (not used when settings_files are provided). + Returns: + None + """ + + MAX_TOML_SIZE_IN_BYTES = 100 * 1024 * 1024 # Prevent out of mem. exceptions by limiting to 100 MBs which is sufficient for upto 1M lines + + # Get the list of files to load + # TODO: hasattr(obj, 'settings_files') for some reason returns False. Need to use 'settings_file' + settings_files = obj.settings_files if hasattr(obj, 'settings_files') else ( + obj.settings_file) if hasattr(obj, 'settings_file') else [] + if not settings_files or not isinstance(settings_files, list): + get_logger().warning("No settings files specified, or missing keys " + "(tried looking for 'settings_files' or 'settings_file'), or not a list. Skipping loading.", + artifact={'toml_obj_attributes_names': dir(obj)}) + return + + # Storage for all loaded data + accumulated_data = {} + + # Security: Check for forbidden configuration options + if hasattr(obj, 'includes') and obj.includes: + if not silent: + raise SecurityError("Configuration includes forbidden option: 'includes'. Skipping loading.") + get_logger().error("Configuration includes forbidden option: 'includes'. Skipping loading.") + return + if hasattr(obj, 'preload') and obj.preload: + if not silent: + raise SecurityError("Configuration includes forbidden option: 'preload'. Skipping loading.") + get_logger().error("Configuration includes forbidden option: 'preload'. Skipping loading.") + return + + for settings_file in settings_files: + try: + # Load the TOML file + file_path = Path(settings_file) + # Security: Only allow .toml files + if file_path.suffix.lower() != '.toml': + get_logger().warning(f"Only .toml files are allowed. Skipping: {settings_file}") + continue + + if not file_path.exists(): + get_logger().warning(f"Settings file not found: {settings_file}. Skipping it.") + continue + + if file_path.stat().st_size > MAX_TOML_SIZE_IN_BYTES: + get_logger().warning(f"Settings file too large (> {MAX_TOML_SIZE_IN_BYTES} bytes): {settings_file}. Skipping it.") + continue + + with open(file_path, 'rb') as f: + file_data = tomllib.load(f) + + # Handle sections (like [config], [default], etc.) + if not isinstance(file_data, dict): + get_logger().warning(f"TOML root is not a table in '{settings_file}'. Skipping.") + continue + + # Security: Check file contents for forbidden directives + validate_file_security(file_data, settings_file) + + for section_name, section_data in file_data.items(): + if not isinstance(section_data, dict): + get_logger().warning(f"Section '{section_name}' in '{settings_file}' is not a table. Skipping.") + continue + for field, field_value in section_data.items(): + if section_name not in accumulated_data: + accumulated_data[section_name] = {} + accumulated_data[section_name][field] = field_value + + except Exception as e: + if not silent: + raise e + get_logger().exception(f"Exception loading settings file: {settings_file}. Skipping.") + + # Update the settings object + for k, v in accumulated_data.items(): + if key is None or key == k: + obj.set(k, v) + +def validate_file_security(file_data, filename): + """ + Validate that the config file does not contain security-sensitive directives. + + Args: + file_data: Parsed TOML data representing the configuration contents. + filename: The name or path of the file being validated (used for error messages). + + Raises: + SecurityError: If forbidden directives are found within the configuration, or if data too nested. + """ + MAX_DEPTH = 50 + + # Check for forbidden keys + # Comprehensive list of forbidden keys with explanations + forbidden_keys_to_reasons = { + # Include mechanisms - allow loading arbitrary files + 'dynaconf_include': 'allows including other config files dynamically', + 'dynaconf_includes': 'allows including other config files dynamically', + 'includes': 'allows including other config files dynamically', + + # Preload mechanisms - allow loading files before main config + 'preload': 'allows preloading files with potential code execution', + 'preload_for_dynaconf': 'allows preloading files with potential code execution', + 'preloads': 'allows preloading files with potential code execution', + + # Merge controls - could be used to manipulate config behavior + 'dynaconf_merge': 'allows manipulating merge behavior', + 'dynaconf_merge_enabled': 'allows manipulating merge behavior', + 'merge_enabled': 'allows manipulating merge behavior', + + # Loader controls - allow changing how configs are loaded + 'loaders_for_dynaconf': 'allows overriding loaders to execute arbitrary code', + 'loaders': 'allows overriding loaders to execute arbitrary code', + 'core_loaders': 'allows overriding core loaders', + 'core_loaders_for_dynaconf': 'allows overriding core loaders', + + # Settings module - allows loading Python modules + 'settings_module': 'allows loading Python modules with code execution', + 'settings_file_for_dynaconf': 'could override settings file location', + 'settings_files_for_dynaconf': 'could override settings file location', + + # Environment variable prefix manipulation + 'envvar_prefix': 'allows changing environment variable prefix', + 'envvar_prefix_for_dynaconf': 'allows changing environment variable prefix', + } + + # Check at the top level and in all sections + def check_dict(data, path="", max_depth=MAX_DEPTH): + if max_depth <= 0: + raise SecurityError( + f"Maximum nesting depth exceeded at {path}. " + f"Possible attempt to cause stack overflow." + ) + + for key, value in data.items(): + full_path = f"{path}.{key}" if path else key + + if key.lower() in forbidden_keys_to_reasons: + raise SecurityError( + f"Security error in {filename}: " + f"Forbidden directive '{key}' found at {full_path}. Reason: {forbidden_keys_to_reasons[key.lower()]}" + ) + + # Recursively check nested dicts + if isinstance(value, dict): + check_dict(value, path=full_path, max_depth=(max_depth - 1)) + + check_dict(file_data, max_depth=MAX_DEPTH) diff --git a/pr_agent/git_providers/utils.py b/pr_agent/git_providers/utils.py index e37be1c1..1e64b957 100644 --- a/pr_agent/git_providers/utils.py +++ b/pr_agent/git_providers/utils.py @@ -38,10 +38,17 @@ def apply_repo_settings(pr_url): os.write(fd, repo_settings) try: + dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once. + 'loaders': ['pr_agent.custom_merge_loader'], + # Use a custom loader to merge sections, but overwrite their overlapping values. Don't involve ENV variables. + 'merge_enabled': True # Merge multiple files; ensures [XYZ] sections only overwrite overlapping keys, not whole sections. + } + new_settings = Dynaconf(settings_files=[repo_settings_file], # Disable all dynamic loading features load_dotenv=False, # Don't load .env files - merge_enabled=False, # Don't allow merging from other sources + envvar_prefix=False, # Drop DYNACONF for env. variables + **dynconf_kwargs ) except TypeError as e: # Fallback for older Dynaconf versions that don't support these parameters diff --git a/pr_agent/tools/pr_config.py b/pr_agent/tools/pr_config.py index 323a96a5..24ecaab9 100644 --- a/pr_agent/tools/pr_config.py +++ b/pr_agent/tools/pr_config.py @@ -30,8 +30,24 @@ class PRConfig: return "" def _prepare_pr_configs(self) -> str: - conf_file = get_settings().find_file("configuration.toml") - conf_settings = Dynaconf(settings_files=[conf_file]) + try: + conf_file = get_settings().find_file("configuration.toml") + dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once. + 'loaders': ['pr_agent.custom_merge_loader'], + # Use a custom loader to merge sections, but overwrite their overlapping values. Do not use ENV variables. + 'merge_enabled': True + # Merge multiple TOML files; prevent full section overwrite—only overlapping keys in sections overwrite prior ones. + } + conf_settings = Dynaconf(settings_files=[conf_file], + # Security: Disable all dynamic loading features + load_dotenv=False, # Don't load .env files + envvar_prefix=False, + **dynconf_kwargs + ) + except Exception as e: + get_logger().error("Caught exception during Dynaconf loading. Returning empty dict", + artifact={"exception": e}) + conf_settings = {} configuration_headers = [header.lower() for header in conf_settings.keys()] relevant_configs = { header: configs for header, configs in get_settings().to_dict().items()