Skip to content

Commit

Permalink
Fixed issue where the CloudWatch Logstream was recreated on each call…
Browse files Browse the repository at this point in the history
… to CloudWatchHandler.flush()
  • Loading branch information
Scott Collins committed Nov 13, 2024
1 parent 81d75a6 commit 0418f9d
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions src/pds/ingress/util/log_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def __init__(self, log_group_name, api_gateway_config, capacity=1024):
self.creation_time = datetime.now().strftime("%s")
self._bearer_token = None
self._node_id = None
self._stream_created = False

@property
def bearer_token(self):
Expand Down Expand Up @@ -340,24 +341,29 @@ def send_log_events_to_cloud_watch(self, log_events):
api_gateway_region = self.api_gateway_config["region"]
api_gateway_stage = self.api_gateway_config["stage"]

# Create the log stream for the current run.
# If the stream already exists, attempting to recreate should not raise an error.
log_stream_name = f"pds-ingress-client-{self.node_id}-{self.creation_time}"
api_gateway_resource = "createstream"

api_gateway_url = api_gateway_template.format(
id=api_gateway_id, region=api_gateway_region, stage=api_gateway_stage, resource=api_gateway_resource
)
payload = {"logGroupName": self.log_group_name, "logStreamName": log_stream_name}
headers = {
"Authorization": self.bearer_token,
"UserGroup": NodeUtil.node_id_to_group_name(self.node_id),
"content-type": "application/json",
"x-amz-docs-region": api_gateway_region,
}

response = requests.post(api_gateway_url, data=json.dumps(payload), headers=headers)
response.raise_for_status()
# Create the log stream for the current run, if it hasn't been already.
if not self._stream_created:
api_gateway_resource = "createstream"

api_gateway_url = api_gateway_template.format(
id=api_gateway_id, region=api_gateway_region, stage=api_gateway_stage, resource=api_gateway_resource
)

headers = {
"Authorization": self.bearer_token,
"UserGroup": NodeUtil.node_id_to_group_name(self.node_id),
"content-type": "application/json",
"x-amz-docs-region": api_gateway_region,
}

response = requests.post(api_gateway_url, data=json.dumps(payload), headers=headers)
response.raise_for_status()

# Can now skip this step for subsequent calls to flush()
self._stream_created = True

# Now submit logged content to the newly created log stream
api_gateway_resource = "log"
Expand Down

0 comments on commit 0418f9d

Please sign in to comment.