|
|
""" |
|
|
Secure path utilities to prevent path injection attacks. |
|
|
|
|
|
This module provides secure alternatives to os.path operations that validate |
|
|
and sanitize file paths to prevent directory traversal and other path-based attacks. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Optional, Union |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def sanitize_filename(filename: str, max_length: int = 255) -> str:
    """
    Sanitize a filename to prevent path injection attacks.

    Directory components are dropped (both ``/`` and ``\\`` are treated as
    separators on every platform), characters that are reserved on common
    filesystems are replaced with ``_``, runs of dots are collapsed, and
    leading/trailing dots and spaces are removed.

    Args:
        filename: The filename to sanitize
        max_length: Maximum length of the sanitized filename (must be >= 1)

    Returns:
        A sanitized filename safe for use in file operations. If nothing
        usable remains after cleaning, ``"sanitized_file"`` is used as the
        starting point (and is still subject to ``max_length``).

    Raises:
        ValueError: If the filename is empty or not a string, or if
            ``max_length`` is smaller than 1
    """
    if not filename or not isinstance(filename, str):
        raise ValueError("Filename must be a non-empty string")
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    # os.path.basename only splits on the host separator, so on POSIX a
    # Windows-style name such as "..\\..\\evil.txt" would pass through whole.
    # Normalise backslashes first so the directory part is always dropped.
    filename = os.path.basename(filename.replace("\\", "/"))

    # Replace reserved characters and control characters.
    sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "_", filename)

    # Collapse any run of two or more dots into a single dot.
    sanitized = re.sub(r"\.{2,}", ".", sanitized)

    # Leading/trailing dots and spaces are dropped.
    sanitized = sanitized.strip(". ")

    if not sanitized:
        sanitized = "sanitized_file"

    if len(sanitized) > max_length:
        name, ext = os.path.splitext(sanitized)
        if len(ext) < max_length:
            # Keep the extension and shorten only the stem.
            sanitized = name[: max_length - len(ext)] + ext
        else:
            # The extension alone would exceed the limit; a negative slice
            # bound here would silently produce an over-long result, so fall
            # back to a plain cut of the whole name.
            sanitized = sanitized[:max_length].rstrip(". ")

    return sanitized
|
|
|
|
|
|
|
|
def secure_path_join(base_path: Union[str, Path], *path_parts: str) -> Path:
    """
    Join path components onto a base directory and refuse results that
    leave it.

    Empty components are skipped. A component containing a reserved
    character, a control character, or a run of two or more dots is passed
    through ``sanitize_filename`` before being joined; other components are
    joined unchanged. The joined path is resolved and must lie under the
    resolved base directory.

    Args:
        base_path: The base directory path
        *path_parts: Additional path components to join

    Returns:
        The resolved Path located under the resolved base directory

    Raises:
        ValueError: Propagated from ``sanitize_filename`` when a component
            cannot be sanitized
        PermissionError: If the resulting path would escape the base directory
    """
    root = Path(base_path).resolve()
    suspicious = re.compile(r'[<>:"|?*\x00-\x1f]|\.{2,}')

    candidate = root
    for component in path_parts:
        if not component:
            continue
        if suspicious.search(component):
            component = sanitize_filename(component)
        candidate = candidate / component

    # Resolve before checking so "." / ".." segments and symlinks are
    # collapsed into the real target.
    candidate = candidate.resolve()

    try:
        candidate.relative_to(root)
    except ValueError:
        raise PermissionError(f"Path would escape base directory: {candidate}")

    return candidate
|
|
|
|
|
|
|
|
def secure_file_write(
    base_path: Union[str, Path],
    filename: str,
    content: str,
    mode: str = "w",
    encoding: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Write content to a file located under a base directory.

    The target is computed with ``secure_path_join``, so a filename that
    would resolve outside ``base_path`` is rejected before anything is
    written. Missing parent directories of the target are created.

    Args:
        base_path: The base directory under which to write the file
        filename: The target file name or relative path (untrusted)
        content: The content to write
        mode: File open mode (default: 'w')
        encoding: Text encoding; when falsy, no ``encoding`` argument is
            passed to open()
        **kwargs: Additional arguments for open(); these take precedence
            over ``mode`` and ``encoding`` on key collision
    """
    target = secure_path_join(base_path, filename)

    # Create the directory tree leading to the target if needed.
    target.parent.mkdir(parents=True, exist_ok=True)

    options = {"mode": mode}
    if encoding:
        options["encoding"] = encoding
    options = {**options, **kwargs}

    with open(target, **options) as handle:
        handle.write(content)
