Skip to content

Commit

Permalink
FIX: More generic way of using different (de)-serializers
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Waldbauer <[email protected]>
  • Loading branch information
waldbauer-certat committed Jul 13, 2022
1 parent 5697faf commit d32bdb8
Show file tree
Hide file tree
Showing 21 changed files with 132 additions and 113 deletions.
2 changes: 1 addition & 1 deletion intelmq/bin/intelmqdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def main():
if queue_name in pipeline_pipes:
if runtime_config[pipeline_pipes[queue_name]]['group'] == 'Parser' and json.loads(msg)['__type'] == 'Event':
print('Event converted to Report automatically.')
msg = message.Report(message.MessageFactory.unserialize(msg)).serialize()
msg = message.Report(message.MessageFactory.deserialize(msg)).serialize()
else:
print(red("The given queue '{}' is not configured. Please retry with a valid queue.".format(queue_name)))
break
Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/collectors/amqp/collector_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def process(self):
self.logger.exception('Error receiving messages.')
else:
if self.expect_intelmq_message:
message = MessageFactory.unserialize(body.decode())
message = MessageFactory.deserialize(body.decode())
self.send_message(message, auto_add=False)
else:
report = self.new_report()
Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/parsers/json/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def process(self):
lines = [base64_decode(report['raw'])]

