mirror of
https://github.com/qodo-ai/pr-agent.git
synced 2025-12-12 10:55:17 +00:00
Set a custom toml loader for Dynaconf (#2087)
Some checks are pending
Build-and-test / build-and-test (push) Waiting to run
Some checks are pending
Build-and-test / build-and-test (push) Waiting to run
This commit is contained in:
parent
8bd134e30c
commit
d141e4ff9d
4 changed files with 202 additions and 5 deletions
|
|
@ -8,9 +8,15 @@ from starlette_context import context
|
||||||
PR_AGENT_TOML_KEY = 'pr-agent'
|
PR_AGENT_TOML_KEY = 'pr-agent'
|
||||||
|
|
||||||
current_dir = dirname(abspath(__file__))
|
current_dir = dirname(abspath(__file__))
|
||||||
|
|
||||||
|
dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once.
|
||||||
|
'loaders': ['pr_agent.custom_merge_loader', 'dynaconf.loaders.env_loader'], # Use a custom loader to merge sections, but overwrite their overlapping values. Also support ENV variables to take precedence.
|
||||||
|
'root_path': join(current_dir, "settings"), #Used for Dynaconf.find_file() - So that root path points to settings folder, since we disabled all core loaders.
|
||||||
|
'merge_enabled': True # In case more than one file is sent, merge them. Must be set to True, otherwise, a .toml file with section [XYZ] overwrites the entire section of a previous .toml file's [XYZ] and we want it to only overwrite the overlapping fields under such section
|
||||||
|
}
|
||||||
global_settings = Dynaconf(
|
global_settings = Dynaconf(
|
||||||
envvar_prefix=False,
|
envvar_prefix=False,
|
||||||
merge_enabled=True,
|
load_dotenv=False, # Security: Don't load .env files
|
||||||
settings_files=[join(current_dir, f) for f in [
|
settings_files=[join(current_dir, f) for f in [
|
||||||
"settings/configuration.toml",
|
"settings/configuration.toml",
|
||||||
"settings/ignore.toml",
|
"settings/ignore.toml",
|
||||||
|
|
@ -33,7 +39,8 @@ global_settings = Dynaconf(
|
||||||
"settings/pr_help_docs_headings_prompts.toml",
|
"settings/pr_help_docs_headings_prompts.toml",
|
||||||
"settings/.secrets.toml",
|
"settings/.secrets.toml",
|
||||||
"settings_prod/.secrets.toml",
|
"settings_prod/.secrets.toml",
|
||||||
]]
|
]],
|
||||||
|
**dynconf_kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
167
pr_agent/custom_merge_loader.py
Normal file
167
pr_agent/custom_merge_loader.py
Normal file
|
|
@ -0,0 +1,167 @@
|
||||||
|
from pathlib import Path
|
||||||
|
import tomllib #tomllib should be used instead of Py toml for Python 3.11+
|
||||||
|
|
||||||
|
from jinja2.exceptions import SecurityError
|
||||||
|
|
||||||
|
from pr_agent.log import get_logger
|
||||||
|
|
||||||
|
def load(obj, env=None, silent=True, key=None, filename=None):
|
||||||
|
"""
|
||||||
|
Load and merge TOML configuration files into a Dynaconf settings object using a secure, in-house loader.
|
||||||
|
This loader:
|
||||||
|
- Replaces list and dict fields instead of appending/updating (non-default Dynaconf behavior).
|
||||||
|
- Enforces several security checks (e.g., disallows includes/preloads and enforces .toml files).
|
||||||
|
- Supports optional single-key loading.
|
||||||
|
Args:
|
||||||
|
obj: The Dynaconf settings instance to update.
|
||||||
|
env: The current environment name (upper case). Defaults to 'DEVELOPMENT'. Note: currently unused.
|
||||||
|
silent (bool): If True, suppress exceptions and log warnings/errors instead.
|
||||||
|
key (str | None): Load only this top-level key (section) if provided; otherwise, load all keys from the files.
|
||||||
|
filename (str | None): Custom filename for tests (not used when settings_files are provided).
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
|
||||||
|
MAX_TOML_SIZE_IN_BYTES = 100 * 1024 * 1024 # Prevent out of mem. exceptions by limiting to 100 MBs which is sufficient for upto 1M lines
|
||||||
|
|
||||||
|
# Get the list of files to load
|
||||||
|
# TODO: hasattr(obj, 'settings_files') for some reason returns False. Need to use 'settings_file'
|
||||||
|
settings_files = obj.settings_files if hasattr(obj, 'settings_files') else (
|
||||||
|
obj.settings_file) if hasattr(obj, 'settings_file') else []
|
||||||
|
if not settings_files or not isinstance(settings_files, list):
|
||||||
|
get_logger().warning("No settings files specified, or missing keys "
|
||||||
|
"(tried looking for 'settings_files' or 'settings_file'), or not a list. Skipping loading.",
|
||||||
|
artifact={'toml_obj_attributes_names': dir(obj)})
|
||||||
|
return
|
||||||
|
|
||||||
|
# Storage for all loaded data
|
||||||
|
accumulated_data = {}
|
||||||
|
|
||||||
|
# Security: Check for forbidden configuration options
|
||||||
|
if hasattr(obj, 'includes') and obj.includes:
|
||||||
|
if not silent:
|
||||||
|
raise SecurityError("Configuration includes forbidden option: 'includes'. Skipping loading.")
|
||||||
|
get_logger().error("Configuration includes forbidden option: 'includes'. Skipping loading.")
|
||||||
|
return
|
||||||
|
if hasattr(obj, 'preload') and obj.preload:
|
||||||
|
if not silent:
|
||||||
|
raise SecurityError("Configuration includes forbidden option: 'preload'. Skipping loading.")
|
||||||
|
get_logger().error("Configuration includes forbidden option: 'preload'. Skipping loading.")
|
||||||
|
return
|
||||||
|
|
||||||
|
for settings_file in settings_files:
|
||||||
|
try:
|
||||||
|
# Load the TOML file
|
||||||
|
file_path = Path(settings_file)
|
||||||
|
# Security: Only allow .toml files
|
||||||
|
if file_path.suffix.lower() != '.toml':
|
||||||
|
get_logger().warning(f"Only .toml files are allowed. Skipping: {settings_file}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not file_path.exists():
|
||||||
|
get_logger().warning(f"Settings file not found: {settings_file}. Skipping it.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if file_path.stat().st_size > MAX_TOML_SIZE_IN_BYTES:
|
||||||
|
get_logger().warning(f"Settings file too large (> {MAX_TOML_SIZE_IN_BYTES} bytes): {settings_file}. Skipping it.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
file_data = tomllib.load(f)
|
||||||
|
|
||||||
|
# Handle sections (like [config], [default], etc.)
|
||||||
|
if not isinstance(file_data, dict):
|
||||||
|
get_logger().warning(f"TOML root is not a table in '{settings_file}'. Skipping.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Security: Check file contents for forbidden directives
|
||||||
|
validate_file_security(file_data, settings_file)
|
||||||
|
|
||||||
|
for section_name, section_data in file_data.items():
|
||||||
|
if not isinstance(section_data, dict):
|
||||||
|
get_logger().warning(f"Section '{section_name}' in '{settings_file}' is not a table. Skipping.")
|
||||||
|
continue
|
||||||
|
for field, field_value in section_data.items():
|
||||||
|
if section_name not in accumulated_data:
|
||||||
|
accumulated_data[section_name] = {}
|
||||||
|
accumulated_data[section_name][field] = field_value
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if not silent:
|
||||||
|
raise e
|
||||||
|
get_logger().exception(f"Exception loading settings file: {settings_file}. Skipping.")
|
||||||
|
|
||||||
|
# Update the settings object
|
||||||
|
for k, v in accumulated_data.items():
|
||||||
|
if key is None or key == k:
|
||||||
|
obj.set(k, v)
|
||||||
|
|
||||||
|
def validate_file_security(file_data, filename):
|
||||||
|
"""
|
||||||
|
Validate that the config file does not contain security-sensitive directives.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_data: Parsed TOML data representing the configuration contents.
|
||||||
|
filename: The name or path of the file being validated (used for error messages).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SecurityError: If forbidden directives are found within the configuration, or if data too nested.
|
||||||
|
"""
|
||||||
|
MAX_DEPTH = 50
|
||||||
|
|
||||||
|
# Check for forbidden keys
|
||||||
|
# Comprehensive list of forbidden keys with explanations
|
||||||
|
forbidden_keys_to_reasons = {
|
||||||
|
# Include mechanisms - allow loading arbitrary files
|
||||||
|
'dynaconf_include': 'allows including other config files dynamically',
|
||||||
|
'dynaconf_includes': 'allows including other config files dynamically',
|
||||||
|
'includes': 'allows including other config files dynamically',
|
||||||
|
|
||||||
|
# Preload mechanisms - allow loading files before main config
|
||||||
|
'preload': 'allows preloading files with potential code execution',
|
||||||
|
'preload_for_dynaconf': 'allows preloading files with potential code execution',
|
||||||
|
'preloads': 'allows preloading files with potential code execution',
|
||||||
|
|
||||||
|
# Merge controls - could be used to manipulate config behavior
|
||||||
|
'dynaconf_merge': 'allows manipulating merge behavior',
|
||||||
|
'dynaconf_merge_enabled': 'allows manipulating merge behavior',
|
||||||
|
'merge_enabled': 'allows manipulating merge behavior',
|
||||||
|
|
||||||
|
# Loader controls - allow changing how configs are loaded
|
||||||
|
'loaders_for_dynaconf': 'allows overriding loaders to execute arbitrary code',
|
||||||
|
'loaders': 'allows overriding loaders to execute arbitrary code',
|
||||||
|
'core_loaders': 'allows overriding core loaders',
|
||||||
|
'core_loaders_for_dynaconf': 'allows overriding core loaders',
|
||||||
|
|
||||||
|
# Settings module - allows loading Python modules
|
||||||
|
'settings_module': 'allows loading Python modules with code execution',
|
||||||
|
'settings_file_for_dynaconf': 'could override settings file location',
|
||||||
|
'settings_files_for_dynaconf': 'could override settings file location',
|
||||||
|
|
||||||
|
# Environment variable prefix manipulation
|
||||||
|
'envvar_prefix': 'allows changing environment variable prefix',
|
||||||
|
'envvar_prefix_for_dynaconf': 'allows changing environment variable prefix',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check at the top level and in all sections
|
||||||
|
def check_dict(data, path="", max_depth=MAX_DEPTH):
|
||||||
|
if max_depth <= 0:
|
||||||
|
raise SecurityError(
|
||||||
|
f"Maximum nesting depth exceeded at {path}. "
|
||||||
|
f"Possible attempt to cause stack overflow."
|
||||||
|
)
|
||||||
|
|
||||||
|
for key, value in data.items():
|
||||||
|
full_path = f"{path}.{key}" if path else key
|
||||||
|
|
||||||
|
if key.lower() in forbidden_keys_to_reasons:
|
||||||
|
raise SecurityError(
|
||||||
|
f"Security error in {filename}: "
|
||||||
|
f"Forbidden directive '{key}' found at {full_path}. Reason: {forbidden_keys_to_reasons[key.lower()]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Recursively check nested dicts
|
||||||
|
if isinstance(value, dict):
|
||||||
|
check_dict(value, path=full_path, max_depth=(max_depth - 1))
|
||||||
|
|
||||||
|
check_dict(file_data, max_depth=MAX_DEPTH)
|
||||||
|
|
@ -38,10 +38,17 @@ def apply_repo_settings(pr_url):
|
||||||
os.write(fd, repo_settings)
|
os.write(fd, repo_settings)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once.
|
||||||
|
'loaders': ['pr_agent.custom_merge_loader'],
|
||||||
|
# Use a custom loader to merge sections, but overwrite their overlapping values. Don't involve ENV variables.
|
||||||
|
'merge_enabled': True # Merge multiple files; ensures [XYZ] sections only overwrite overlapping keys, not whole sections.
|
||||||
|
}
|
||||||
|
|
||||||
new_settings = Dynaconf(settings_files=[repo_settings_file],
|
new_settings = Dynaconf(settings_files=[repo_settings_file],
|
||||||
# Disable all dynamic loading features
|
# Disable all dynamic loading features
|
||||||
load_dotenv=False, # Don't load .env files
|
load_dotenv=False, # Don't load .env files
|
||||||
merge_enabled=False, # Don't allow merging from other sources
|
envvar_prefix=False, # Drop DYNACONF for env. variables
|
||||||
|
**dynconf_kwargs
|
||||||
)
|
)
|
||||||
except TypeError as e:
|
except TypeError as e:
|
||||||
# Fallback for older Dynaconf versions that don't support these parameters
|
# Fallback for older Dynaconf versions that don't support these parameters
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,24 @@ class PRConfig:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
def _prepare_pr_configs(self) -> str:
|
def _prepare_pr_configs(self) -> str:
|
||||||
conf_file = get_settings().find_file("configuration.toml")
|
try:
|
||||||
conf_settings = Dynaconf(settings_files=[conf_file])
|
conf_file = get_settings().find_file("configuration.toml")
|
||||||
|
dynconf_kwargs = {'core_loaders': [], # DISABLE default loaders, otherwise will load toml files more than once.
|
||||||
|
'loaders': ['pr_agent.custom_merge_loader'],
|
||||||
|
# Use a custom loader to merge sections, but overwrite their overlapping values. Do not use ENV variables.
|
||||||
|
'merge_enabled': True
|
||||||
|
# Merge multiple TOML files; prevent full section overwrite—only overlapping keys in sections overwrite prior ones.
|
||||||
|
}
|
||||||
|
conf_settings = Dynaconf(settings_files=[conf_file],
|
||||||
|
# Security: Disable all dynamic loading features
|
||||||
|
load_dotenv=False, # Don't load .env files
|
||||||
|
envvar_prefix=False,
|
||||||
|
**dynconf_kwargs
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
get_logger().error("Caught exception during Dynaconf loading. Returning empty dict",
|
||||||
|
artifact={"exception": e})
|
||||||
|
conf_settings = {}
|
||||||
configuration_headers = [header.lower() for header in conf_settings.keys()]
|
configuration_headers = [header.lower() for header in conf_settings.keys()]
|
||||||
relevant_configs = {
|
relevant_configs = {
|
||||||
header: configs for header, configs in get_settings().to_dict().items()
|
header: configs for header, configs in get_settings().to_dict().items()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue