Fix code scanning alert - Uncontrolled data used in path expression #16

Open
1 task
yihong1120 opened this issue Jul 27, 2024 · 1 comment

Comments

@yihong1120
Owner

Tracking issue for:


codeautopilot bot commented Jul 27, 2024

Potential solution

The fix is to validate and sanitize user inputs and external data before they are used in path expressions, in every affected file. This reduces the risk of path traversal attacks. The solution adds URL validation, file path sanitization, error handling, and logging.

What is causing this bug?

The bug is caused by the lack of validation and sanitization of user inputs and external data used in path expressions. Specifically:

  1. In src/stream_capture.py, the stream URL is taken directly from user input without validation.
  2. In src/live_stream_detection.py, parameters like output_folder, stream_url, and model_key are used without proper validation.
  3. In src/stream_viewer.py, the stream URL is used directly to initialize the video capture object without validation.
  4. In src/model_fetcher.py, the model_name is used to construct file paths without sanitization, leading to potential path traversal vulnerabilities.
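
To illustrate the last point, here is a minimal sketch of how an unsanitized name escapes the model directory once the path is normalized. It uses PurePosixPath so the result is the same on every platform; the malicious name is an assumed example, not taken from the repository.

```python
import posixpath
from pathlib import PurePosixPath

# Directory the application intends to write into.
base = PurePosixPath("models/pt")

# Hypothetical attacker-controlled value.
malicious_name = "../../etc/passwd"

# Joining does not remove the '..' segments.
joined = base / malicious_name

# Normalizing collapses them, and the result is outside models/pt.
normalized = posixpath.normpath(str(joined))

# An absolute name discards the base directory entirely.
absolute = base / "/etc/passwd"

print(joined)      # models/pt/../../etc/passwd
print(normalized)  # etc/passwd
print(absolute)    # /etc/passwd
```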

Code

Here are the implementation details and code snippets for each file:

src/stream_capture.py

URL Validation

from urllib.parse import urlparse

def is_valid_url(url: str) -> bool:
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False

# Usage
if not is_valid_url(args.url):
    raise ValueError("Invalid URL provided")

src/live_stream_detection.py

Sanitizing output_folder

from pathlib import Path

def sanitize_path(path: str) -> str:
    safe_base = Path("/safe/directory").resolve()
    resolved_path = (safe_base / path).resolve()
    if not resolved_path.is_relative_to(safe_base):
        raise ValueError("Unsafe path detected")
    return str(resolved_path)

# Usage in the constructor
self.output_folder = sanitize_path(output_folder) if output_folder else None

Validating stream_url

from urllib.parse import urlparse

def validate_url(url: str) -> str:
    result = urlparse(url)
    if all([result.scheme, result.netloc]):
        return url
    else:
        raise ValueError("Invalid URL")

# Usage in the run_detection method
stream_url = validate_url(stream_url)

Sanitizing model_key

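import re
from pathlib import Path

def sanitize_model_key(model_key: str) -> str:
    # fullmatch also rejects a trailing newline, which '$' with re.match accepts
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', model_key):
        raise ValueError("Invalid model key")
    return model_key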

# Usage in the constructor and generate_detections_local method
self.model_key = sanitize_model_key(model_key)
model_path = Path('models/pt/') / f"best_{self.model_key}.pt"

src/stream_viewer.py

URL Validation

from urllib.parse import urlparse

import cv2

class StreamViewer:
    def __init__(self, stream_url: str, window_name: str = 'Stream Viewer'):
        if not self.is_valid_url(stream_url):
            raise ValueError(f"Invalid stream URL: {stream_url}")

        self.stream_url = stream_url
        self.window_name = window_name
        self.cap = cv2.VideoCapture(self.stream_url)

    def is_valid_url(self, url: str) -> bool:
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ['http', 'https', 'rtsp']:
            return False

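        # Accept an allowed domain or any of its subdomains
        # (the example URL below uses cctv4.kctmc.nat.gov.tw).
        allowed_domains = ['kctmc.nat.gov.tw']
        hostname = parsed_url.hostname or ''
        if not any(hostname == d or hostname.endswith('.' + d) for d in allowed_domains):
            return False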

        return True

    def display_stream(self):
        while True:
            ret, frame = self.cap.read()
            if ret:
                cv2.imshow(self.window_name, frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                print('Failed to retrieve frame.')
                break

        self.release_resources()

    def release_resources(self):
        self.cap.release()
        cv2.destroyAllWindows()

if __name__ == '__main__':
    video_url = 'https://cctv4.kctmc.nat.gov.tw/50204bfc/'
    viewer = StreamViewer(video_url)
    viewer.display_stream()

src/model_fetcher.py

Sanitizing model_name

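import re
from pathlib import Path

import requests

def sanitize_model_name(model_name: str) -> str:
    # Accept a plain file name only: letters, digits, '_' and '-',
    # with an optional single extension such as '.pt'.
    if not re.fullmatch(r'[A-Za-z0-9_-]+(\.[A-Za-z0-9]+)?', model_name):
        raise ValueError("Invalid model name")
    return model_name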

def download_model(model_name, url):
    model_name = sanitize_model_name(model_name)
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    # A timeout keeps the download from hanging indefinitely.
    response = requests.get(url, stream=True, timeout=30)
    if response.status_code == 200:
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")

def main():
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)

if __name__ == '__main__':
    main()

How to replicate the bug

  1. For src/stream_capture.py:

    • Run the script with a malformed or malicious URL as a command-line argument.
    • Example: python src/stream_capture.py --url "http://malicious.url"
  2. For src/live_stream_detection.py:

    • Pass a malicious output_folder, stream_url, or model_key to the LiveStreamDetector class.
    • Example: detector = LiveStreamDetector(output_folder="../../etc/passwd", stream_url="http://malicious.url", model_key="../../etc/passwd")
  3. For src/stream_viewer.py:

    • Initialize the StreamViewer class with a malicious URL.
    • Example: viewer = StreamViewer("http://malicious.url")
  4. For src/model_fetcher.py:

    • Add a malicious model_name to the MODEL_URLS dictionary.
    • Example: MODEL_URLS = {'../../etc/passwd': 'http://malicious.url'}

By following these steps, you can replicate the bug and observe the potential security vulnerabilities caused by uncontrolled data used in path expressions.


Files used for this task:

Changes on src/stream_capture.py

Analysis Report for src/stream_capture.py

Overview

The file src/stream_capture.py is responsible for capturing frames from a video stream. It includes functionalities to initialize the stream, capture frames at specified intervals, check internet speed, and select stream quality based on the speed.

Potential Security Issue

The primary concern mentioned in the issue is "Uncontrolled data used in path expression," which typically refers to vulnerabilities where user inputs or external data are used to construct file paths without proper validation. This can lead to security issues such as path traversal attacks.

Key Areas to Review

  1. Stream URL Handling:

    • The stream URL is taken as an input from the user via command-line arguments.
    • The URL is used directly in methods like initialise_stream and select_quality_based_on_speed.
  2. Path Expressions:

    • Although the primary functionality revolves around video streams, it's crucial to ensure that any path expressions derived from user inputs are sanitized.

Detailed Analysis

Stream URL Handling

The stream URL is provided by the user through command-line arguments and is used in several methods:

  • initialise_stream(self, stream_url: str) -> None
  • select_quality_based_on_speed(self) -> str | None
  • capture_generic_frames(self) -> Generator[tuple[cv2.Mat, float], None, None]

Code Snippet

parser = argparse.ArgumentParser(
    description='Capture video stream frames.',
)
parser.add_argument(
    '--url',
    type=str,
    help='Live stream URL',
    required=True,
)
args = parser.parse_args()

stream_capture = StreamCapture(args.url)

Potential Issues

  1. Unvalidated URL:

    • The URL is taken directly from user input without any validation or sanitization.
    • If the URL is malformed or contains malicious content, it could lead to unexpected behavior or security vulnerabilities.
  2. Path Traversal:

    • Although the current implementation does not directly construct file paths from the URL, it's good practice to validate and sanitize any user input that could potentially be used in path expressions.

Recommendations

URL Validation

Implement a validation mechanism to ensure that the provided URL is well-formed and points to a legitimate video stream. This can be done using regular expressions or URL parsing libraries.

Example

from urllib.parse import urlparse

def is_valid_url(url: str) -> bool:
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:
        return False

# Usage
if not is_valid_url(args.url):
    raise ValueError("Invalid URL provided")

Sanitization

Ensure that any user input is sanitized before being used in the application. This includes stripping out any potentially harmful characters or sequences.
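
As a minimal sketch of that sanitization step, the helper below trims surrounding whitespace and rejects control characters before the URL is validated. The function name clean_url_input is hypothetical and not part of the repository.

```python
def clean_url_input(raw: str) -> str:
    """Trim whitespace and reject control characters in a URL string.

    Hypothetical helper for illustration; run it before URL validation.
    """
    url = raw.strip()
    if not url:
        raise ValueError("Empty URL")
    # ASCII control characters (including newlines and DEL) have no place in a URL.
    if any(ord(ch) < 32 or ord(ch) == 127 for ch in url):
        raise ValueError("URL contains control characters")
    return url


cleaned = clean_url_input("  http://example.com/stream \n")
print(cleaned)  # http://example.com/stream
```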

Additional Security Measures

  • Error Handling: Implement robust error handling to manage exceptions that may arise from invalid URLs or failed stream initializations.
  • Logging: Log any suspicious activities or errors for further analysis.
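
The two measures above could be combined roughly as follows. This is a sketch under assumptions: the logger name and the checked_url helper are hypothetical, and the check is the same scheme-and-host test used in the URL validation example.

```python
import logging
from typing import Optional
from urllib.parse import urlparse

# Hypothetical logger name; the project may configure logging differently.
logger = logging.getLogger("stream_capture")


def checked_url(url: str) -> Optional[str]:
    """Return the URL if it looks well-formed, otherwise log and return None."""
    try:
        parsed = urlparse(url)
    except ValueError as exc:
        # urlparse raises ValueError for some malformed inputs,
        # such as an unterminated IPv6 bracket.
        logger.warning("Rejected unparsable URL %r: %s", url, exc)
        return None
    if not (parsed.scheme and parsed.netloc):
        logger.warning("Rejected URL without scheme or host: %r", url)
        return None
    return url
```

A caller can then stop early when checked_url returns None instead of passing the value on to the stream initialisation.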

Conclusion

The primary concern in src/stream_capture.py is the handling of the stream URL provided by the user. By implementing URL validation and sanitization, we can mitigate potential security risks associated with uncontrolled data used in path expressions. The current implementation does not directly construct file paths from the URL, but it's essential to follow best practices to ensure the overall security of the application.

Changes on src/live_stream_detection.py

Bug Analysis Report for src/live_stream_detection.py

Summary

The file src/live_stream_detection.py is part of a live stream detection system using YOLOv8 with SAHI. The primary concern is the potential for uncontrolled data being used in path expressions, which could lead to security vulnerabilities such as path traversal attacks.

Key Areas of Concern

  1. User Inputs in Path Expressions:

    • The output_folder parameter in the LiveStreamDetector class constructor.
    • The stream_url parameter in the run_detection method.
    • The model_key parameter in the generate_detections_local method.
  2. External Data in Path Expressions:

    • The model_path in the generate_detections_local method.

Detailed Analysis

1. output_folder Parameter

The output_folder parameter is used to specify the folder where detected frames are saved. This parameter is directly taken from user input without any validation or sanitization.

Potential Issue:

  • If not properly sanitized, this could allow path traversal attacks, where an attacker could specify a path that leads outside the intended directory.

Recommendation:

  • Validate and sanitize the output_folder parameter to ensure it does not contain any malicious path traversal sequences.

2. stream_url Parameter

The stream_url parameter is used to specify the URL of the live stream. This parameter is directly taken from user input without any validation or sanitization.

Potential Issue:

  • Although this parameter is not directly used in path expressions, it is crucial to validate URLs to prevent potential injection attacks or accessing unauthorized resources.

Recommendation:

  • Validate the stream_url parameter to ensure it is a well-formed URL and points to an authorized resource.

3. model_key Parameter

The model_key parameter is used to specify the model key for detection. This parameter is directly taken from user input without any validation or sanitization.

Potential Issue:

  • This parameter is used to construct the model_path in the generate_detections_local method. If not properly sanitized, it could lead to path traversal attacks.

Recommendation:

  • Validate and sanitize the model_key parameter to ensure it does not contain any malicious path traversal sequences.

4. model_path in generate_detections_local Method

The model_path is constructed using the model_key parameter and a predefined directory.

Potential Issue:

  • If the model_key parameter is not sanitized, it could lead to path traversal attacks.

Recommendation:

  • Ensure that the model_key parameter is sanitized before constructing the model_path.

Code Recommendations

Sanitizing output_folder

from pathlib import Path

def sanitize_path(path: str) -> str:
    # Resolve the path to its absolute form and ensure it is within a safe directory
    safe_base = Path("/safe/directory").resolve()
    resolved_path = (safe_base / path).resolve()
    if not resolved_path.is_relative_to(safe_base):
        raise ValueError("Unsafe path detected")
    return str(resolved_path)

# Usage in the constructor
self.output_folder = sanitize_path(output_folder) if output_folder else None

Validating stream_url

from urllib.parse import urlparse

def validate_url(url: str) -> str:
    result = urlparse(url)
    if all([result.scheme, result.netloc]):
        return url
    else:
        raise ValueError("Invalid URL")

# Usage in the run_detection method
stream_url = validate_url(stream_url)
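
The validate_url function above only checks that the URL is well-formed. A stricter variant that also restricts the scheme and host might look like the sketch below. The host in ALLOWED_HOSTS is a placeholder, and the function name is hypothetical.

```python
from urllib.parse import urlparse

# Assumed values for illustration; replace with the hosts the deployment trusts.
ALLOWED_SCHEMES = {"http", "https", "rtsp"}
ALLOWED_HOSTS = {"streams.example.com"}


def validate_authorized_url(url: str) -> str:
    """Return the URL only if its scheme and host are on the allowlists."""
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"Scheme not allowed: {parsed.scheme!r}")
    # hostname is already lower-cased by urlparse and is None when missing.
    host = parsed.hostname or ""
    if host not in ALLOWED_HOSTS:
        raise ValueError(f"Host not allowed: {host!r}")
    return url
```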

Sanitizing model_key

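import re
from pathlib import Path

def sanitize_model_key(model_key: str) -> str:
    # fullmatch also rejects a trailing newline, which '$' with re.match accepts
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', model_key):
        raise ValueError("Invalid model key")
    return model_key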

# Usage in the constructor and generate_detections_local method
self.model_key = sanitize_model_key(model_key)
model_path = Path('models/pt/') / f"best_{self.model_key}.pt"

Conclusion

The primary issue in this file is the lack of validation and sanitization of user inputs and external data used in path expressions. By implementing the recommended changes, we can mitigate the risk of path traversal attacks and ensure the security of the application.

Changes on src/stream_viewer.py

Analysis Report for src/stream_viewer.py

Overview

The StreamViewer class in src/stream_viewer.py is designed to handle the viewing of video streams from various sources such as RTSP and HTTP. The class initializes with a stream URL and a window name, captures frames from the video stream, and displays them in a window.

Potential Security Issue

The primary concern in this file, as per the issue description, is the use of uncontrolled data in path expressions. Specifically, the stream_url parameter is taken as an input and used directly to initialize the video capture object (cv2.VideoCapture). This could potentially lead to security issues if the stream_url is not properly validated or sanitized.

Key Areas to Review

  1. Initialization of stream_url:

    • The stream_url is passed directly to cv2.VideoCapture without any validation.
    • If stream_url is derived from user input or an external source, it could be manipulated to point to unintended resources.
  2. Display Stream Method:

    • The method display_stream continuously captures and displays frames from the video stream.
    • There is no direct path manipulation here, but the integrity of the stream URL is crucial.

Recommendations

  1. Sanitize and Validate stream_url:

    • Ensure that the stream_url is validated to confirm it is a legitimate URL and points to an expected resource.
    • Consider using a whitelist of allowed domains or patterns to restrict the URLs that can be used.
  2. Error Handling:

    • Improve error handling to provide more informative messages and possibly log suspicious activities.
    • For example, if the stream URL fails to initialize, it should provide a clear error message.
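
One way to produce that clear error message is to check the capture object right after it is created. The sketch below assumes an object that behaves like cv2.VideoCapture, whose isOpened() method reports whether the stream was opened. StreamOpenError and ensure_opened are hypothetical names.

```python
class StreamOpenError(RuntimeError):
    """Raised when a video stream cannot be opened."""


def ensure_opened(cap, stream_url: str) -> None:
    """Raise a descriptive error if the capture object failed to open.

    `cap` is expected to expose isOpened(), as cv2.VideoCapture does.
    """
    if not cap.isOpened():
        raise StreamOpenError(f"Could not open stream: {stream_url}")
```

In StreamViewer.__init__, this would be called immediately after cv2.VideoCapture(self.stream_url) is assigned to self.cap.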

Example Implementation

Here is an example of how you might implement URL validation and sanitization:

from urllib.parse import urlparse

import cv2

class StreamViewer:
    """
    A class to handle the viewing of video streams (RTSP, HTTP, etc.).
    """

    def __init__(self, stream_url: str, window_name: str = 'Stream Viewer'):
        """
        Initialises the StreamViewer instance with a stream URL
        and a window name.

        Args:
            stream_url (str): The URL of the video stream.
            window_name (str): The name of the window where the stream
                               will be displayed.
        """
        if not self.is_valid_url(stream_url):
            raise ValueError(f"Invalid stream URL: {stream_url}")

        self.stream_url = stream_url
        self.window_name = window_name
        self.cap = cv2.VideoCapture(self.stream_url)

    def is_valid_url(self, url: str) -> bool:
        """
        Validates the given URL to ensure it is a proper HTTP/RTSP URL.

        Args:
            url (str): The URL to validate.

        Returns:
            bool: True if the URL is valid, False otherwise.
        """
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ['http', 'https', 'rtsp']:
            return False

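        # Optional: Add more checks, such as allowed domains or patterns.
        # Accept an allowed domain or any of its subdomains
        # (the example URL below uses cctv4.kctmc.nat.gov.tw).
        allowed_domains = ['kctmc.nat.gov.tw']
        hostname = parsed_url.hostname or ''
        if not any(hostname == d or hostname.endswith('.' + d) for d in allowed_domains):
            return False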

        return True

    def display_stream(self):
        """
        Displays the video stream in a window.

        Continuously captures frames from the video stream and displays them.
        The loop breaks when 'q' is pressed or if the stream cannot be
        retrieved.
        """
        while True:
            # Capture the next frame from the stream.
            ret, frame = self.cap.read()

            # If the frame was successfully retrieved.
            if ret:
                # Display the video frame.
                cv2.imshow(self.window_name, frame)

                # Break the loop if 'q' is pressed.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                print('Failed to retrieve frame.')
                break

        # Release the video capture object and close all OpenCV windows.
        self.release_resources()

    def release_resources(self):
        """
        Releases resources used by the StreamViewer.
        """
        self.cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    # Replace 'video_url' with your stream URL.
    video_url = (
        'https://cctv4.kctmc.nat.gov.tw/50204bfc/'
    )
    viewer = StreamViewer(video_url)
    viewer.display_stream()

Conclusion

The primary issue in this file is the lack of validation for the stream_url parameter. By implementing URL validation and sanitization, we can mitigate the risk of using uncontrolled data in path expressions and ensure the security of the application.

Changes on src/model_fetcher.py

Analysis Report for src/model_fetcher.py

Overview

The src/model_fetcher.py script is responsible for downloading model files from specified URLs and saving them to a local directory. The main function iterates over a dictionary of model names and URLs, invoking the download_model function to handle the download process.

Potential Security Issue

The primary concern in this script is the use of uncontrolled data in path expressions, which can lead to security vulnerabilities such as path traversal attacks. Specifically, the model_name parameter is used to construct file paths without any validation or sanitization.

Detailed Analysis

  1. Path Construction:

    LOCAL_MODEL_DIRECTORY = Path('models/pt/')
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name
    • The model_name is directly used to construct the local_file_path. If model_name contains malicious input (e.g., ../ sequences), it could lead to path traversal, allowing an attacker to write files outside the intended directory.
  2. File Existence Check:

    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return
    • This check ensures that the file is not downloaded if it already exists, but it does not mitigate the risk of path traversal.
  3. File Writing:

    with open(local_file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    • The file is written to the path specified by local_file_path, which could be manipulated if model_name is not properly sanitized.

Recommendations

To mitigate the risk of path traversal and ensure the safety of file paths, the following steps should be taken:

  1. Sanitize model_name:

    • Ensure that model_name does not contain any path traversal sequences or other malicious input. This can be done by validating the input against a whitelist of allowed characters or patterns.
  2. Use Safe Path Construction:

    • Join paths with pathlib or os.path.join, then resolve the result and check that it stays inside the intended directory. Joining alone does not prevent traversal.
  3. Implement Validation:

    • Add validation logic to ensure that the constructed path is within the intended directory.
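
Steps 2 and 3 can be sketched as a single helper that resolves the joined path and checks containment. The name safe_join is hypothetical, and Path.is_relative_to requires Python 3.9 or later.

```python
from pathlib import Path


def safe_join(base_dir: Path, name: str) -> Path:
    """Join name onto base_dir and refuse results outside base_dir."""
    base = base_dir.resolve()
    # resolve() collapses '..' segments, so the check sees the real target.
    candidate = (base / name).resolve()
    if not candidate.is_relative_to(base):
        raise ValueError(f"Path escapes {base}: {name!r}")
    return candidate


# A plain file name stays inside the model directory.
print(safe_join(Path("models/pt"), "best_yolov8l.pt").name)  # best_yolov8l.pt
```

Because the check runs on the resolved path, it also catches traversal that a character filter on the name might miss.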

Example Implementation

Here is an example of how to implement these recommendations:

from __future__ import annotations

import re
from pathlib import Path
from typing import TypedDict

import requests


class ModelInfo(TypedDict):
    model_name: str
    url: str


def sanitize_model_name(model_name: str) -> str:
    # Accept a plain file name only: letters, digits, '_' and '-',
    # with an optional single extension such as '.pt'.
    if not re.fullmatch(r'[A-Za-z0-9_-]+(\.[A-Za-z0-9]+)?', model_name):
        raise ValueError("Invalid model name")
    return model_name


def download_model(model_name, url):
    """
    Download a model file if it doesn't already exist.

    Args:
        model_name (str): The name of the model file.
        url (str): The URL of the model file.
    """
    # Sanitize the model name
    model_name = sanitize_model_name(model_name)

    # Define the local directory to store the model files
    LOCAL_MODEL_DIRECTORY = Path('models/pt/')

    # Ensure the local directory exists
    LOCAL_MODEL_DIRECTORY.mkdir(parents=True, exist_ok=True)
    # Build the full local file path
    local_file_path = LOCAL_MODEL_DIRECTORY / model_name

    # Check if the model already exists
    if local_file_path.exists():
        print(f"'{model_name}' exists. Skipping download.")
        return

    # Send an HTTP GET request to fetch the model file
    # A timeout keeps the download from hanging indefinitely.
    response = requests.get(url, stream=True, timeout=30)
    if response.status_code == 200:
        # Begin the download and write to the file
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"'{model_name}' saved to '{local_file_path}'.")
    else:
        print(f"Error downloading '{model_name}': {response.status_code}")


def main():
    # Define the URLs for the model files
    MODEL_URLS = {
        'best_yolov8l.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8l.pt',
        'best_yolov8x.pt': 'http://changdar-server.mooo.com:28000/models/best_yolov8x.pt',
    }

    # Iterate over all models and download them if they don't already exist
    for model_name, url in MODEL_URLS.items():
        download_model(model_name, url)


if __name__ == '__main__':
    main()

Conclusion

The primary issue in the src/model_fetcher.py file is the lack of sanitization and validation of the model_name parameter, which can lead to path traversal vulnerabilities. By implementing the recommended changes, the script can be made more secure and resilient against such attacks.

Disclaimer: This comment was entirely generated using AI. Be aware that the information provided may be incorrect.
