forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pythongh-120754: Add a strace helper and test set of syscalls for ope…
…n().read(), Take 2 (python#123413)
- Loading branch information
Showing
4 changed files
with
329 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
import re | ||
import sys | ||
import textwrap | ||
import unittest | ||
from dataclasses import dataclass | ||
from functools import cache | ||
from test import support | ||
from test.support.script_helper import run_python_until_end | ||
|
||
_strace_binary = "/usr/bin/strace" | ||
_syscall_regex = re.compile( | ||
r"(?P<syscall>[^(]*)\((?P<args>[^)]*)\)\s*[=]\s*(?P<returncode>.+)") | ||
_returncode_regex = re.compile( | ||
br"\+\+\+ exited with (?P<returncode>\d+) \+\+\+") | ||
|
||
|
||
@dataclass | ||
class StraceEvent: | ||
syscall: str | ||
args: list[str] | ||
returncode: str | ||
|
||
|
||
@dataclass | ||
class StraceResult: | ||
strace_returncode: int | ||
python_returncode: int | ||
|
||
"""The event messages generated by strace. This is very similar to the | ||
stderr strace produces with returncode marker section removed.""" | ||
event_bytes: bytes | ||
stdout: bytes | ||
stderr: bytes | ||
|
||
def events(self): | ||
"""Parse event_bytes data into system calls for easier processing. | ||
This assumes the program under inspection doesn't print any non-utf8 | ||
strings which would mix into the strace output.""" | ||
decoded_events = self.event_bytes.decode('utf-8') | ||
matches = [ | ||
_syscall_regex.match(event) | ||
for event in decoded_events.splitlines() | ||
] | ||
return [ | ||
StraceEvent(match["syscall"], | ||
[arg.strip() for arg in (match["args"].split(","))], | ||
match["returncode"]) for match in matches if match | ||
] | ||
|
||
def sections(self): | ||
"""Find all "MARK <X>" writes and use them to make groups of events. | ||
This is useful to avoid variable / overhead events, like those at | ||
interpreter startup or when opening a file so a test can verify just | ||
the small case under study.""" | ||
current_section = "__startup" | ||
sections = {current_section: []} | ||
for event in self.events(): | ||
if event.syscall == 'write' and len( | ||
event.args) > 2 and event.args[1].startswith("\"MARK "): | ||
# Found a new section, don't include the write in the section | ||
# but all events until next mark should be in that section | ||
current_section = event.args[1].split( | ||
" ", 1)[1].removesuffix('\\n"') | ||
if current_section not in sections: | ||
sections[current_section] = list() | ||
else: | ||
sections[current_section].append(event) | ||
|
||
return sections | ||
|
||
|
||
@support.requires_subprocess() | ||
def strace_python(code, strace_flags, check=True): | ||
"""Run strace and return the trace. | ||
Sets strace_returncode and python_returncode to `-1` on error.""" | ||
res = None | ||
|
||
def _make_error(reason, details): | ||
return StraceResult( | ||
strace_returncode=-1, | ||
python_returncode=-1, | ||
event_bytes=f"error({reason},details={details}) = -1".encode('utf-8'), | ||
stdout=res.out if res else b"", | ||
stderr=res.err if res else b"") | ||
|
||
# Run strace, and get out the raw text | ||
try: | ||
res, cmd_line = run_python_until_end( | ||
"-c", | ||
textwrap.dedent(code), | ||
__run_using_command=[_strace_binary] + strace_flags) | ||
except OSError as err: | ||
return _make_error("Caught OSError", err) | ||
|
||
if check and res.rc: | ||
res.fail(cmd_line) | ||
|
||
# Get out program returncode | ||
stripped = res.err.strip() | ||
output = stripped.rsplit(b"\n", 1) | ||
if len(output) != 2: | ||
return _make_error("Expected strace events and exit code line", | ||
stripped[-50:]) | ||
|
||
returncode_match = _returncode_regex.match(output[1]) | ||
if not returncode_match: | ||
return _make_error("Expected to find returncode in last line.", | ||
output[1][:50]) | ||
|
||
python_returncode = int(returncode_match["returncode"]) | ||
if check and python_returncode: | ||
res.fail(cmd_line) | ||
|
||
return StraceResult(strace_returncode=res.rc, | ||
python_returncode=python_returncode, | ||
event_bytes=output[0], | ||
stdout=res.out, | ||
stderr=res.err) | ||
|
||
|
||
def get_events(code, strace_flags, prelude, cleanup): | ||
# NOTE: The flush is currently required to prevent the prints from getting | ||
# buffered and done all at once at exit | ||
prelude = textwrap.dedent(prelude) | ||
code = textwrap.dedent(code) | ||
cleanup = textwrap.dedent(cleanup) | ||
to_run = f""" | ||
print("MARK prelude", flush=True) | ||
{prelude} | ||
print("MARK code", flush=True) | ||
{code} | ||
print("MARK cleanup", flush=True) | ||
{cleanup} | ||
print("MARK __shutdown", flush=True) | ||
""" | ||
trace = strace_python(to_run, strace_flags) | ||
all_sections = trace.sections() | ||
return all_sections['code'] | ||
|
||
|
||
def get_syscalls(code, strace_flags, prelude="", cleanup=""): | ||
"""Get the syscalls which a given chunk of python code generates""" | ||
events = get_events(code, strace_flags, prelude=prelude, cleanup=cleanup) | ||
return [ev.syscall for ev in events] | ||
|
||
|
||
# Moderately expensive (spawns a subprocess), so share results when possible. | ||
@cache | ||
def _can_strace(): | ||
res = strace_python("import sys; sys.exit(0)", [], check=False) | ||
assert res.events(), "Should have parsed multiple calls" | ||
|
||
return res.strace_returncode == 0 and res.python_returncode == 0 | ||
|
||
|
||
def requires_strace(): | ||
if sys.platform != "linux": | ||
return unittest.skip("Linux only, requires strace.") | ||
|
||
if support.check_sanitizer(address=True, memory=True): | ||
return unittest.skip("LeakSanitizer does not work under ptrace (strace, gdb, etc)") | ||
|
||
return unittest.skipUnless(_can_strace(), "Requires working strace") | ||
|
||
|
||
__all__ = ["get_events", "get_syscalls", "requires_strace", "strace_python", | ||
"StraceEvent", "StraceResult"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.