diff --git a/src/vunnel/providers/rhel/__init__.py b/src/vunnel/providers/rhel/__init__.py index 642e5f4a..a4bbc420 100644 --- a/src/vunnel/providers/rhel/__init__.py +++ b/src/vunnel/providers/rhel/__init__.py @@ -17,7 +17,7 @@ class Config: runtime: provider.RuntimeConfig = field( default_factory=lambda: provider.RuntimeConfig( result_store=result.StoreStrategy.SQLITE, - existing_results=result.ResultStatePolicy.KEEP, + existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE, ), ) request_timeout: int = 125 diff --git a/src/vunnel/providers/rhel/parser.py b/src/vunnel/providers/rhel/parser.py index 4749ac1a..b4b85f3c 100644 --- a/src/vunnel/providers/rhel/parser.py +++ b/src/vunnel/providers/rhel/parser.py @@ -401,7 +401,7 @@ def _get_name_version(package): return name, version - def _parse_affected_release(self, cve_id, content): + def _parse_affected_release(self, cve_id: str, content) -> list[FixedIn]: fixed_ins = [] ars = content.get("affected_release", []) @@ -563,9 +563,9 @@ def _parse_affected_release(self, cve_id, content): return fixed_ins - def _parse_package_state(self, cve_id, content): - fixed_ins = [] - out_of_support = [] # Track items out of support to be able to add them if others are affected + def _parse_package_state(self, cve_id: str, fixed: list[FixedIn], content) -> list[FixedIn]: + affected: list[FixedIn] = [] + out_of_support: list[FixedIn] = [] # Track items out of support to be able to add them if others are affected pss = content.get("package_state", []) for item in pss: @@ -595,7 +595,7 @@ def _parse_package_state(self, cve_id, content): state = item.get("fix_state", None) if state in ["Affected", "Fix deferred"]: - fixed_ins.append( + affected.append( FixedIn( platform=platform, package=package_name, @@ -605,7 +605,7 @@ def _parse_package_state(self, cve_id, content): ) ) elif state in ["Will not fix"]: - fixed_ins.append( + affected.append( FixedIn( platform=platform, package=package_name, @@ -636,18 +636,24 @@ def _parse_package_state(self, cve_id, content): except: self.logger.exception(f"error parsing {cve_id} package state entity: {item}") - merged_fixed_ins = Parser._merge_out_of_support_affected(fixed_ins, out_of_support) + merged_fixed_ins = Parser._merge_out_of_support_affected(fixed, affected, out_of_support) return merged_fixed_ins @staticmethod - def _merge_out_of_support_affected(affected_fixed_ins: list[FixedIn], out_of_support: list[FixedIn]) -> list[FixedIn]: - if out_of_support and affected_fixed_ins: - merged = copy.deepcopy(affected_fixed_ins) + def _merge_out_of_support_affected( + fixed: list[FixedIn], affected: list[FixedIn], out_of_support: list[FixedIn] + ) -> list[FixedIn]: + if not out_of_support: + return affected + + if affected or fixed: + merged = copy.deepcopy(affected) + for oos in out_of_support: - for affected in affected_fixed_ins: + for r in affected + fixed: # A newer release is impacted, so assume out-of-support version is as well try: - if oos.package == affected.package and int(oos.platform) < int(affected.platform): + if oos.package == r.package and int(oos.platform) < int(r.platform): merged.append(oos) break except ValueError: @@ -655,7 +661,8 @@ def _merge_out_of_support_affected(affected_fixed_ins: list[FixedIn], out_of_sup merged.append(oos) break return merged - return affected_fixed_ins + + return affected def _parse_cve(self, cve_id, content): # logger.debug('Parsing {}'.format(cve_id)) @@ -663,7 +670,7 @@ def _parse_cve(self, cve_id, content): results = [] platform_artifacts = {} fins = self._parse_affected_release(cve_id, content) - nfins = self._parse_package_state(cve_id, content) + nfins = self._parse_package_state(cve_id, fins, content) platform_package_module_tuples = set() if fins or nfins: diff --git a/tests/quality/vulnerability-match-labels b/tests/quality/vulnerability-match-labels index 8fc653a0..7b961acc 160000 --- a/tests/quality/vulnerability-match-labels +++ b/tests/quality/vulnerability-match-labels @@ -1 +1 @@ -Subproject commit 8fc653a0b2dc6a73c71423030f5038ec28af8b43 +Subproject commit 7b961accb23b60ca0987b80fe6ee2700a6ec8aa4 diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index fd6ac759..3055898c 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -241,7 +241,7 @@ def test_config(monkeypatch) -> None: request_timeout: 125 runtime: existing_input: keep - existing_results: keep + existing_results: delete-before-write on_error: action: fail input: keep diff --git a/tests/unit/providers/rhel/test_rhel.py b/tests/unit/providers/rhel/test_rhel.py index 6042b402..62e9e6af 100644 --- a/tests/unit/providers/rhel/test_rhel.py +++ b/tests/unit/providers/rhel/test_rhel.py @@ -444,7 +444,7 @@ def test_parse_affected_releases(self, tmpdir, affected_releases, fixed_ins, moc def test_parse_package_state(self, tmpdir, mock_cve): driver = Parser(workspace=workspace.Workspace(tmpdir, "test", create=True)) - results = driver._parse_package_state(mock_cve.get("name"), mock_cve) + results = driver._parse_package_state([], mock_cve.get("name"), mock_cve) assert results and isinstance(results, list) and len(results) == 1 fixed_in = results[0] @@ -512,9 +512,10 @@ def test_get_name_version(self, package, name, version): @pytest.mark.parametrize( - "affected, out_of_support, expected", + "fixed, affected, out_of_support, expected", [ ( + [], [ FixedIn( module=None, @@ -551,6 +552,7 @@ def test_get_name_version(self, package, name, version): ], ), ( + [], [ FixedIn( module=None, @@ -580,6 +582,7 @@ def test_get_name_version(self, package, name, version): ], ), ( + [], [ FixedIn( module=None, @@ -601,6 +604,7 @@ def test_get_name_version(self, package, name, version): ], ), ( + [], [], [ FixedIn( @@ -614,6 +618,7 @@ def test_get_name_version(self, package, name, version): [], ), ( + [], [ FixedIn( module=None, @@ -642,10 +647,54 @@ def test_get_name_version(self, package, name, version): ), ], ), + ( + [ + FixedIn( + module=None, + platform="8", + package="foobar", + advisory=None, + version="1.2.3.4", + ) + ], + [], + [ + FixedIn( + module=None, + platform="6", + package="foobar", + advisory=None, + version=None, + ), + FixedIn( + module=None, + platform="7", + package="foobar", + advisory=None, + version=None, + ), + ], + [ + FixedIn( + module=None, + platform="6", + package="foobar", + advisory=None, + version=None, + ), + FixedIn( + module=None, + platform="7", + package="foobar", + advisory=None, + version=None, + ), + ], + ), ], ) -def test_out_of_support(affected, out_of_support, expected): - assert Parser._merge_out_of_support_affected(affected, out_of_support) == expected +def test_out_of_support(fixed, affected, out_of_support, expected): + assert Parser._merge_out_of_support_affected(fixed, affected, out_of_support) == expected @pytest.fixture