diff --git a/src/eazyserver/core/kafka_connector.py b/src/eazyserver/core/kafka_connector.py index 1396dd0..e9f0477 100644 --- a/src/eazyserver/core/kafka_connector.py +++ b/src/eazyserver/core/kafka_connector.py @@ -11,16 +11,16 @@ from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import TopicPartition -def dict_to_binary(the_dict): - binary = ' '.join(format(ord(letter), 'b') for letter in the_dict) - return binary +# def dict_to_binary(the_dict): +# binary = ' '.join(format(ord(letter), 'b') for letter in the_dict) +# return binary -def binary_to_dict(the_binary): - jsn = ''.join(chr(int(x, 2)) for x in the_binary.split()) - return jsn +# def binary_to_dict(the_binary): +# jsn = ''.join(chr(int(x, 2)) for x in the_binary.split()) +# return jsn def kafka_to_dict(kafka_msg): - msg = json.loads(binary_to_dict(kafka_msg.value())) + msg = json.loads(kafka_msg.value()) kafka_msg_id = "{id}:{topic}:{partition}:{offset}".format(**{ "id":msg["_id"],"offset":kafka_msg.offset(), "partition": kafka_msg.partition(), "topic":kafka_msg.topic() }) msg["_kafka__id"]= kafka_msg_id return msg @@ -30,7 +30,7 @@ def dict_to_kafka(output,source_data): if output["source_id"] == data["_id"]: output["_kafka_source_id"] = data["_kafka__id"] break - kafka_msg = dict_to_binary(json.dumps(output)) + kafka_msg = json.dumps(output) return kafka_msg # TODO: Move/Add formatOutput to behaviour base class