From e0580d228151c986daa7ebf589f18d17ad0b8b7a Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 2 Aug 2023 11:53:50 +0200 Subject: [PATCH] Address review feedback --- .../sinks/KinesisSink.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 21c4f7957..59b995faf 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -221,8 +221,14 @@ class KinesisSink private ( ) // If Kinesis was already unhealthy, the background check is already running. // It can happen when the collector switches back and forth between Kinesis and SQS. - if (kinesisHealthy) checkKinesisHealth() - kinesisHealthy = false + if (kinesisHealthy) { + this.synchronized { + if (kinesisHealthy) { + kinesisHealthy = false + checkKinesisHealth() + } + } + } writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries) case None => log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis") @@ -240,8 +246,14 @@ class KinesisSink private ( } else { // If SQS was already unhealthy, the background check is already running. // It can happen when the collector switches back and forth between Kinesis and SQS. - if (sqsHealthy) checkSqsHealth() - sqsHealthy = false + if (sqsHealthy) { + this.synchronized { + if (sqsHealthy) { + sqsHealthy = false + checkSqsHealth() + } + } + } log.error( s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis" )