for line in lines:
new_event = MessageFactory.unserialize(line,
new_event = MessageFactory.deserialize(line,
harmonization=self.harmonization,
default_type='Event',
use_packer="json")
Expand Down
13 changes: 6 additions & 7 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import inspect
import io
import json
import msgpack
import logging
import os
import re
Expand Down Expand Up @@ -99,6 +98,7 @@ class Bot(object):
statistics_host: str = "127.0.0.1"
statistics_password: Optional[str] = None
statistics_port: int = 6379
pipeline_use_packer: str = os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')

_message_processed_verb: str = 'Processed'

Expand Down Expand Up @@ -322,7 +322,7 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
self.__disconnect_pipelines()

except exceptions.UnserializationError as exc:
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')
self.logger.exception('Could not deserialize message from pipeline. No retries useful.')

# ensure that we do not re-process the faulty message
self.__error_retries_counter = self.error_max_retries + 1
Expand Down Expand Up @@ -657,7 +657,7 @@ def receive_message(self):
return self.receive_message()

try:
self.__current_message = libmessage.MessageFactory.unserialize(message,
self.__current_message = libmessage.MessageFactory.deserialize(message,
harmonization=self.harmonization)
except exceptions.InvalidKey as exc:
# In case a incoming message is malformed an does not conform with the currently
Expand Down Expand Up @@ -808,7 +808,7 @@ def __init_logger(self):

def __log_configuration_parameter(self, config_name: str, option: str, value: Any):
if "password" in option or "token" in option:
value = "HIDDEN"
value = "<redacted>"

message = "{} configuration: parameter {!r} loaded with value {!r}." \
.format(config_name.title(), option, value)
Expand Down Expand Up @@ -1369,9 +1369,8 @@ def export_event(self, event: libmessage.Event,
if 'raw' in event:
del event['raw']
if return_type is str:
return event.to_json(hierarchical=self.hierarchical,
with_type=self.with_type,
jsondict_as_string=self.jsondict_as_string)
return event.to_pack("JSON", hierarchical=self.hierarchical,
with_type=self.with_type)
else:
retval = event.to_dict(hierarchical=self.hierarchical,
with_type=self.with_type,
Expand Down
2 changes: 1 addition & 1 deletion intelmq/lib/bot_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def outputappend(self, msg):
def arg2msg(self, msg):
default_type = "Report" if (self.runtime_configuration.get("group", None) == "Parser" or isinstance(self.instance, ParserBot)) else "Event"
try:
msg = MessageFactory.unserialize(msg, default_type=default_type)
msg = MessageFactory.deserialize(msg, default_type=default_type)
except (Exception, KeyError, TypeError, ValueError) as exc:
if exists(msg):
with open(msg, "r") as f:
Expand Down
11 changes: 10 additions & 1 deletion intelmq/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,13 @@ class UnserializationError(IntelMQException, ValueError):
"""
def __init__(self, exception: Exception = None, object: bytes = None):
self.object = object
super().__init__("Could not unserialize message%s." % exception)
super().__init__("Could not deserialize message, %s." % exception)


class SerializationError(IntelMQException, ValueError):
"""
Unrecoverable error during message serialization
"""
def __init__(self, exception: Exception = None, object: bytes = None):
self.object = object
super().__init__("Could not serialize message, %s." % exception)
56 changes: 28 additions & 28 deletions intelmq/lib/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,27 @@
Use MessageFactory to get a Message object (types Report and Event).
"""
import hashlib
import importlib
import inspect
import json
import re
import warnings
from collections import defaultdict
from typing import Any, Dict, Iterable, Optional, Sequence, Union
import msgpack

import intelmq.lib.exceptions as exceptions
import intelmq.lib.harmonization
from intelmq import HARMONIZATION_CONF_FILE
from intelmq.lib import utils
from intelmq.lib.packers.packer import Packer

__all__ = ['Event', 'Message', 'MessageFactory', 'Report']
VALID_MESSSAGE_TYPES = ('Event', 'Message', 'Report')


class MessageFactory(object):
"""
unserialize: JSON encoded message to object
deserialize: JSON encoded message to object
serialize: object to JSON encoded object
"""

Expand All @@ -43,7 +45,7 @@ def from_dict(message: dict, harmonization=None,
default_type: If '__type' is not present in message, the given type will be used
See also:
MessageFactory.unserialize
MessageFactory.deserialize
MessageFactory.serialize
"""
if default_type and "__type" not in message:
Expand All @@ -59,8 +61,8 @@ def from_dict(message: dict, harmonization=None,
return class_reference(message, auto=True, harmonization=harmonization)

@staticmethod
def unserialize(raw_message: bytes, harmonization: dict = None,
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
def deserialize(raw_message: bytes, harmonization: dict = None,
default_type: Optional[str] = None, use_packer: str = "MsgPack") -> dict:
"""
Takes JSON-encoded Message object, returns instance of correct class.
Expand All @@ -73,19 +75,18 @@ def unserialize(raw_message: bytes, harmonization: dict = None,
MessageFactory.from_dict
MessageFactory.serialize
"""
message = Message.unserialize(raw_message, use_packer=use_packer)
message = Message.deserialize(raw_message, use_packer=use_packer)
return MessageFactory.from_dict(message, harmonization=harmonization,
default_type=default_type)

@staticmethod
def serialize(message) -> bytes:
def serialize(message, use_packer: str = 'MsgPack') -> bytes:
"""
Takes instance of message-derived class and makes JSON-encoded Message.
The class is saved in __type attribute.
"""
raw_message = Message.serialize(message)
return raw_message
return Message.serialize(message, use_packer=use_packer)


class Message(dict):
Expand Down Expand Up @@ -305,34 +306,33 @@ def copy(self):
return retval

def deep_copy(self):
return MessageFactory.unserialize(MessageFactory.serialize(self),
return MessageFactory.deserialize(MessageFactory.serialize(self),
harmonization={self.__class__.__name__.lower(): self.harmonization_config})

def __str__(self):
return self.serialize(use_packer="json")
return self.serialize(use_packer="JSON")

def serialize(self, use_packer: str = "msgpack"):
def serialize(self, use_packer: str = "MsgPack"):
delete_type = False
if '__type' not in self:
delete_type = True
self['__type'] = self.__class__.__name__

if use_packer == "json":
packed = json.dumps(self)
else:
packed = msgpack.packb(self)
try:
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'), inspect.isclass)[0][1]()
packed = packer.serialize(data=self)
except Exception as exc:
raise exceptions.SerializationError(exception=exc, object=self)

if delete_type:
del self['__type']
return packed

@staticmethod
def unserialize(message: bytes, use_packer: str = "msgpack"):
def deserialize(message: bytes, use_packer: str = "MsgPack"):
try:
if use_packer == "json":
return json.loads(message)
else:
return msgpack.unpackb(message, raw=False)
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
return packer.deserialize(data=message)
except Exception as exc:
raise exceptions.UnserializationError(exception=exc, object=message)

Expand Down Expand Up @@ -485,13 +485,13 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,

return new_dict

def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)

def to_msgpack(self, hierarchical=False, with_type=False):
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return msgpack.packb(msgpack_dict)
def to_pack(self, use_packer="MsgPack", hierarchical=False, with_type=False, **kwargs):
try:
packer: Packer = getattr(intelmq.lib.message, f"Packer{use_packer}")()
data = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return packer.serialize(data, **kwargs)
except Exception as exc:
raise exceptions.SerializationError(exception=exc, object=self)

def __eq__(self, other: dict) -> bool:
"""
Expand Down
Empty file added intelmq/lib/packers/__init__.py
Empty file.
Empty file.
13 changes: 13 additions & 0 deletions intelmq/lib/packers/json/packer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from intelmq.lib.packers.packer import Packer
import json


class JSON(Packer):
def __init__(self) -> None:
super().__init__()

def serialize(self, data, **kwargs) -> bytes:
return json.dumps(data, **kwargs)

def deserialize(self, data, **kwargs) -> object:
return json.loads(data, **kwargs)
1 change: 1 addition & 0 deletions intelmq/lib/packers/msgpack/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
msgpack>=0.5
Empty file.
21 changes: 21 additions & 0 deletions intelmq/lib/packers/msgpack/packer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from intelmq.lib.packers.packer import Packer
from intelmq.lib import exceptions


try:
import msgpack
except:
msgpack = None


class MsgPack(Packer):
def __init__(self) -> None:
if msgpack is None:
raise exceptions.MissingDependencyError("msgpack")
super().__init__()

def serialize(self, data, **kwargs) -> bytes:
return msgpack.packb(data, **kwargs)

def deserialize(self, data, **kwargs) -> object:
return msgpack.unpackb(data, raw=False, **kwargs)
9 changes: 9 additions & 0 deletions intelmq/lib/packers/packer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class Packer():
def __init__(self) -> None:
pass

def serialize(self, data: bytes, **kwargs) -> bytes:
raise NotImplementedError()

def deserialize(self, data: bytes, **kwargs) -> object:
raise NotImplementedError()
8 changes: 4 additions & 4 deletions intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def setUpClass(cls):
elif cls.bot_type != 'collector' and cls.default_input_message == '':
cls.default_input_message = {'__type': 'Event'}
if type(cls.default_input_message) is dict:
cls.default_input_message = msgpack.packb(cls.default_input_message)
cls.default_input_message = message.MessageFactory.serialize(cls.default_input_message, os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))

if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \
Expand Down Expand Up @@ -321,7 +321,7 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
""" Test if report has required fields. """
if self.bot_type == 'collector':
for report_data in self.get_output_queue():
report = message.MessageFactory.unserialize(report_data,
report = message.MessageFactory.deserialize(report_data,
harmonization=self.harmonization)
self.assertIsInstance(report, message.Report)
self.assertIn('raw', report)
Expand All @@ -330,7 +330,7 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
""" Test if event has required fields. """
if self.bot_type == 'parser':
for event_data in self.get_output_queue():
event = message.MessageFactory.unserialize(event_data,
event = message.MessageFactory.deserialize(event_data,
harmonization=self.harmonization)
self.assertIsInstance(event, message.Event)
self.assertIn('classification.type', event)
Expand Down Expand Up @@ -522,7 +522,7 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
event = self.get_output_queue(path=path)[queue_pos]
self.assertIsInstance(event, bytes)

event_dict = msgpack.unpackb(event, raw=False)
event_dict = message.MessageFactory.deserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
if isinstance(expected_msg, (message.Event, message.Report)):
expected = expected_msg.to_dict(with_type=True)
else:
Expand Down
6 changes: 3 additions & 3 deletions intelmq/tests/bots/collectors/tcp/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_random_input(self):
thread.start()
self.run_bot()
self.assertOutputQueueLen(2)
generated_report = MessageFactory.unserialize(self.get_output_queue()[1], harmonization=self.harmonization,
generated_report = MessageFactory.deserialize(self.get_output_queue()[1], harmonization=self.harmonization,
default_type='Event')
self.assertEqual(base64_decode(generated_report['raw']), ORIGINAL_DATA.split(SEPARATOR)[1])

Expand All @@ -121,9 +121,9 @@ def test_intelmq_exchange(self):
self.assertOutputQueueLen(msg_count)

for i, msg in enumerate(self.get_output_queue()):
report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event')
report = MessageFactory.deserialize(msg, harmonization=self.harmonization, default_type='Event')

output = MessageFactory.unserialize(utils.base64_decode(report["raw"]),
output = MessageFactory.deserialize(utils.base64_decode(report["raw"]),
harmonization=self.harmonization,
default_type='Event',
use_packer="json")
Expand Down
5 changes: 3 additions & 2 deletions intelmq/tests/bots/experts/cymru_whois/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import msgpack
import os
import unittest

import intelmq.lib.test as test
from intelmq.bots.experts.cymru_whois.expert import CymruExpertBot
from work.intelmq.intelmq.lib import message

EXAMPLE_INPUT = {"__type": "Event",
"source.ip": "78.104.144.2", # example.com
Expand Down Expand Up @@ -93,7 +94,7 @@ def test_6to4_result(self):
"""
self.input_message = EXAMPLE_6TO4_INPUT
self.run_bot()
actual = msgpack.loads(self.get_output_queue()[0])
actual = message.MessageFactory.serialize(self.get_output_queue()[0], use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual)
self.assertIn("source.asn", actual)
self.assertIn("source.as_name", actual)
Expand Down
3 changes: 1 addition & 2 deletions intelmq/tests/bots/experts/idea/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# -*- coding: utf-8 -*-
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.lib.message import MessageFactory
Expand Down Expand Up @@ -89,7 +88,7 @@ def test_conversion(self):
# the data from the "output" field and compare after removing ID's
event = self.get_output_queue()[0]
self.assertIsInstance(event, bytes)
event_dict = MessageFactory.unserialize(event)
event_dict = MessageFactory.deserialize(event, use_packer=os.environ.get('INTELMQ_USE_PACKER', 'MsgPack'))
self.assertIsInstance(event_dict, dict)
self.assertTrue(b"output" in event_dict)
idea_event = json.loads(event_dict["output"])
Expand Down
2 changes: 0 additions & 2 deletions intelmq/tests/bots/parsers/json/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import base64
import os
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.bots.parsers.json.parser import JSONParserBot
Expand Down
Loading

0 comments on commit d32bdb8

Please sign in to comment.