diff --git a/examples/complex.py b/examples/complex.py index 9cc3262b..9c1c1d94 100644 --- a/examples/complex.py +++ b/examples/complex.py @@ -1,16 +1,7 @@ -import getpass -import imaplib +import os +import tempfile -M = imaplib.IMAP4() - -#M.starttls() - -M.login(getpass.getuser(), getpass.getpass()) -M.select() -typ, data = M.search(None, 'ALL') -for num in data[0].split(): - typ, data = M.fetch(num, '(RFC822)') - print('Message %s\n%s\n' % (num, data[0][1])) -M.close() -M.logout() +filename = tempfile.mktemp('', 'tmp', None) +with os.open(filename, "w+", buffering=-1, encoding=None, errors=None, newline=None) as f: + f.write(b"Hello World!\n") diff --git a/precli/parsers/python.py b/precli/parsers/python.py index a9ebfa6c..2d582524 100644 --- a/precli/parsers/python.py +++ b/precli/parsers/python.py @@ -1,5 +1,6 @@ # Copyright 2023 Secure Saurce LLC import ast +import builtins import re from collections import namedtuple @@ -31,6 +32,8 @@ def file_extension(self) -> str: def visit_module(self, nodes: list[Node]): self.suppressions = {} self.current_symtab = SymbolTable("") + for builtin in dir(builtins): + self.current_symtab.put(builtin, "import", builtin) self.visit(nodes) self.current_symtab = self.current_symtab.parent() @@ -116,10 +119,15 @@ def visit_assignment(self, nodes: list[Node]): right_hand = self.literal_value(nodes[2], default=nodes[2]) self.current_symtab.put(left_hand, "identifier", right_hand) if nodes[2].type == "call": + (call_args, call_kwargs) = self.get_func_args( + nodes[2].children[1] + ) call = Call( node=nodes[2], name=right_hand, name_qual=right_hand, + args=call_args, + kwargs=call_kwargs, ) symbol = self.current_symtab.get(left_hand) symbol.push_call(call) diff --git a/precli/renderers/detailed.py b/precli/renderers/detailed.py index 789ecef9..694fbc44 100644 --- a/precli/renderers/detailed.py +++ b/precli/renderers/detailed.py @@ -65,41 +65,63 @@ def render(self, results: list[Result], metrics: Metrics): ) self.console.print(code) - for fix in result.fixes: + if result.fixes: self.console.print( - f"Suggested fix: {fix.description}", + f"Suggested fix: {result.fixes[0].description}", style=style, ) + + highlight_lines = set() + for fix in result.fixes: + highlight_lines.add(fix.deleted_location.start_line) + highlight_lines.add(fix.deleted_location.end_line) + + for fix in result.fixes: start_line = fix.deleted_location.start_line end_line = fix.deleted_location.end_line start_column = fix.deleted_location.start_column end_column = fix.deleted_location.end_column - line_before = linecache.getline( - filename=result.location.file_name, - lineno=start_line - 1, - ) + + if (start_line - 1) in highlight_lines: + line_before = "" + before = 0 + else: + line_before = linecache.getline( + filename=result.location.file_name, + lineno=start_line - 1, + ) + before = 1 + code = linecache.getline( filename=result.location.file_name, lineno=start_line, ) - line_after = linecache.getline( - filename=result.location.file_name, - lineno=start_line + 1, - ) + + if (start_line + 1) in highlight_lines: + line_after = "" + after = 0 + else: + line_after = linecache.getline( + filename=result.location.file_name, + lineno=start_line + 1, + ) + after = 1 + code = ( code[:start_column] + fix.inserted_content + code[end_column:] ) code = line_before + code + line_after - for _ in range(start_line - 2): + for _ in range(start_line - 1 - before): code = "\n" + code + code = syntax.Syntax( code, "python", line_numbers=True, - line_range=(start_line - 1, end_line + 1), - highlight_lines=(start_line, end_line), + line_range=(start_line - before, end_line + after), + highlight_lines=highlight_lines, ) self.console.print(code) self.console.print() diff --git a/precli/rules/python/stdlib/tempfile/mktemp_race_condition.py b/precli/rules/python/stdlib/tempfile/mktemp_race_condition.py new file mode 100644 index 00000000..e138e7fc --- /dev/null +++ b/precli/rules/python/stdlib/tempfile/mktemp_race_condition.py @@ -0,0 +1,187 @@ +# Copyright 2023 Secure Saurce LLC +r""" +============================================== +Insecure Temporary File in the Tempfile Module +============================================== + +The tempfile.mktemp function in Python is a legacy method for creating +temporary files with a unique name. It is important to note that this function +is susceptible to race conditions, which can occur when multiple processes or +threads attempt to create temporary files concurrently. These race conditions +may lead to unintended behavior, data corruption, or security vulnerabilities +in your code. + +------- +Example +------- + +.. code-block:: python + :linenos: + :emphasize-lines: 4 + + import tempfile + + + filename = tempfile.mktemp(suffix='', prefix='tmp', dir=None) + with open(filename) as f: + f.write(b"Hello World!\n") + +----------- +Remediation +----------- + +To ensure the reliability and security of your temporary file management, +consider using NamedTemporaryFile. The tempfile.NamedTemporaryFile class +automatically handles the generation of unique filenames, proper file closure, +and cleanup when the file is no longer needed. + +.. code-block:: python + :linenos: + :emphasize-lines: 4 + + import tempfile + + + with tempfile.NamedTemporaryFile(delete=False) as f: + f.write(b"Hello World!\n") + +.. seealso:: + + - `tempfile — Generate temporary files and directories `_ + - `CWE-377: Insecure Temporary File `_ + +.. versionadded:: 1.0.0 + +""" # noqa: E501 +from precli.core.fix import Fix +from precli.core.location import Location +from precli.core.result import Result +from precli.rules import Rule + + +class MktempRaceCondition(Rule): + def __init__(self, id: str): + super().__init__( + id=id, + name="insecure_temporary_file", + full_descr=__doc__, + cwe_id=377, + message="The function '{}' can allow insecure ways of creating " + "temporary files and directories that can lead to race " + "conditions.", + targets=("call"), + wildcards={ + "os.*": [ + "open", + ], + "tempfile.*": [ + "mktemp", + ], + }, + ) + + def analyze(self, context: dict, **kwargs: dict) -> Result: + call = kwargs.get("call") + + # TODO: the builtin open can't be found because not imported + if call.name_qualified in ["open"]: + file_arg = call.get_argument(position=0, name="file") + + if ( + file_arg.node.type == "identifier" + and file_arg.value == "tempfile.mktemp" + ): + """ + open( + file, + mode='r', + buffering=-1, + encoding=None, + errors=None, + newline=None, + closefd=True, + opener=None + ) + """ + arg_list = [] + mode = call.get_argument(position=1, name="mode").node + mode = mode.text.decode() if mode is not None else '"r"' + arg_list.append(f"mode={mode}") + + buff = call.get_argument(position=2, name="buffering").node + if buff is not None: + arg_list.append(f"buffering={buff.text.decode()}") + + enc = call.get_argument(position=3, name="encoding").node + if enc is not None: + arg_list.append(f"encoding={enc.text.decode()}") + + newline = call.get_argument(position=5, name="newline").node + if newline is not None: + arg_list.append(f"newline={newline.text.decode()}") + + # mktemp(suffix='', prefix='tmp', dir=None) + symbol = context["symtab"].get(file_arg.node.text.decode()) + init_call = symbol.call_history[0] + + suffix = init_call.get_argument(position=0, name="suffix").node + if suffix is not None: + arg_list.append(f"suffix={suffix.text.decode()}") + + prefix = init_call.get_argument(position=1, name="prefix").node + if prefix is not None: + arg_list.append(f"prefix={prefix.text.decode()}") + + dirs = init_call.get_argument(position=2, name="dir").node + if dirs is not None: + arg_list.append(f"dir={dirs.text.decode()}") + + arg_list.append("delete=False") + + errors = call.get_argument(position=4, name="errors").node + if errors is not None: + arg_list.append(f"errors={errors.text.decode()}") + arg_str = ", ".join(arg_list) + + """ + NamedTemporaryFile( + mode='w+b', + buffering=-1, + encoding=None, + newline=None, + suffix=None, + prefix=None, + dir=None, + delete=True, + *, + errors=None + ) + """ + fixes = [ + Fix( + description="Use the 'NamedTemporaryFile' class to " + "generate a unique filename, do proper file closure, " + "and cleanup.", + deleted_location=Location(node=init_call.node.parent), + inserted_content="", + ), + Fix( + description="Use the 'NamedTemporaryFile' class to " + "generate a unique filename, do proper file closure, " + "and cleanup.", + deleted_location=Location(node=call.node), + inserted_content=( + f"tempfile.NamedTemporaryFile({arg_str})" + ), + ), + ] + + return Result( + rule_id=self.id, + location=Location( + file_name=context["file_name"], + node=call.function_node, + ), + message=self.message.format(file_arg.value), + fixes=fixes, + ) diff --git a/setup.cfg b/setup.cfg index 600c4b5a..d8c00324 100644 --- a/setup.cfg +++ b/setup.cfg @@ -84,6 +84,9 @@ precli.rules.python = # precli/rules/python/stdlib/telnetlib/telnetlib_cleartext.py PRE0018 = precli.rules.python.stdlib.telnetlib.telnetlib_cleartext:TelnetlibCleartext + # precli/rules/python/stdlib/tempfile/mktemp_race_condition.py + PRE0019 = precli.rules.python.stdlib.tempfile.mktemp_race_condition:MktempRaceCondition + # precli/rules/python/third_party/cryptography/cryptography_weak_hash.py PRE0501 = precli.rules.python.third_party.cryptography.cryptography_weak_hash:CryptographyWeakHash diff --git a/tests/unit/rules/python/stdlib/tempfile/__init__.py b/tests/unit/rules/python/stdlib/tempfile/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp.py new file mode 100644 index 00000000..96e3f97f --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp.py @@ -0,0 +1,4 @@ +import tempfile + + +filename = tempfile.mktemp() diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_open.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_open.py new file mode 100644 index 00000000..fdeb15a1 --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_open.py @@ -0,0 +1,7 @@ +import tempfile + + +filename = tempfile.mktemp("", "tmp", dir=None) +f = open(filename, "w+") +f.write(b"Hello World!\n") +f.close() diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_with_open_args.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_with_open_args.py new file mode 100644 index 00000000..052e4279 --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_args_with_open_args.py @@ -0,0 +1,8 @@ +import tempfile + + +filename = tempfile.mktemp("", "tmp", dir=None) +with open( + filename, "w+", buffering=-1, encoding=None, errors=None, newline=None +) as f: + f.write(b"Hello World!\n") diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_open.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_open.py new file mode 100644 index 00000000..a4f65601 --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_open.py @@ -0,0 +1,7 @@ +import tempfile + + +filename = tempfile.mktemp() +f = open(filename, "w+") +f.write(b"Hello World!\n") +f.close() diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open.py new file mode 100644 index 00000000..122e814f --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open.py @@ -0,0 +1,6 @@ +import tempfile + + +filename = tempfile.mktemp() +with open(filename, "w+") as f: + f.write(b"Hello World!\n") diff --git a/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open_multiline.py b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open_multiline.py new file mode 100644 index 00000000..052e4279 --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/examples/tempfile_mktemp_with_open_multiline.py @@ -0,0 +1,8 @@ +import tempfile + + +filename = tempfile.mktemp("", "tmp", dir=None) +with open( + filename, "w+", buffering=-1, encoding=None, errors=None, newline=None +) as f: + f.write(b"Hello World!\n") diff --git a/tests/unit/rules/python/stdlib/tempfile/test_mktemp_race_condition.py b/tests/unit/rules/python/stdlib/tempfile/test_mktemp_race_condition.py new file mode 100644 index 00000000..cc6cc9df --- /dev/null +++ b/tests/unit/rules/python/stdlib/tempfile/test_mktemp_race_condition.py @@ -0,0 +1,110 @@ +# Copyright 2023 Secure Saurce LLC +import os + +from precli.core.level import Level +from precli.rules import Rule +from tests.unit.rules.python import test_case + + +class MktempRaceConditionTests(test_case.TestCase): + def setUp(self): + super().setUp() + self.base_path = os.path.join( + "tests", + "unit", + "rules", + "python", + "stdlib", + "tempfile", + "examples", + ) + + def test_smtp_cleartext_rule_meta(self): + rule = Rule.get_by_id("PRE0019") + self.assertEqual("PRE0019", rule.id) + self.assertEqual("insecure_temporary_file", rule.name) + self.assertEqual("", rule.help_url) + self.assertEqual(True, rule.default_config.enabled) + self.assertEqual(Level.WARNING, rule.default_config.level) + self.assertEqual(-1.0, rule.default_config.rank) + self.assertEqual("377", rule.cwe.cwe_id) + + def test_tempfile_mktemp(self): + results = self.parser.parse( + os.path.join(self.base_path, "tempfile_mktemp.py") + ) + self.assertEqual(0, len(results)) + + def test_tempfile_mktemp_args_open(self): + results = self.parser.parse( + os.path.join(self.base_path, "tempfile_mktemp_args_open.py") + ) + self.assertEqual(1, len(results)) + result = results[0] + self.assertEqual("PRE0019", result.rule_id) + self.assertEqual(5, result.location.start_line) + self.assertEqual(5, result.location.end_line) + self.assertEqual(4, result.location.start_column) + self.assertEqual(8, result.location.end_column) + self.assertEqual(Level.WARNING, result.level) + self.assertEqual(-1.0, result.rank) + + def test_tempfile_mktemp_args_with_open_args(self): + results = self.parser.parse( + os.path.join( + self.base_path, "tempfile_mktemp_args_with_open_args.py" + ) + ) + self.assertEqual(1, len(results)) + result = results[0] + self.assertEqual("PRE0019", result.rule_id) + self.assertEqual(5, result.location.start_line) + self.assertEqual(5, result.location.end_line) + self.assertEqual(5, result.location.start_column) + self.assertEqual(9, result.location.end_column) + self.assertEqual(Level.WARNING, result.level) + self.assertEqual(-1.0, result.rank) + + def test_tempfile_mktemp_open(self): + results = self.parser.parse( + os.path.join(self.base_path, "tempfile_mktemp_open.py") + ) + self.assertEqual(1, len(results)) + result = results[0] + self.assertEqual("PRE0019", result.rule_id) + self.assertEqual(5, result.location.start_line) + self.assertEqual(5, result.location.end_line) + self.assertEqual(4, result.location.start_column) + self.assertEqual(8, result.location.end_column) + self.assertEqual(Level.WARNING, result.level) + self.assertEqual(-1.0, result.rank) + + def test_tempfile_mktemp_with_open(self): + results = self.parser.parse( + os.path.join(self.base_path, "tempfile_mktemp_with_open.py") + ) + self.assertEqual(1, len(results)) + result = results[0] + self.assertEqual("PRE0019", result.rule_id) + self.assertEqual(5, result.location.start_line) + self.assertEqual(5, result.location.end_line) + self.assertEqual(5, result.location.start_column) + self.assertEqual(9, result.location.end_column) + self.assertEqual(Level.WARNING, result.level) + self.assertEqual(-1.0, result.rank) + + def test_tempfile_mktemp_with_open_multiline(self): + results = self.parser.parse( + os.path.join( + self.base_path, "tempfile_mktemp_with_open_multiline.py" + ) + ) + self.assertEqual(1, len(results)) + result = results[0] + self.assertEqual("PRE0019", result.rule_id) + self.assertEqual(5, result.location.start_line) + self.assertEqual(5, result.location.end_line) + self.assertEqual(5, result.location.start_column) + self.assertEqual(9, result.location.end_column) + self.assertEqual(Level.WARNING, result.level) + self.assertEqual(-1.0, result.rank)