Evaluator #43

Open · wants to merge 6 commits into master

blacklistfilter/evaluator/evaluator.py (316 additions, 0 deletions)
@@ -0,0 +1,316 @@
import csv
Contributor: Missing licence and author.

import json
from queue import Queue
import os
import sys
import signal
import time
from threading import Thread, Event
import requests
from datetime import datetime
import pytrap
import shutil
from optparse import OptionParser
from collections import defaultdict

# Enter an API key to obtain information from NERD
NERD_API_KEY = None
Contributor: This should be loaded from configuration (see https://github.com/CESNET/Nemea-Detectors/tree/master/blacklistfilter/blacklist_downloader and @BLACKLISTFILTERDIR@ as an example). It is not a best practice to modify installed scripts.
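A minimal sketch of what that could look like, assuming a JSON config file read with the json module already imported above; the path (@BLACKLISTFILTERDIR@/evaluator_config.json) and the key name nerd_api_key are placeholders, not something this PR or blacklist_downloader actually defines:

# Sketch only, not part of this diff: read the NERD API key from a config file
CONFIG_PATH = "@BLACKLISTFILTERDIR@/evaluator_config.json"  # assumed path, substituted at install time

def load_nerd_api_key(path=CONFIG_PATH):
    """Return the NERD API key from the config file, or None if it cannot be read."""
    try:
        with open(path) as f:
            return json.load(f).get("nerd_api_key")
    except (OSError, ValueError):
        return None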


#Constant defined in Adaptive Filter
ADAPTIVE_BLACKLIST_ID = 999

parser = OptionParser(add_help_option=True)
parser.add_option("-i", "--ifcspec", dest="ifcspec",
help="TRAP IFC specifier", metavar="IFCSPEC")
parser.add_option("-c", "--csv-path", dest="csv", help="Path to the csv files generated by Split Evidence")
parser.add_option("-e","--evidence-path", dest="evidence", help="Path to the folder for storing data of unreported detections")
Contributor: If the config file is passed using a new option, you do not need to use configure to fill in the correct path, as is done in blacklist_downloader.
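For illustration, the new option the reviewer mentions could be added to the parser above like this; the flag, dest and default are illustrative only:

# Sketch only, not part of this diff: optional config-path override on the command line
parser.add_option("-C", "--config", dest="config",
                  default="@BLACKLISTFILTERDIR@/evaluator_config.json",  # assumed default
                  help="Path to the evaluator configuration file")
# After parse_args(), the key could then be loaded with load_nerd_api_key(options.config).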


# Signal handler
def SignalHandler(sig, frame):
    global signal_received, trap_ctx
    if signal_received:
        print('Caught another SIGINT, exiting forcefully...')
        os._exit(1)
    trap_ctx.terminate()
    signal_received = True

class Alert:
def __init__(self, client, nerd_info):
        self.ip = client.ip_addr
self.statistics = client.statistics
self.nerd_info = nerd_info

class MonitoredClient:

Contributor: General comment: doc is missing... classes and methods should be documented.

def __init__(self, ip_addr, detection):
self.ip_addr = ip_addr
self.detection = detection
self.ports = set()
self.duration_in = 0.0
self.duration_out = 0.0
self.start = datetime.now()
self.end = datetime.utcfromtimestamp(0)
self.ips_contacted = set()
self.statistics = defaultdict(int)

def AddFlow(self, flow, src):
first = datetime.strptime(flow["time TIME_FIRST"], '%Y-%m-%dT%H:%M:%S.%f')
last = datetime.strptime(flow["time TIME_LAST"], '%Y-%m-%dT%H:%M:%S.%f')

if first < self.start:
self.start = first
if last > self.end:
self.end = last

if src:
            self.duration_out += (last - first).total_seconds()
self.statistics["bytes_sent"] += int(flow["uint64 BYTES"])
self.statistics["packets_sent"] += int(flow["uint32 PACKETS"])
self.statistics["flows_sent"] += 1
self.ips_contacted.add(flow["ipaddr DST_IP"])
self.ports.add(flow["uint16 DST_PORT"])
else:
            self.duration_in += (last - first).total_seconds()
self.statistics["bytes_recv"] += int(flow["uint64 BYTES"])
self.statistics["packets_recv"] += int(flow["uint32 PACKETS"])
self.statistics["flows_recv"] += 1

#Determine other statistics
Contributor: What does "Determine other statistics" mean? Do you mean "compute statistics for ... client"? Also, docstrings should be written according to https://www.python.org/dev/peps/pep-0257/ (an example docstring sketch follows the MonitoredClient class below).

def process(self):
        self.statistics['flows_per_sec_recv'] = self.statistics["flows_recv"] / (self.end - self.start).total_seconds() if (self.end - self.start).total_seconds() else 0.0
        self.statistics['flows_per_sec_sent'] = self.statistics["flows_sent"] / (self.end - self.start).total_seconds() if (self.end - self.start).total_seconds() else 0.0
self.statistics['pkts_per_sec_recv'] = self.statistics["packets_recv"] / self.duration_in if self.duration_in else 0.0
self.statistics['pkts_per_sec_sent'] = self.statistics["packets_sent"] / self.duration_out if self.duration_out else 0.0
self.statistics['bytes_per_sec_recv'] = self.statistics["bytes_recv"] / self.duration_in if self.duration_in else 0.0
self.statistics['bytes_per_sec_sent'] = self.statistics["bytes_sent"] / self.duration_out if self.duration_out else 0.0
self.statistics['bytes_per_pkt_recv'] = self.statistics["bytes_recv"] / self.statistics["packets_recv"] if self.statistics["packets_recv"] else 0.0
self.statistics['bytes_per_pkt_sent'] = self.statistics["bytes_sent"] / self.statistics["packets_sent"] if self.statistics["packets_sent"] else 0.0
self.statistics['ips_contacted'] = len(self.ips_contacted)
self.statistics['ports'] = len(self.ports)
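As a concrete example of the documentation the reviewer asks for, PEP 257 style docstrings for the class above and its process() method might read as follows; the wording, including the class description, is a suggestion only:

# Sketch only, not part of this diff: example docstrings for MonitoredClient
class MonitoredClient:
    """A client observed in the flows of one adaptive-blacklist event."""

    def process(self):
        """Compute derived statistics for this client.

        Fill self.statistics with flow, packet and byte rates in both
        directions, plus the number of distinct contacted IPs and ports;
        rates default to 0.0 when the corresponding duration is zero.
        """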

#Main class managing threads
class Controller:
def __init__(self, receiver_cnt = 1, worker_cnt = 2, sender_cnt = 1):
        self.receiver_cnt = receiver_cnt
self.worker_cnt = worker_cnt
self.recv_queue = Queue()
self.send_queue = Queue()
self.receivers = []
self.workers = []
self.sender = Sender(self.send_queue)

for i in range(receiver_cnt):
self.receivers.append(Receiver(0, self.recv_queue))

for i in range(worker_cnt):
self.workers.append(Worker(self.recv_queue, self.send_queue))

def Start(self):
print("Starting threads...")

for r in self.receivers:
r.start()

for w in self.workers:
w.start()

self.sender.start()

        while not signal_received:
time.sleep(2)

def Join(self):
print("Joining threads...")
for r in self.receivers:
r.join()

for i in range(len(self.workers)):
self.recv_queue.put(None)

for w in self.workers:
w.join()

self.send_queue.put(None)
self.sender.join()

#Sending thread
class Sender(Thread):
def __init__(self, queue):
self.queue = queue
Thread.__init__(self, name = "SendingThread")

#Request IP information from NERD
def nerd_query(self, api_key, ip_addr):
        if api_key is None:
return None
base_url = 'https://nerd.cesnet.cz/nerd/api/v1'
headers = {
"Authorization": api_key
}
try:
req = requests.get(base_url + '/ip/{}/full'.format(ip_addr), headers=headers, timeout=3)
if req.status_code == 200:
return json.loads(req.text)
except requests.RequestException as e:
print('Failed to get info from NERD\n', e)
return None

#Send report to the output interface
def SendReport(self, client):
nerd_record = self.nerd_query(NERD_API_KEY, client.ip_addr)
alert = Alert(client, nerd_record)
trap_ctx.send(bytearray(json.dumps(alert.__dict__), "utf-8"))

    # Move files related to unreported events for later use
    def MoveToEvidence(self, client):
        event_id = client.detection["id"]
        os.mkdir(options.evidence + "/" + event_id)
        shutil.move(options.csv + "/" + event_id + ".csv",
                    options.evidence + "/" + event_id + "/" + client.ip_addr.replace(".", "_") + ".csv")

def run(self):
while True:
clients = self.queue.get()

for key in clients:
if (int(clients[key].statistics['bytes_sent']) == 0 or int(clients[key].statistics['flows_sent']) == 1):
try:
os.remove(options.csv+"/"+clients[key].detection["id"]+".csv")
except FileNotFoundError:
continue
continue
                thresholds_reached = 0
clients[key].process()
if (float(clients[key].statistics['bytes_per_pkt_sent']) > 187.1207):
Contributor: These constants should be loaded from a configuration file as well (a config-loading sketch follows the Sender class below).

thresholds_reached += 1
if (float(clients[key].statistics['bytes_per_pkt_recv']) > 227.5888):
thresholds_reached += 1
if (float(clients[key].statistics['bytes_recv']) / float(clients[key].statistics['bytes_sent']) < 2.5198):
thresholds_reached += 1
if (float(clients[key].statistics['packets_recv']) / float(clients[key].statistics['packets_sent']) < 1.4844):
thresholds_reached += 1
if (float(clients[key].statistics['flows_recv']) / float(clients[key].statistics['flows_sent']) < 1.9401):
thresholds_reached += 1
if (float(clients[key].statistics['flows_recv']) != 0):
if (float(clients[key].statistics['packets_recv']) / float(clients[key].statistics['flows_recv']) > 5.0973):
thresholds_reached += 1
else:
thresholds_reached += 1
                if (float(clients[key].statistics['pkts_per_sec_recv']) < 3.7455):
thresholds_reached += 1
                if (float(clients[key].statistics['pkts_per_sec_sent']) < 227.5888):
thresholds_reached += 1
                if float(clients[key].statistics['ips_contacted']) > 124.7984:
thresholds_reached += 1
if (thresholds_reached > 4):
self.SendReport(clients[key])
else:
self.MoveToEvidence(clients[key])
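Picking up the earlier comment about the hard-coded thresholds in run(), a sketch of loading them from the same assumed JSON config file (the CONFIG_PATH placeholder from the sketch near the top of the file); the key names are placeholders, and the default values are simply the constants currently in run():

# Sketch only, not part of this diff: detection thresholds loaded from the config file
def load_thresholds(path=CONFIG_PATH):
    """Return the detection thresholds, falling back to the built-in defaults."""
    defaults = {
        "bytes_per_pkt_sent": 187.1207,
        "bytes_per_pkt_recv": 227.5888,
        "bytes_ratio": 2.5198,
        "packets_ratio": 1.4844,
        "flows_ratio": 1.9401,
        "packets_per_flow_recv": 5.0973,
        "pkts_per_sec_recv": 3.7455,
        "pkts_per_sec_sent": 227.5888,
        "ips_contacted": 124.7984,
    }
    try:
        with open(path) as f:
            defaults.update(json.load(f).get("thresholds", {}))
    except (OSError, ValueError):
        pass
    return defaults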



class Worker(Thread):

def __init__(self, recv_queue, send_queue):
Thread.__init__(self, name = "ConsumerThread" )

self.recv_queue = recv_queue
self.send_queue = send_queue

def join(self):
Thread.join(self, None)

def run(self):
while True:
detection = self.recv_queue.get()
            if detection is None:
                self.send_queue.put(None)
                break

id = detection["id"]

if not os.path.isfile(options.csv+"/"+id+".csv"):
print("Could not locate the file related to the event {}".format(id))
continue

clients = dict()

for event in detection["grouped_events"]:
for target in event["targets"]:
clients[target] = MonitoredClient(target, detection)

with open(options.csv+"/"+id+".csv", 'r') as f:
reader = csv.DictReader(f)
for line in reader:
flow = dict(line)
try:
if (int(flow["uint16 SRC_PORT"]) == 53 or int(flow["uint16 DST_PORT"]) == 53):
continue
except ValueError:
continue
if int(flow["uint64 SRC_BLACKLIST"]) == ADAPTIVE_BLACKLIST_ID:
try:
clients[flow["ipaddr SRC_IP"]].AddFlow(flow, True)
except KeyError:
clients[flow["ipaddr SRC_IP"]] = MonitoredClient(flow["ipaddr SRC_IP"],detection)
clients[flow["ipaddr SRC_IP"]].AddFlow(flow, True)
else:
try:
clients[flow["ipaddr DST_IP"]].AddFlow(flow, False)
except KeyError:
clients[flow["ipaddr DST_IP"]] = MonitoredClient(flow["ipaddr DST_IP"],detection)
clients[flow["ipaddr DST_IP"]].AddFlow(flow, False)

self.send_queue.put(clients.copy())
self.recv_queue.task_done()

class Receiver(Thread):
def __init__(self, ifc_in, queue):
self.ifc_in = ifc_in
trap_ctx.setRequiredFmt(self.ifc_in, pytrap.FMT_JSON, "blacklist_evidence")
self.queue = queue

self.stop = Event()

Thread.__init__(self, name = "ReceiverThread" )

def join(self):
print(("{}: Joining...").format(self.name))
self.stop.set()
Thread.join(self, None)

def run(self):
while not self.stop.is_set():
try:
data = trap_ctx.recv(0)
except pytrap.FormatMismatch:
print("Pytrap Format Mismatch")
break
except pytrap.TrapError:
print("Pytrap Error")
break
except pytrap.Terminated:
print("TRAP terminated")
break
if len(data) <= 1:
break

recv = json.loads(data.decode())
if recv["event_type"] != "BotnetDetection":
continue

self.queue.put(recv)

signal_received = False

if __name__ == '__main__':
options, args = parser.parse_args()
    options.evidence = options.evidence.rstrip('/')
    options.csv = options.csv.rstrip('/')
signal.signal(signal.SIGINT, SignalHandler)

trap_ctx = pytrap.TrapCtx()
trap_ctx.init(sys.argv, 1, 1)

controller = Controller()
controller.Start()
controller.Join()
