Skip to content

Commit

Permalink
New rule on tempfile.mktemp (#132)
Browse files Browse the repository at this point in the history
The mktemp function is vulnerable to a race condition where there is a
window where the file could be created in between when the filename was
created and the file is opened.

The solution is to use NamedTemporaryFile as an atomic operation.

Signed-off-by: Eric Brown <[email protected]>
  • Loading branch information
ericwb authored Aug 25, 2023
1 parent e9362fd commit 042a6ce
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 27 deletions.
19 changes: 5 additions & 14 deletions examples/complex.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
import getpass
import imaplib
import os
import tempfile


M = imaplib.IMAP4()

#M.starttls()

M.login(getpass.getuser(), getpass.getpass())
M.select()
typ, data = M.search(None, 'ALL')
for num in data[0].split():
typ, data = M.fetch(num, '(RFC822)')
print('Message %s\n%s\n' % (num, data[0][1]))
M.close()
M.logout()
filename = tempfile.mktemp('', 'tmp', None)
with os.open(filename, "w+", buffering=-1, encoding=None, errors=None, newline=None) as f:
f.write(b"Hello World!\n")
8 changes: 8 additions & 0 deletions precli/parsers/python.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2023 Secure Saurce LLC
import ast
import builtins
import re
from collections import namedtuple

Expand Down Expand Up @@ -31,6 +32,8 @@ def file_extension(self) -> str:
def visit_module(self, nodes: list[Node]):
self.suppressions = {}
self.current_symtab = SymbolTable("<module>")
for builtin in dir(builtins):
self.current_symtab.put(builtin, "import", builtin)
self.visit(nodes)
self.current_symtab = self.current_symtab.parent()

Expand Down Expand Up @@ -116,10 +119,15 @@ def visit_assignment(self, nodes: list[Node]):
right_hand = self.literal_value(nodes[2], default=nodes[2])
self.current_symtab.put(left_hand, "identifier", right_hand)
if nodes[2].type == "call":
(call_args, call_kwargs) = self.get_func_args(
nodes[2].children[1]
)
call = Call(
node=nodes[2],
name=right_hand,
name_qual=right_hand,
args=call_args,
kwargs=call_kwargs,
)
symbol = self.current_symtab.get(left_hand)
symbol.push_call(call)
Expand Down
48 changes: 35 additions & 13 deletions precli/renderers/detailed.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,41 +65,63 @@ def render(self, results: list[Result], metrics: Metrics):
)
self.console.print(code)

for fix in result.fixes:
if result.fixes:
self.console.print(
f"Suggested fix: {fix.description}",
f"Suggested fix: {result.fixes[0].description}",
style=style,
)

highlight_lines = set()
for fix in result.fixes:
highlight_lines.add(fix.deleted_location.start_line)
highlight_lines.add(fix.deleted_location.end_line)

for fix in result.fixes:
start_line = fix.deleted_location.start_line
end_line = fix.deleted_location.end_line
start_column = fix.deleted_location.start_column
end_column = fix.deleted_location.end_column
line_before = linecache.getline(
filename=result.location.file_name,
lineno=start_line - 1,
)

if (start_line - 1) in highlight_lines:
line_before = ""
before = 0
else:
line_before = linecache.getline(
filename=result.location.file_name,
lineno=start_line - 1,
)
before = 1

code = linecache.getline(
filename=result.location.file_name,
lineno=start_line,
)
line_after = linecache.getline(
filename=result.location.file_name,
lineno=start_line + 1,
)

if (start_line + 1) in highlight_lines:
line_after = ""
after = 0
else:
line_after = linecache.getline(
filename=result.location.file_name,
lineno=start_line + 1,
)
after = 1

code = (
code[:start_column]
+ fix.inserted_content
+ code[end_column:]
)
code = line_before + code + line_after
for _ in range(start_line - 2):
for _ in range(start_line - 1 - before):
code = "\n" + code

code = syntax.Syntax(
code,
"python",
line_numbers=True,
line_range=(start_line - 1, end_line + 1),
highlight_lines=(start_line, end_line),
line_range=(start_line - before, end_line + after),
highlight_lines=highlight_lines,
)
self.console.print(code)
self.console.print()
Expand Down
187 changes: 187 additions & 0 deletions precli/rules/python/stdlib/tempfile/mktemp_race_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright 2023 Secure Saurce LLC
r"""
==============================================
Insecure Temporary File in the Tempfile Module
==============================================
The tempfile.mktemp function in Python is a legacy method for creating
temporary files with a unique name. It is important to note that this function
is susceptible to race conditions, which can occur when multiple processes or
threads attempt to create temporary files concurrently. These race conditions
may lead to unintended behavior, data corruption, or security vulnerabilities
in your code.
-------
Example
-------
.. code-block:: python
:linenos:
:emphasize-lines: 4
import tempfile
filename = tempfile.mktemp(suffix='', prefix='tmp', dir=None)
with open(filename) as f:
f.write(b"Hello World!\n")
-----------
Remediation
-----------
To ensure the reliability and security of your temporary file management,
consider using NamedTemporaryFile. The tempfile.NamedTemporaryFile class
automatically handles the generation of unique filenames, proper file closure,
and cleanup when the file is no longer needed.
.. code-block:: python
:linenos:
:emphasize-lines: 4
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(b"Hello World!\n")
.. seealso::
- `tempfile — Generate temporary files and directories <https://docs.python.org/3/library/tempfile.html#tempfile.mktemp>`_
- `CWE-377: Insecure Temporary File <https://cwe.mitre.org/data/definitions/377.html>`_
.. versionadded:: 1.0.0
""" # noqa: E501
from precli.core.fix import Fix
from precli.core.location import Location
from precli.core.result import Result
from precli.rules import Rule


