From 8f4ffd244ce5ddfc53de632d5ce0638c0c513c04 Mon Sep 17 00:00:00 2001 From: Andrea Fiore Date: Fri, 18 Aug 2023 15:52:19 +0200 Subject: [PATCH] DP-1710 | Introduce timeout for EmsSinkTask#put (#48) * introduce timeout for EmsSinkTask#put * add e2e test * avoid ambiguity around java TimeoutException --- .../connect/ems/config/EmsSinkConfig.scala | 8 +++ .../ems/config/EmsSinkConfigConstants.scala | 5 ++ .../connect/ems/config/EmsSinkConfigDef.scala | 7 +++ .../kafka/connect/ems/sink/EmsSinkTask.scala | 50 ++++++++++++++----- .../connect/transform/RecordTransformer.scala | 2 +- .../ems/config/EmsSinkConfigTest.scala | 2 + .../ems/sink/EmsSinkTaskObfuscationTest.scala | 2 + .../kafka/connect/ems/ErrorPolicyTests.scala | 38 ++++++++++++++ .../scalatest/fixtures/connect.scala | 16 ++++++ 9 files changed, 117 insertions(+), 13 deletions(-) diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala index db94db5e..f322f6e1 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala @@ -27,6 +27,8 @@ import org.apache.commons.validator.routines.UrlValidator import java.io.File import java.net.URL import java.nio.file.Path +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration final case class EmsSinkConfig( sinkName: String, @@ -50,6 +52,7 @@ final case class EmsSinkConfig( embedKafkaMetadata: Boolean, useInMemoryFileSystem: Boolean, allowNullsAsPks: Boolean, + sinkPutTimeout: FiniteDuration, ) object EmsSinkConfig { @@ -120,6 +123,10 @@ object EmsSinkConfig { includeEmbeddedMetadata = PropertiesHelper.getBoolean(props, EMBED_KAFKA_EMBEDDED_METADATA_KEY).getOrElse( EMBED_KAFKA_EMBEDDED_METADATA_DEFAULT, ) + sinkPutTimeout = FiniteDuration( + PropertiesHelper.getLong(props, SINK_PUT_TIMEOUT_KEY).getOrElse(SINK_PUT_TIMEOUT_DEFAULT), + TimeUnit.MILLISECONDS, + ) } yield EmsSinkConfig( sinkName, url, @@ -142,5 +149,6 @@ object EmsSinkConfig { includeEmbeddedMetadata, useInMemoryFs, allowNullsAsPks, + sinkPutTimeout, ) } diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala index 9045815d..fb6349a2 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala @@ -187,4 +187,9 @@ object EmsSinkConfigConstants { "Rather than writing to the host file system, buffer parquet data files in memory" val USE_IN_MEMORY_FS_DEFAULT = false + val SINK_PUT_TIMEOUT_KEY = s"${CONNECTOR_PREFIX}.sink.put.timeout.ms" + val SINK_PUT_TIMEOUT_DOC = + "The maximum time (in milliseconds) for the connector task to complete the upload of a single Parquet file before being flagged as failed. Note: this value should always be lower than max.poll.interval.ms" + val SINK_PUT_TIMEOUT_DEFAULT = 288000L // 4.8 minutes + } diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala index c1622755..b7c436ae 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala @@ -402,6 +402,13 @@ object EmsSinkConfigDef { Importance.MEDIUM, USE_IN_MEMORY_FS_DOC, ) + .define( + SINK_PUT_TIMEOUT_KEY, + Type.LONG, + SINK_PUT_TIMEOUT_DEFAULT, + Importance.HIGH, + SINK_PUT_TIMEOUT_DOC, + ) } class EmsSinkConfigDef() extends ConfigDef with LazyLogging diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala index 3cebe1e1..6f412cec 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTask.scala @@ -25,6 +25,8 @@ import com.celonis.kafka.connect.ems.config.EmsSinkConfig import com.celonis.kafka.connect.ems.errors.ErrorPolicy import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry import com.celonis.kafka.connect.ems.model._ +import com.celonis.kafka.connect.ems.sink.EmsSinkTask.PutTimeoutException +import com.celonis.kafka.connect.ems.sink.EmsSinkTask.StopTimeout import com.celonis.kafka.connect.ems.storage.EmsUploader import com.celonis.kafka.connect.ems.storage.FileSystemOperations import com.celonis.kafka.connect.ems.storage.Writer @@ -35,12 +37,30 @@ import com.typesafe.scalalogging.StrictLogging import okhttp3.OkHttpClient import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition } +import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord import org.apache.kafka.connect.sink.SinkTask import java.util +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +object EmsSinkTask { + private val StopTimeout: FiniteDuration = 5.seconds + final case class PutTimeoutException(configuredTimeout: FiniteDuration) extends Throwable { + override def getMessage = + s"The EmsSinkTask#put() operation timed out after $configuredTimeout. Please try restarting the connector task" + } + + object PutTimeoutException { + def adaptFromThrowable(configuredTimeout: FiniteDuration): PartialFunction[Throwable, PutTimeoutException] = { + case error: TimeoutException if error.getMessage.contains(configuredTimeout.toString()) => + PutTimeoutException(configuredTimeout) + } + } +} + class EmsSinkTask extends SinkTask with StrictLogging { private var writerManager: WriterManager[IO] = _ @@ -52,6 +72,7 @@ class EmsSinkTask extends SinkTask with StrictLogging { private var transformer: RecordTransformer = _ private val emsSinkConfigurator: EmsSinkConfigurator = new DefaultEmsSinkConfigurator private var okHttpClient: OkHttpClient = _ + private var emsSinkConfig: EmsSinkConfig = _ override def version(): String = Version.implementationVersion @@ -100,10 +121,11 @@ class EmsSinkTask extends SinkTask with StrictLogging { ) } - maxRetries = config.retries.retries - retriesLeft = maxRetries - errorPolicy = config.errorPolicy - transformer = RecordTransformer.fromConfig(config) + maxRetries = config.retries.retries + retriesLeft = maxRetries + errorPolicy = config.errorPolicy + transformer = RecordTransformer.fromConfig(config) + emsSinkConfig = config } override def put(records: util.Collection[SinkRecord]): Unit = { @@ -125,10 +147,14 @@ class EmsSinkTask extends SinkTask with StrictLogging { else IO(()) } yield () - io.attempt.unsafeRunSync() match { - case Left(value) => + io.timeoutAndForget(emsSinkConfig.sinkPutTimeout) + .adaptError(PutTimeoutException.adaptFromThrowable(emsSinkConfig.sinkPutTimeout)) + .attempt.unsafeRunSync() match { + case Left(error) if error.isInstanceOf[PutTimeoutException] => + throw new ConnectException(error.getMessage) + case Left(error) => retriesLeft -= 1 - errorPolicy.handle(value, retriesLeft) + errorPolicy.handle(error, retriesLeft) case Right(_) => retriesLeft = maxRetries } @@ -207,14 +233,14 @@ class EmsSinkTask extends SinkTask with StrictLogging { override def stop(): Unit = { (for { - _ <- IO(logger.debug(s"[{}] EmsSinkTask.Stop", sinkName)) + _ <- IO(logger.warn(s"[{}] EmsSinkTask.Stop", sinkName)) _ <- Option(writerManager).fold(IO(()))(_.close) - _ <- Option(okHttpClient).fold(IO(()))(c => IO(c.dispatcher().executorService().shutdown())) + _ <- Option(okHttpClient).fold(IO(()))(c => IO(c.dispatcher().executorService().shutdownNow()).void) _ <- Option(okHttpClient).fold(IO(()))(c => IO(c.connectionPool().evictAll())) - _ <- Option(okHttpClient).fold(IO(()))(c => IO(c.cache().close())) - } yield ()).attempt.unsafeRunSync() match { + + } yield ()).timeoutAndForget(StopTimeout).attempt.unsafeRunSync() match { case Left(value) => - logger.warn(s"[$sinkName]There was an error stopping the EmsSinkTask", value) + logger.warn(s"[{}] There was an error stopping the EmsSinkTask: {}", sinkName, value) case Right(_) => } writerManager = null diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala index 02e1a9aa..f3e01110 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala @@ -57,7 +57,7 @@ final class RecordTransformer( ), ) v <- IO.fromEither(DataConverter.apply(transformedValue)) - _ <- IO(logger.info("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation)) + _ <- IO(logger.debug("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation)) value <- obfuscation.fold(IO.pure(v)) { o => IO.fromEither(v.obfuscate(o).leftMap(FailedObfuscationException)) } diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala index 8a6b9e0e..0427cba7 100644 --- a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala +++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala @@ -40,6 +40,7 @@ import org.scalatest.matchers.should.Matchers import java.io.File import java.net.URL import java.util.UUID +import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.util.Try @@ -71,6 +72,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers { embedKafkaMetadata = true, useInMemoryFileSystem = false, allowNullsAsPks = false, + sinkPutTimeout = FiniteDuration(288000L, TimeUnit.MILLISECONDS), ) test(s"parse the configuration from properties") { diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala index afbfb811..5066ad64 100644 --- a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala +++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala @@ -74,6 +74,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD embedKafkaMetadata = false, useInMemoryFileSystem = false, allowNullsAsPks = false, + sinkPutTimeout = 4.minutes, ) val config = Map( ENDPOINT_KEY -> sinkConfig.url.toString, @@ -93,6 +94,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD FALLBACK_VARCHAR_LENGTH_KEY -> sinkConfig.fallbackVarCharLengths.map(_.toString).orNull, OBFUSCATION_TYPE_KEY -> "fix", OBFUSCATED_FIELDS_KEY -> "b.x", + SINK_PUT_TIMEOUT_KEY -> 4.minutes.toMillis.toString, ) task.start(config.asJava) diff --git a/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala b/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala index 6308a264..a13b2e54 100644 --- a/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala +++ b/src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala @@ -9,6 +9,7 @@ import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfigur import com.celonis.kafka.connect.ems.testcontainers.scalatest.KafkaConnectContainerPerSuite import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnectionCut import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnector +import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withParquetUploadLatency import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.mockserver.withMockResponse import org.mockserver.verify.VerificationTimes import org.scalatest.funsuite.AnyFunSuite @@ -121,4 +122,41 @@ class ErrorPolicyTests extends AnyFunSuite with KafkaConnectContainerPerSuite wi } } } + + test("honours connect.ems.sink.put.timeout.ms") { + + val sourceTopic = randomTopicName() + val emsTable = randomEmsTable() + + val emsConnector = new EmsConnectorConfiguration("ems") + .withConfig(TOPICS_KEY, sourceTopic) + .withConfig(ENDPOINT_KEY, proxyServerUrl) + .withConfig(AUTHORIZATION_KEY, "AppKey key") + .withConfig(TARGET_TABLE_KEY, emsTable) + .withConfig(COMMIT_RECORDS_KEY, 1) + .withConfig(COMMIT_SIZE_KEY, 1000000L) + .withConfig(COMMIT_INTERVAL_KEY, 3600000) + .withConfig(TMP_DIRECTORY_KEY, "/tmp/") + .withConfig(ERROR_POLICY_KEY, "RETRY") + .withConfig(SINK_PUT_TIMEOUT_KEY, "250") + + withConnector(emsConnector) { + withParquetUploadLatency(1.second) { // trigger SINK_PUT_TIMEOUT by injecting a 1 second latency + sendDummyAvroRecord(sourceTopic) + + val consumer = new WaitingConsumer + kafkaConnectContainer.followOutput(consumer, OutputType.STDOUT) + consumer.waitUntil( + (frame: OutputFrame) => frame.getUtf8String.contains("timed out after 250"), + 30, + TimeUnit.SECONDS, + ) + } + + eventually(timeout(60 seconds)) { + val status = kafkaConnectClient.getConnectorStatus(emsConnector.name) + status.tasks.head.state should be("FAILED") + } + } + } } diff --git a/test-common/src/main/scala/com/celonis/kafka/connect/ems/testcontainers/scalatest/fixtures/connect.scala b/test-common/src/main/scala/com/celonis/kafka/connect/ems/testcontainers/scalatest/fixtures/connect.scala index 7b00ae2a..6eda12e5 100644 --- a/test-common/src/main/scala/com/celonis/kafka/connect/ems/testcontainers/scalatest/fixtures/connect.scala +++ b/test-common/src/main/scala/com/celonis/kafka/connect/ems/testcontainers/scalatest/fixtures/connect.scala @@ -21,8 +21,24 @@ import com.celonis.kafka.connect.ems.testcontainers.connect.KafkaConnectClient import eu.rekawek.toxiproxy.Proxy import eu.rekawek.toxiproxy.model.ToxicDirection +import scala.concurrent.duration.FiniteDuration + object connect { + def withParquetUploadLatency( + latency: FiniteDuration, + )(testCode: => Unit, + )( + implicit + proxy: Proxy) = { + proxy.toxics().latency("LATENCY_UPSTREAM", ToxicDirection.UPSTREAM, latency.toMillis) + try { + testCode + } finally { + proxy.toxics().get("LATENCY_UPSTREAM").remove() + } + } + def withConnectionCut( testCode: => Any, )(