|
|
|
|
|
|
|
|
def secure_file_read(
    base_path: Union[str, Path],
    filename: str,
    mode: str = "r",
    encoding: Optional[str] = None,
    **kwargs,
) -> str:
    """
    Read a file located under a base directory.

    The target is computed with ``secure_path_join``, so a filename that
    would resolve outside ``base_path`` is rejected before anything is
    opened.

    Args:
        base_path: The base directory under which to read the file
        filename: The target file name or relative path (untrusted)
        mode: File open mode (default: 'r')
        encoding: Text encoding; when falsy, no ``encoding`` argument is
            passed to open()
        **kwargs: Additional arguments for open(); these take precedence
            over ``mode`` and ``encoding`` on key collision

    Returns:
        The file content

    Raises:
        FileNotFoundError: If the resolved path does not exist
        ValueError: If the resolved path exists but is not a regular file
    """
    target = secure_path_join(base_path, filename)

    if not target.exists():
        raise FileNotFoundError(f"File not found: {target}")
    if not target.is_file():
        raise ValueError(f"Path is not a file: {target}")

    options = {"mode": mode}
    if encoding:
        options["encoding"] = encoding
    options = {**options, **kwargs}

    with open(target, **options) as handle:
        return handle.read()
|
|
|
|
|
|
|
|
def validate_path_safety(
    path: Union[str, Path], base_path: Optional[Union[str, Path]] = None
) -> bool:
    """
    Report whether a path is free of dangerous patterns and, optionally,
    stays inside a base directory.

    The check is made on the string form of ``Path(path)``: it fails if
    that string contains ``..`` or ``//``, or (on non-Windows hosts) a
    backslash. When ``base_path`` is given, the path is resolved (relative
    paths are resolved against the base) and must lie under the resolved
    base.

    Args:
        path: The path to validate
        base_path: Optional base path to check against

    Returns:
        True if the path is safe, False otherwise. Any exception raised
        while checking is treated as "not safe".
    """
    try:
        candidate = Path(path)
        text = str(candidate)

        forbidden = ["..", "//"]
        # A backslash is only treated as suspicious where it is not the
        # native path separator.
        if os.name != "nt":
            forbidden.append("\\")

        if any(token in text for token in forbidden):
            return False

        if not base_path:
            return True

        root = Path(base_path).resolve()
        if candidate.is_absolute():
            resolved = candidate.resolve()
        else:
            resolved = (root / candidate).resolve()

        try:
            resolved.relative_to(root)
        except ValueError:
            return False
        return True

    except Exception:
        return False
|
|
|
|
|
|
|
|
def validate_path_containment(
    path: Union[str, Path], base_path: Union[str, Path]
) -> bool:
    """
    Validate that a file path is contained within a base directory.

    Both paths are canonicalised with ``os.path.realpath`` (so symlinks and
    ``..`` segments are resolved) and compared with ``os.path.commonpath``.

    For testing scenarios, any path that genuinely lies inside the system
    temporary directory (``tempfile.gettempdir()``) is accepted without
    further checks. This allowance is decided by real path containment, not
    by substrings such as "tmp" or "test" appearing in the path, because a
    substring match lets arbitrary locations bypass the containment check.

    Args:
        path: The path to validate
        base_path: The trusted base directory

    Returns:
        True if the path is inside the system temporary directory, or if
        ``base_path`` is an existing directory and ``path`` is an existing
        regular file located under it. False otherwise, including when the
        inputs cannot be interpreted as paths.
    """
    import tempfile

    def _is_within(child: str, parent: str) -> bool:
        # commonpath raises ValueError for paths on different drives or a
        # mix of absolute and relative paths; neither can be contained.
        try:
            return os.path.commonpath([child, parent]) == parent
        except ValueError:
            return False

    try:
        # normcase keeps the equality test correct on case-insensitive
        # platforms; it is a no-op on POSIX.
        resolved_path = os.path.normcase(os.path.realpath(os.fspath(path)))
        resolved_base = os.path.normcase(os.path.realpath(os.fspath(base_path)))

        # NOTE(review): the temp-directory allowance is kept for test
        # suites; confirm it is still wanted in production deployments.
        temp_root = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        if _is_within(resolved_path, temp_root):
            return True

        if not os.path.isdir(resolved_base):
            return False

        if not os.path.isfile(resolved_path):
            return False

        return _is_within(resolved_path, resolved_base)

    except (OSError, TypeError, ValueError):
        return False
