diff --git a/sigma/processing/transformations.py b/sigma/processing/transformations.py index 4362da5..5156731 100644 --- a/sigma/processing/transformations.py +++ b/sigma/processing/transformations.py @@ -1,14 +1,17 @@ from abc import ABC, abstractmethod +from collections import defaultdict from functools import partial from sigma.conditions import ConditionOR, SigmaCondition from typing import ( Any, + ClassVar, Iterable, List, Dict, Literal, Optional, Set, + Tuple, Union, Pattern, Iterator, @@ -299,6 +302,153 @@ def apply_value( """ +@dataclass +class HashesFieldsDetectionItemTransformation(DetectionItemTransformation): + """ + Transforms the 'Hashes' field in Sigma rules by creating separate detection items for each hash type. + + This transformation replaces the generic 'Hashes' field with specific fields for each hash algorithm, + optionally prefixing the field names. It supports various hash formats and can auto-detect hash types + based on their length. + + Attributes: + valid_hash_algos (List[str]): List of supported hash algorithms. + field_prefix (str): Prefix to add to the new field names. + drop_algo_prefix (bool): If True, omits the algorithm name from the new field name. + hash_lengths (Dict[int, str]): Mapping of hash lengths to their corresponding algorithms. + + Example: + Input: + Hashes: + - 'SHA1=5F1CBC3D99558307BC1250D084FA968521482025' + - 'MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7' + Output: + FileSHA1: '5F1CBC3D99558307BC1250D084FA968521482025' + FileMD5: '987B65CD9B9F4E9A1AFD8F8B48CF64A7' + """ + + valid_hash_algos: List[str] + field_prefix: str = "" + drop_algo_prefix: bool = False + hash_lengths: ClassVar[Dict[int, str]] = {32: "MD5", 40: "SHA1", 64: "SHA256", 128: "SHA512"} + + def apply_detection_item( + self, detection_item: SigmaDetectionItem + ) -> Optional[Union[SigmaDetection, SigmaDetectionItem]]: + """ + Applies the transformation to a single detection item. + + Args: + detection_item (SigmaDetectionItem): The detection item to transform. + + Returns: + Optional[Union[SigmaDetection, SigmaDetectionItem]]: A new SigmaDetection object containing + the transformed detection items, or None if no valid hashes were found. + + Raises: + Exception: If no valid hash algorithms were found in the detection item. + """ + algo_dict = self._parse_hash_values(detection_item.value) + + if not algo_dict: + raise Exception( + f"No valid hash algo found in Hashes field. Please use one of the following: {', '.join(self.valid_hash_algos)}" + ) + + return self._create_new_detection_items(algo_dict) + + def _parse_hash_values( + self, values: Union[SigmaString, List[SigmaString]] + ) -> Dict[str, List[str]]: + """ + Parses the hash values from the detection item. + + Args: + values (Union[SigmaString, List[SigmaString]]): The hash values to parse. + + Returns: + Dict[str, List[str]]: A dictionary mapping field names to lists of hash values. + """ + algo_dict = defaultdict(list) + if not isinstance(values, list): + values = [values] + + for value in values: + hash_algo, hash_value = self._extract_hash_algo_and_value(value.to_plain()) + if hash_algo: + field_name = self._get_field_name(hash_algo) + algo_dict[field_name].append(hash_value) + + return algo_dict + + def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]: + """ + Extracts the hash algorithm and value from a string. + + Args: + value (str): The string containing the hash algorithm and value. + + Returns: + Tuple[str, str]: A tuple containing the hash algorithm and value. + """ + parts = value.split("|") if "|" in value else value.split("=") + if len(parts) == 2: + hash_algo, hash_value = parts + hash_algo = hash_algo.lstrip("*").upper() + else: + hash_value = parts[0] + hash_algo = self._determine_hash_algo_by_length(hash_value) + + return (hash_algo, hash_value) if hash_algo in self.valid_hash_algos else ("", hash_value) + + def _determine_hash_algo_by_length(self, hash_value: str) -> str: + """ + Determines the hash algorithm based on the length of the hash value. + + Args: + hash_value (str): The hash value to analyze. + + Returns: + str: The determined hash algorithm, or an empty string if not recognized. + """ + return self.hash_lengths.get(len(hash_value), "") + + def _get_field_name(self, hash_algo: str) -> str: + """ + Generates the field name for a given hash algorithm. + + Args: + hash_algo (str): The hash algorithm. + + Returns: + str: The generated field name. + """ + return f"{self.field_prefix}{'' if self.drop_algo_prefix else hash_algo}" + + def _create_new_detection_items(self, algo_dict: Dict[str, List[str]]) -> SigmaDetection: + """ + Creates new detection items based on the parsed hash values. + + Args: + algo_dict (Dict[str, List[str]]): A dictionary mapping field names to lists of hash values. + + Returns: + SigmaDetection: A new SigmaDetection object containing the created detection items. + """ + return SigmaDetection( + detection_items=[ + SigmaDetectionItem( + field=k if k != "keyword" else None, + modifiers=[], + value=[SigmaString(x) for x in v], + ) + for k, v in algo_dict.items() + if k + ], + item_linking=ConditionOR, + ) + + class StringValueTransformation(ValueTransformation): """ Base class for transformations that operate on SigmaString values. @@ -1052,6 +1202,7 @@ def apply( "field_name_prefix_mapping": FieldPrefixMappingTransformation, "field_name_transform": FieldFunctionTransformation, "drop_detection_item": DropDetectionItemTransformation, + "hashes_fields": HashesFieldsDetectionItemTransformation, "field_name_suffix": AddFieldnameSuffixTransformation, "field_name_prefix": AddFieldnamePrefixTransformation, "wildcard_placeholders": WildcardPlaceholderTransformation, diff --git a/tests/test_processing_transformations.py b/tests/test_processing_transformations.py index a72d359..fc0a433 100644 --- a/tests/test_processing_transformations.py +++ b/tests/test_processing_transformations.py @@ -39,6 +39,7 @@ ValueListPlaceholderTransformation, QueryExpressionPlaceholderTransformation, ReplaceStringTransformation, + HashesFieldsDetectionItemTransformation, ) from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem from sigma.processing.conditions import ( @@ -1787,3 +1788,124 @@ def class_filter(c): for cls in inspect.getmembers(transformations_module, class_filter): assert cls[1] in classes_with_identifiers + + +@pytest.fixture +def hashes_transformation(): + return HashesFieldsDetectionItemTransformation( + valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"], + field_prefix="File", + drop_algo_prefix=False, + ) + + +def test_hashes_transformation_single_hash(hashes_transformation): + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")] + ) + result = hashes_transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 1 + assert result.detection_items[0].field == "FileSHA1" + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + + +def test_hashes_transformation_multiple_hashes(hashes_transformation): + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), + SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"), + ], + ) + result = hashes_transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 2 + assert result.detection_items[0].field == "FileSHA1" + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + assert result.detection_items[1].field == "FileMD5" + assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")] + assert result.item_linking == ConditionOR + + +def test_hashes_transformation_drop_algo_prefix(): + transformation = HashesFieldsDetectionItemTransformation( + valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"], + field_prefix="File", + drop_algo_prefix=True, + ) + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")] + ) + result = transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 1 + assert result.detection_items[0].field == "File" + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + + +def test_hashes_transformation_invalid_hash(hashes_transformation): + detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("INVALID=123456")]) + with pytest.raises(Exception, match="No valid hash algo found"): + hashes_transformation.apply_detection_item(detection_item) + + +def test_hashes_transformation_mixed_valid_invalid(hashes_transformation): + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), + SigmaString("INVALID=123456"), + SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"), + ], + ) + result = hashes_transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 2 + assert result.detection_items[0].field == "FileSHA1" + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + assert result.detection_items[1].field == "FileMD5" + assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")] + + +def test_hashes_transformation_auto_detect_hash_type(hashes_transformation): + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1 + SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5 + SigmaString("A" * 64), # SHA256 + SigmaString("B" * 128), # SHA512 + ], + ) + result = hashes_transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 4 + assert result.detection_items[0].field == "FileSHA1" + assert result.detection_items[1].field == "FileMD5" + assert result.detection_items[2].field == "FileSHA256" + assert result.detection_items[3].field == "FileSHA512" + + +def test_hashes_transformation_pipe_separator(hashes_transformation): + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")] + ) + result = hashes_transformation.apply_detection_item(detection_item) + assert isinstance(result, SigmaDetection) + assert len(result.detection_items) == 1 + assert result.detection_items[0].field == "FileSHA1" + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ]