Skip to content

Commit

Permalink
feat: add ability to pass in stream offset
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosmmatos committed Oct 12, 2023
1 parent bb5cadd commit 1e25276
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 2 deletions.
4 changes: 4 additions & 0 deletions config/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
# Exclude events originating from certain cloud environments (AWS, Azure, GCP, or unrecognized)
# detections_exclude_clouds =

# Pass in the offset to start the stream from. This is useful to prevent duplicate events.
# Alternatively, use EVENTS_OFFSET env variable. (default: 0)
#offset = 0

[logging]
# Uncomment to request logging level (ERROR, WARN, INFO, DEBUG). Alternatively, use
# LOG_LEVEL env variable.
Expand Down
1 change: 1 addition & 0 deletions config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ backends =
severity_threshold = 2
older_than_days_threshold = 21
detections_exclude_clouds =
offset = 0

[logging]
level = INFO
Expand Down
1 change: 1 addition & 0 deletions fig/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class FigConfig(configparser.SafeConfigParser):
['logging', 'level', 'LOG_LEVEL'],
['events', 'severity_threshold', 'EVENTS_SEVERITY_THRESHOLD'],
['events', 'older_than_days_threshold', 'EVENTS_OLDER_THAN_DAYS_THRESHOLD'],
['events', 'offset', 'EVENTS_OFFSET'],
['falcon', 'cloud_region', 'FALCON_CLOUD_REGION'],
['falcon', 'client_id', 'FALCON_CLIENT_ID'],
['falcon', 'client_secret', 'FALCON_CLIENT_SECRET'],
Expand Down
15 changes: 13 additions & 2 deletions fig/falcon/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ class StreamingThread(StoppableThread):
def __init__(self, stream: Stream, queue, relevant_event_types, *args, **kwargs):
kwargs['name'] = kwargs.get('name', 'cs_stream')
super().__init__(*args, **kwargs)

# Obtain offset values from config and queue
config_offset = int(config.get('events', 'offset'))
queue_offset = queue.last_offset(stream.feed_id)

# Logic to determine what value to use for self.offset
if config_offset == 0 and queue_offset == 0:
self.offset = 0
else:
self.offset = max(config_offset, queue_offset)

self.stream = stream
self.conn = StreamingConnection(self.stream, queue.last_offset(self.stream.feed_id), relevant_event_types)
self.conn = StreamingConnection(self.stream, self.offset, relevant_event_types)
self.queue = queue
self.relevant_event_types = relevant_event_types
self.event_count = 0
Expand Down Expand Up @@ -124,7 +135,7 @@ def log_event(self, event):


class StreamingConnection():
def __init__(self, stream: Stream, last_seen_offset=0, relevant_event_types=None):
def __init__(self, stream: Stream, last_seen_offset, relevant_event_types=None):
self.stream = stream
self.connection = None
self.last_seen_offset = last_seen_offset
Expand Down

0 comments on commit 1e25276

Please sign in to comment.