from pathlib import Path import tomllib #tomllib should be used instead of Py toml for Python 3.11+ from jinja2.exceptions import SecurityError from pr_agent.log import get_logger def load(obj, env=None, silent=True, key=None, filename=None): """ Load and merge TOML configuration files into a Dynaconf settings object using a secure, in-house loader. This loader: - Replaces list and dict fields instead of appending/updating (non-default Dynaconf behavior). - Enforces several security checks (e.g., disallows includes/preloads and enforces .toml files). - Supports optional single-key loading. Args: obj: The Dynaconf settings instance to update. env: The current environment name (upper case). Defaults to 'DEVELOPMENT'. Note: currently unused. silent (bool): If True, suppress exceptions and log warnings/errors instead. key (str | None): Load only this top-level key (section) if provided; otherwise, load all keys from the files. filename (str | None): Custom filename for tests (not used when settings_files are provided). Returns: None """ MAX_TOML_SIZE_IN_BYTES = 100 * 1024 * 1024 # Prevent out of mem. exceptions by limiting to 100 MBs which is sufficient for upto 1M lines # Get the list of files to load # TODO: hasattr(obj, 'settings_files') for some reason returns False. Need to use 'settings_file' settings_files = obj.settings_files if hasattr(obj, 'settings_files') else ( obj.settings_file) if hasattr(obj, 'settings_file') else [] if not settings_files or not isinstance(settings_files, list): get_logger().warning("No settings files specified, or missing keys " "(tried looking for 'settings_files' or 'settings_file'), or not a list. Skipping loading.", artifact={'toml_obj_attributes_names': dir(obj)}) return # Storage for all loaded data accumulated_data = {} # Security: Check for forbidden configuration options if hasattr(obj, 'includes') and obj.includes: if not silent: raise SecurityError("Configuration includes forbidden option: 'includes'. Skipping loading.") get_logger().error("Configuration includes forbidden option: 'includes'. Skipping loading.") return if hasattr(obj, 'preload') and obj.preload: if not silent: raise SecurityError("Configuration includes forbidden option: 'preload'. Skipping loading.") get_logger().error("Configuration includes forbidden option: 'preload'. Skipping loading.") return for settings_file in settings_files: try: # Load the TOML file file_path = Path(settings_file) # Security: Only allow .toml files if file_path.suffix.lower() != '.toml': get_logger().warning(f"Only .toml files are allowed. Skipping: {settings_file}") continue if not file_path.exists(): get_logger().warning(f"Settings file not found: {settings_file}. Skipping it.") continue if file_path.stat().st_size > MAX_TOML_SIZE_IN_BYTES: get_logger().warning(f"Settings file too large (> {MAX_TOML_SIZE_IN_BYTES} bytes): {settings_file}. Skipping it.") continue with open(file_path, 'rb') as f: file_data = tomllib.load(f) # Handle sections (like [config], [default], etc.) if not isinstance(file_data, dict): get_logger().warning(f"TOML root is not a table in '{settings_file}'. Skipping.") continue # Security: Check file contents for forbidden directives validate_file_security(file_data, settings_file) for section_name, section_data in file_data.items(): if not isinstance(section_data, dict): get_logger().warning(f"Section '{section_name}' in '{settings_file}' is not a table. Skipping.") continue for field, field_value in section_data.items(): if section_name not in accumulated_data: accumulated_data[section_name] = {} accumulated_data[section_name][field] = field_value except Exception as e: if not silent: raise e get_logger().exception(f"Exception loading settings file: {settings_file}. Skipping.") # Update the settings object for k, v in accumulated_data.items(): if key is None or key == k: obj.set(k, v) def validate_file_security(file_data, filename): """ Validate that the config file does not contain security-sensitive directives. Args: file_data: Parsed TOML data representing the configuration contents. filename: The name or path of the file being validated (used for error messages). Raises: SecurityError: If forbidden directives are found within the configuration, or if data too nested. """ MAX_DEPTH = 50 # Check for forbidden keys # Comprehensive list of forbidden keys with explanations forbidden_keys_to_reasons = { # Include mechanisms - allow loading arbitrary files 'dynaconf_include': 'allows including other config files dynamically', 'dynaconf_includes': 'allows including other config files dynamically', 'includes': 'allows including other config files dynamically', # Preload mechanisms - allow loading files before main config 'preload': 'allows preloading files with potential code execution', 'preload_for_dynaconf': 'allows preloading files with potential code execution', 'preloads': 'allows preloading files with potential code execution', # Merge controls - could be used to manipulate config behavior 'dynaconf_merge': 'allows manipulating merge behavior', 'dynaconf_merge_enabled': 'allows manipulating merge behavior', 'merge_enabled': 'allows manipulating merge behavior', # Loader controls - allow changing how configs are loaded 'loaders_for_dynaconf': 'allows overriding loaders to execute arbitrary code', 'loaders': 'allows overriding loaders to execute arbitrary code', 'core_loaders': 'allows overriding core loaders', 'core_loaders_for_dynaconf': 'allows overriding core loaders', # Settings module - allows loading Python modules 'settings_module': 'allows loading Python modules with code execution', 'settings_file_for_dynaconf': 'could override settings file location', 'settings_files_for_dynaconf': 'could override settings file location', # Environment variable prefix manipulation 'envvar_prefix': 'allows changing environment variable prefix', 'envvar_prefix_for_dynaconf': 'allows changing environment variable prefix', } # Check at the top level and in all sections def check_dict(data, path="", max_depth=MAX_DEPTH): if max_depth <= 0: raise SecurityError( f"Maximum nesting depth exceeded at {path}. " f"Possible attempt to cause stack overflow." ) for key, value in data.items(): full_path = f"{path}.{key}" if path else key if key.lower() in forbidden_keys_to_reasons: raise SecurityError( f"Security error in {filename}: " f"Forbidden directive '{key}' found at {full_path}. Reason: {forbidden_keys_to_reasons[key.lower()]}" ) # Recursively check nested dicts if isinstance(value, dict): check_dict(value, path=full_path, max_depth=(max_depth - 1)) check_dict(file_data, max_depth=MAX_DEPTH)