Skip to content

Commit

Permalink
Refactor into run, tool, artifacts (#261)
Browse files Browse the repository at this point in the history
New class objects closely matching SARIF.

* New Tool class to represent information about precli itself
* New Run class to represent an instance of a run of the analysis
* New Artifact class representing the files, source language, and
contents.

Signed-off-by: Eric Brown <[email protected]>
  • Loading branch information
ericwb authored Feb 5, 2024
1 parent 007a2ff commit 50157d8
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 174 deletions.
1 change: 1 addition & 0 deletions precli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from importlib import metadata


__author__ = metadata.metadata("precli")["Author"]
__version__ = metadata.version("precli")
161 changes: 25 additions & 136 deletions precli/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,26 @@
# Copyright 2024 Secure Saurce LLC
import argparse
import io
import logging
import os
import sys
import tempfile
import traceback
import zipfile
from urllib.parse import urljoin
from urllib.parse import urlparse

import requests
from ignorelib import IgnoreFilterManager
from pygments import lexers
from rich import progress
from rich import syntax

import precli
from precli.core import loader
from precli.core.level import Level
from precli.core.metrics import Metrics
from precli.core.result import Result
from precli.core.artifact import Artifact
from precli.core.run import Run
from precli.core.tool import Tool
from precli.renderers.detailed import Detailed
from precli.renderers.json import Json
from precli.renderers.plain import Plain


LOG = logging.getLogger(__name__)
PROGRESS_THRESHOLD = 50


def _init_logger(log_level=logging.INFO):
"""Initialize the logger.
:param debug: Whether to enable debug mode
:return: An instantiated logging instance
"""
LOG.handlers = []
logging.captureWarnings(True)
LOG.setLevel(log_level)
logging.getLogger("urllib3").setLevel(log_level)
handler = logging.StreamHandler(sys.stderr)
LOG.addHandler(handler)
LOG.debug("logging initialized")


def setup_arg_parser():
parser = argparse.ArgumentParser(
description="precli - a static analysis security tool",
Expand Down Expand Up @@ -177,8 +153,7 @@ def file_to_url(owner, repo, branch, target, root, file):


def discover_files(targets: list[str], recursive: bool):
file_list = []
file_map = {}
artifacts = []

for target in targets:
if target.startswith("https://github.com"):
Expand All @@ -203,118 +178,26 @@ def discover_files(targets: list[str], recursive: bool):
for file in files:
if not preignore_mgr.is_ignored(file):
path = os.path.join(root, file)
file_list.append(path)
artifact = Artifact(path)
if repo:
file_map[path] = file_to_url(
artifact.uri = file_to_url(
owner, repo, branch, target, root, file
)
artifacts.append(artifact)
else:
files = os.listdir(path=target)
for file in files:
if not (
gitignore_mgr.is_ignored(file)
or preignore_mgr.is_ignored(file)
):
file_list.append(os.path.join(target, file))
artifact = Artifact(os.path.join(target, file))
artifacts.append(artifact)
else:
file_list.append(target)

return file_list, file_map
artifact = Artifact(target)
artifacts.append(artifact)


def run_checks(parsers: dict, file_list: list[str]) -> list[Result]:
"""Runs through all files in the scope
:return: -
"""
# if we have problems with a file, we'll remove it from the file_list
# and add it to the skipped list instead
new_file_list = list(file_list)
files_skipped = []
if (
len(file_list) > PROGRESS_THRESHOLD
and LOG.getEffectiveLevel() <= logging.INFO
):
files = progress.track(file_list)
else:
files = file_list

results = []
lines = 0
for fname in files:
try:
if fname == "-":
open_fd = os.fdopen(sys.stdin.fileno(), "rb", 0)
fdata = io.BytesIO(open_fd.read())
new_file_list = [
"<stdin>" if x == "-" else x for x in new_file_list
]
results += parse_file(
parsers, "<stdin>", fdata, new_file_list, files_skipped
)
else:
with open(fname, "rb") as fdata:
lines += sum(1 for _ in fdata)
with open(fname, "rb") as fdata:
results += parse_file(
parsers, fname, fdata, new_file_list, files_skipped
)
except OSError as e:
files_skipped.append((fname, e.strerror))
new_file_list.remove(fname)

metrics = Metrics(
files=len(new_file_list),
files_skipped=len(files_skipped),
lines=lines,
errors=sum(result.level == Level.ERROR for result in results),
warnings=sum(result.level == Level.WARNING for result in results),
notes=sum(result.level == Level.NOTE for result in results),
)

return results, metrics


def parse_file(
parsers: dict,
fname: str,
fdata: io.BufferedReader,
new_file_list: list,
files_skipped: list,
) -> list[Result]:
try:
data = fdata.read()

lexer_name = syntax.Syntax.guess_lexer(fname, data)
if lexer_name == "default":
lexer = lexers.guess_lexer(data)
lexer_name = lexer.aliases[0] if lexer.aliases else lexer.name

if lexer_name in parsers.keys():
LOG.debug("working on file : %s", fname)
parser = parsers[lexer_name]
return parser.parse(fname, data)
except KeyboardInterrupt:
sys.exit(2)
except SyntaxError as e:
print(
f"Syntax error while parsing file. ({e.filename}, "
f"line {e.lineno})",
file=sys.stderr,
)
files_skipped.append((fname, e))
new_file_list.remove(fname)
except Exception as e:
LOG.error(
f"Exception occurred when executing rules against "
f'{fname}. Run "precli --debug {fname}" to see the full '
f"traceback."
)
files_skipped.append((fname, "Exception while parsing file"))
new_file_list.remove(fname)
LOG.debug(f" Exception string: {e}")
LOG.debug(f" Exception traceback: {traceback.format_exc()}")
return []
return artifacts


def main():
Expand All @@ -325,7 +208,6 @@ def main():
or os.getenv("DEBUG") is not None
else logging.INFO
)
_init_logger(debug)

# Setup the command line arguments
args = setup_arg_parser()
Expand All @@ -335,12 +217,18 @@ def main():
parsers = loader.load_parsers(enabled, disabled)

# Compile a list of the targets
file_list, file_map = discover_files(args.targets, args.recursive)
artifacts = discover_files(args.targets, args.recursive)

# Initialize the run
tool = Tool("precli", precli.__author__, precli.__version__)
run = Run(tool, parsers, artifacts, debug)

results, metrics = run_checks(parsers, file_list)
# Invoke the run
run.invoke()

# Set the location url in the result if original target was URL based
for result in results:
"""
for result in run.results:
net_loc = file_map.get(result.location.file_name)
if net_loc is not None:
if result.location.start_line != result.location.end_line:
Expand All @@ -351,16 +239,17 @@ def main():
else:
lines = f"L{result.location.start_line}"
result.location.url = f"{net_loc}#{lines}"
"""

if args.json is True:
json = Json(args.no_color)
json.render(results, metrics)
json.render(run.results, run.metrics)
elif args.plain is True:
plain = Plain(args.no_color)
plain.render(results, metrics)
plain.render(run.results, run.metrics)
else:
detailed = Detailed(args.no_color)
detailed.render(results, metrics)
detailed.render(run.results, run.metrics)


if __name__ == "__main__":
Expand Down
86 changes: 86 additions & 0 deletions precli/core/artifact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2024 Secure Saurce LLC


class Artifact:
def __init__(self, file_name: str, uri: str = None):
self._file_name = file_name
# TODO: if uri is None, use file:///
self._uri = uri
self._contents = None
self._language = None

@property
def file_name(self) -> str:
"""
The name of the file.
:return: file name
:rtype: str
"""
return self._file_name

@file_name.setter
def file_name(self, file_name):
"""
Set the file name
:param str file_name: file name
"""
self._file_name = file_name

@property
def uri(self) -> str:
"""
The URI of the artifact.
:return: URI
:rtype: str
"""
return self._uri

@uri.setter
def uri(self, uri):
"""
Set the artifact URI.
:param str uri: URI
"""
self._uri = uri

@property
def contents(self) -> str:
"""
The contents of the artifact.
:return: typically file contents
:rtype: str
"""
return self._contents

@contents.setter
def contents(self, contents) -> str:
"""
Set the contents (for typically the file).
:param str contents: file contents
"""
self._contents = contents

@property
def language(self) -> str:
"""
The programming language for this artifact.
:return: programming language name
:rtype: str
"""
return self._language

@language.setter
def language(self, language) -> str:
"""
Set the programming language.
:param str language: program language
"""
self._language = language
Loading

0 comments on commit 50157d8

Please sign in to comment.