class MktempRaceCondition(Rule):
def __init__(self, id: str):
super().__init__(
id=id,
name="insecure_temporary_file",
full_descr=__doc__,
cwe_id=377,
message="The function '{}' can allow insecure ways of creating "
"temporary files and directories that can lead to race "
"conditions.",
targets=("call"),
wildcards={
"os.*": [
"open",
],
"tempfile.*": [
"mktemp",
],
},
)

def analyze(self, context: dict, **kwargs: dict) -> Result:
call = kwargs.get("call")

# TODO: the builtin open can't be found because not imported
if call.name_qualified in ["open"]:
file_arg = call.get_argument(position=0, name="file")

if (
file_arg.node.type == "identifier"
and file_arg.value == "tempfile.mktemp"
):
"""
open(
file,
mode='r',
buffering=-1,
encoding=None,
errors=None,
newline=None,
closefd=True,
opener=None
)
"""
arg_list = []
mode = call.get_argument(position=1, name="mode").node
mode = mode.text.decode() if mode is not None else '"r"'
arg_list.append(f"mode={mode}")

buff = call.get_argument(position=2, name="buffering").node
if buff is not None:
arg_list.append(f"buffering={buff.text.decode()}")

enc = call.get_argument(position=3, name="encoding").node
if enc is not None:
arg_list.append(f"encoding={enc.text.decode()}")

newline = call.get_argument(position=5, name="newline").node
if newline is not None:
arg_list.append(f"newline={newline.text.decode()}")

# mktemp(suffix='', prefix='tmp', dir=None)
symbol = context["symtab"].get(file_arg.node.text.decode())
init_call = symbol.call_history[0]

suffix = init_call.get_argument(position=0, name="suffix").node
if suffix is not None:
arg_list.append(f"suffix={suffix.text.decode()}")

prefix = init_call.get_argument(position=1, name="prefix").node
if prefix is not None:
arg_list.append(f"prefix={prefix.text.decode()}")

dirs = init_call.get_argument(position=2, name="dir").node
if dirs is not None:
arg_list.append(f"dir={dirs.text.decode()}")

arg_list.append("delete=False")

errors = call.get_argument(position=4, name="errors").node
if errors is not None:
arg_list.append(f"errors={errors.text.decode()}")
arg_str = ", ".join(arg_list)

"""
NamedTemporaryFile(
mode='w+b',
buffering=-1,
encoding=None,
newline=None,
suffix=None,
prefix=None,
dir=None,
delete=True,
*,
errors=None
)
"""
fixes = [
Fix(
description="Use the 'NamedTemporaryFile' class to "
"generate a unique filename, do proper file closure, "
"and cleanup.",
deleted_location=Location(node=init_call.node.parent),
inserted_content="",
),
Fix(
description="Use the 'NamedTemporaryFile' class to "
"generate a unique filename, do proper file closure, "
"and cleanup.",
deleted_location=Location(node=call.node),
inserted_content=(
f"tempfile.NamedTemporaryFile({arg_str})"
),
),
]

return Result(
rule_id=self.id,
location=Location(
file_name=context["file_name"],
node=call.function_node,
),
message=self.message.format(file_arg.value),
fixes=fixes,
)
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ precli.rules.python =
# precli/rules/python/stdlib/telnetlib/telnetlib_cleartext.py
PRE0018 = precli.rules.python.stdlib.telnetlib.telnetlib_cleartext:TelnetlibCleartext

# precli/rules/python/stdlib/tempfile/mktemp_race_condition.py
PRE0019 = precli.rules.python.stdlib.tempfile.mktemp_race_condition:MktempRaceCondition

# precli/rules/python/third_party/cryptography/cryptography_weak_hash.py
PRE0501 = precli.rules.python.third_party.cryptography.cryptography_weak_hash:CryptographyWeakHash

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import tempfile


filename = tempfile.mktemp()
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import tempfile


filename = tempfile.mktemp("", "tmp", dir=None)
f = open(filename, "w+")
f.write(b"Hello World!\n")
f.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import tempfile


filename = tempfile.mktemp("", "tmp", dir=None)
with open(
filename, "w+", buffering=-1, encoding=None, errors=None, newline=None
) as f:
f.write(b"Hello World!\n")
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import tempfile


filename = tempfile.mktemp()
f = open(filename, "w+")
f.write(b"Hello World!\n")
f.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import tempfile


filename = tempfile.mktemp()
with open(filename, "w+") as f:
f.write(b"Hello World!\n")
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import tempfile


filename = tempfile.mktemp("", "tmp", dir=None)
with open(
filename, "w+", buffering=-1, encoding=None, errors=None, newline=None
) as f:
f.write(b"Hello World!\n")
Loading

0 comments on commit 042a6ce

Please sign in to comment.