|
|
|
|
|
|
|
|
def validate_folder_containment(
    path: Union[str, Path], base_path: Union[str, Path]
) -> bool:
    """
    Validate that a folder path is contained within a base directory.

    Both paths are canonicalised with ``os.path.realpath`` (so symlinks and
    ``..`` segments are resolved) and compared with ``os.path.commonpath``.
    The folder itself does not have to exist; the base directory does.

    For testing scenarios, any path that genuinely lies inside the system
    temporary directory (``tempfile.gettempdir()``) is accepted without
    further checks. This allowance is decided by real path containment of
    ``path`` only. Substring matches such as "tmp" or "test" in either
    ``path`` or ``base_path`` are not used, because they let arbitrary
    locations bypass the containment check.

    Args:
        path: The folder path to validate
        base_path: The trusted base directory

    Returns:
        True if the path is inside the system temporary directory, or if
        ``base_path`` is an existing directory and ``path`` is equal to it
        or located under it. False otherwise, including when the inputs
        cannot be interpreted as paths.
    """
    import tempfile

    def _is_within(child: str, parent: str) -> bool:
        # commonpath raises ValueError for paths on different drives or a
        # mix of absolute and relative paths; neither can be contained.
        try:
            return os.path.commonpath([child, parent]) == parent
        except ValueError:
            return False

    try:
        # normcase keeps the equality test correct on case-insensitive
        # platforms; it is a no-op on POSIX.
        resolved_path = os.path.normcase(os.path.realpath(os.fspath(path)))
        resolved_base = os.path.normcase(os.path.realpath(os.fspath(base_path)))

        # NOTE(review): the temp-directory allowance is kept for test
        # suites; confirm it is still wanted in production deployments.
        temp_root = os.path.normcase(os.path.realpath(tempfile.gettempdir()))
        if _is_within(resolved_path, temp_root):
            return True

        if not os.path.isdir(resolved_base):
            return False

        return _is_within(resolved_path, resolved_base)

    except (OSError, TypeError, ValueError) as e:
        # Report through logging rather than stdout so library users
        # control where the message goes.
        logging.getLogger(__name__).warning(
            "Error validating folder containment: %s", e
        )
        return False
|
|
|
|
|
|
|
|
|
|
|
def secure_join(*paths: str) -> str:
    """
    Secure alternative to os.path.join that prevents path injection.

    The first argument is the base; the rest are components joined onto it.
    If any component contains a reserved character, a control character, or
    a run of two or more dots, the join is delegated to ``secure_path_join``
    and the resolved path is returned. Otherwise the components are joined
    as given, after checking that none of them is anchored (absolute or
    drive-rooted): an anchored component would make the join discard the
    base entirely.

    Args:
        *paths: Path components to join

    Returns:
        A safe joined path string, or "" when called with no arguments

    Raises:
        PermissionError: If a component after the first is anchored, or if
            ``secure_path_join`` finds that the result escapes the base
        ValueError: Propagated from ``secure_path_join`` when a component
            cannot be sanitized
    """
    if not paths:
        return ""

    base_path = Path(paths[0])
    path_parts = paths[1:]

    if any(re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part) for part in path_parts):
        return str(secure_path_join(base_path, *path_parts))

    # Joining "/etc/passwd" onto any base yields "/etc/passwd"; refuse it
    # instead of silently dropping the base.
    for part in path_parts:
        if Path(part).anchor:
            raise PermissionError(f"Absolute path component not allowed: {part}")

    return str(Path(*paths))
|
|
|
|
|
|
|
|
def secure_basename(path: str) -> str:
    """
    Secure alternative to os.path.basename that sanitizes the result.

    Both ``/`` and ``\\`` are treated as separators on every platform, so a
    Windows-style path is reduced to its final component on POSIX hosts as
    well. If that component contains a reserved character, a control
    character, or a run of two or more dots, it is passed through
    ``sanitize_filename``; otherwise it is returned unchanged.

    Args:
        path: The path to get the basename from

    Returns:
        A sanitized basename (may be empty when the path ends in a
        separator)

    Raises:
        ValueError: Propagated from ``sanitize_filename`` when the
            component cannot be sanitized
    """
    # os.path.basename only splits on the host separator; normalise
    # backslashes so "dir\\file.txt" does not come back whole on POSIX.
    basename = os.path.basename(os.fspath(path).replace("\\", "/"))

    if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', basename):
        return sanitize_filename(basename)
    return basename
|
|
|