Skip to content

Commit

Permalink
DP-1710 | Introduce timeout for EmsSinkTask#put (#48)
Browse files Browse the repository at this point in the history
* introduce timeout for EmsSinkTask#put
* add e2e test
* avoid ambiguity around java TimeoutException
  • Loading branch information
afiore authored Aug 18, 2023
1 parent a726a66 commit 8f4ffd2
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.commons.validator.routines.UrlValidator
import java.io.File
import java.net.URL
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

final case class EmsSinkConfig(
sinkName: String,
Expand All @@ -50,6 +52,7 @@ final case class EmsSinkConfig(
embedKafkaMetadata: Boolean,
useInMemoryFileSystem: Boolean,
allowNullsAsPks: Boolean,
sinkPutTimeout: FiniteDuration,
)

object EmsSinkConfig {
Expand Down Expand Up @@ -120,6 +123,10 @@ object EmsSinkConfig {
includeEmbeddedMetadata = PropertiesHelper.getBoolean(props, EMBED_KAFKA_EMBEDDED_METADATA_KEY).getOrElse(
EMBED_KAFKA_EMBEDDED_METADATA_DEFAULT,
)
sinkPutTimeout = FiniteDuration(
PropertiesHelper.getLong(props, SINK_PUT_TIMEOUT_KEY).getOrElse(SINK_PUT_TIMEOUT_DEFAULT),
TimeUnit.MILLISECONDS,
)
} yield EmsSinkConfig(
sinkName,
url,
Expand All @@ -142,5 +149,6 @@ object EmsSinkConfig {
includeEmbeddedMetadata,
useInMemoryFs,
allowNullsAsPks,
sinkPutTimeout,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,9 @@ object EmsSinkConfigConstants {
"Rather than writing to the host file system, buffer parquet data files in memory"
val USE_IN_MEMORY_FS_DEFAULT = false

val SINK_PUT_TIMEOUT_KEY = s"${CONNECTOR_PREFIX}.sink.put.timeout.ms"
val SINK_PUT_TIMEOUT_DOC =
"The maximum time (in milliseconds) for the connector task to complete the upload of a single Parquet file before being flagged as failed. Note: this value should always be lower than max.poll.interval.ms"
val SINK_PUT_TIMEOUT_DEFAULT = 288000L // 4.8 minutes

}
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,13 @@ object EmsSinkConfigDef {
Importance.MEDIUM,
USE_IN_MEMORY_FS_DOC,
)
.define(
SINK_PUT_TIMEOUT_KEY,
Type.LONG,
SINK_PUT_TIMEOUT_DEFAULT,
Importance.HIGH,
SINK_PUT_TIMEOUT_DOC,
)
}

class EmsSinkConfigDef() extends ConfigDef with LazyLogging
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import com.celonis.kafka.connect.ems.config.EmsSinkConfig
import com.celonis.kafka.connect.ems.errors.ErrorPolicy
import com.celonis.kafka.connect.ems.errors.ErrorPolicy.Retry
import com.celonis.kafka.connect.ems.model._
import com.celonis.kafka.connect.ems.sink.EmsSinkTask.PutTimeoutException
import com.celonis.kafka.connect.ems.sink.EmsSinkTask.StopTimeout
import com.celonis.kafka.connect.ems.storage.EmsUploader
import com.celonis.kafka.connect.ems.storage.FileSystemOperations
import com.celonis.kafka.connect.ems.storage.Writer
Expand All @@ -35,12 +37,30 @@ import com.typesafe.scalalogging.StrictLogging
import okhttp3.OkHttpClient
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask

import java.util
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object EmsSinkTask {
  /** Upper bound on how long `stop()` cleanup may run before it is abandoned
    * (applied via cats-effect `timeoutAndForget` in the companion class).
    */
  private val StopTimeout: FiniteDuration = 5.seconds

  /** Raised when a `put()` batch exceeds the configured
    * `connect.ems.sink.put.timeout.ms`. Carries the configured timeout so the
    * message tells the operator exactly which limit was hit.
    */
  final case class PutTimeoutException(configuredTimeout: FiniteDuration) extends Throwable {
    override def getMessage: String =
      s"The EmsSinkTask#put() operation timed out after $configuredTimeout. Please try restarting the connector task"
  }

  object PutTimeoutException {
    /** Partial function translating the generic `TimeoutException` produced by
      * `timeoutAndForget` into a [[PutTimeoutException]].
      *
      * The match keys on the exception message containing the rendered timeout
      * (e.g. "250 milliseconds") so that unrelated timeouts are not rebranded.
      * `Throwable#getMessage` may be null, so it is wrapped in `Option` to
      * avoid an NPE inside the guard (the original `getMessage.contains(...)`
      * would throw when the message is absent).
      */
    def adaptFromThrowable(configuredTimeout: FiniteDuration): PartialFunction[Throwable, PutTimeoutException] = {
      case error: TimeoutException
          if Option(error.getMessage).exists(_.contains(configuredTimeout.toString)) =>
        PutTimeoutException(configuredTimeout)
    }
  }
}

class EmsSinkTask extends SinkTask with StrictLogging {

private var writerManager: WriterManager[IO] = _
Expand All @@ -52,6 +72,7 @@ class EmsSinkTask extends SinkTask with StrictLogging {
private var transformer: RecordTransformer = _
private val emsSinkConfigurator: EmsSinkConfigurator = new DefaultEmsSinkConfigurator
private var okHttpClient: OkHttpClient = _
private var emsSinkConfig: EmsSinkConfig = _

override def version(): String = Version.implementationVersion

Expand Down Expand Up @@ -100,10 +121,11 @@ class EmsSinkTask extends SinkTask with StrictLogging {
)
}

maxRetries = config.retries.retries
retriesLeft = maxRetries
errorPolicy = config.errorPolicy
transformer = RecordTransformer.fromConfig(config)
maxRetries = config.retries.retries
retriesLeft = maxRetries
errorPolicy = config.errorPolicy
transformer = RecordTransformer.fromConfig(config)
emsSinkConfig = config
}

override def put(records: util.Collection[SinkRecord]): Unit = {
Expand All @@ -125,10 +147,14 @@ class EmsSinkTask extends SinkTask with StrictLogging {
else IO(())
} yield ()

io.attempt.unsafeRunSync() match {
case Left(value) =>
io.timeoutAndForget(emsSinkConfig.sinkPutTimeout)
.adaptError(PutTimeoutException.adaptFromThrowable(emsSinkConfig.sinkPutTimeout))
.attempt.unsafeRunSync() match {
case Left(error) if error.isInstanceOf[PutTimeoutException] =>
throw new ConnectException(error.getMessage)
case Left(error) =>
retriesLeft -= 1
errorPolicy.handle(value, retriesLeft)
errorPolicy.handle(error, retriesLeft)
case Right(_) =>
retriesLeft = maxRetries
}
Expand Down Expand Up @@ -207,14 +233,14 @@ class EmsSinkTask extends SinkTask with StrictLogging {

override def stop(): Unit = {
(for {
_ <- IO(logger.debug(s"[{}] EmsSinkTask.Stop", sinkName))
_ <- IO(logger.warn(s"[{}] EmsSinkTask.Stop", sinkName))
_ <- Option(writerManager).fold(IO(()))(_.close)
_ <- Option(okHttpClient).fold(IO(()))(c => IO(c.dispatcher().executorService().shutdown()))
_ <- Option(okHttpClient).fold(IO(()))(c => IO(c.dispatcher().executorService().shutdownNow()).void)
_ <- Option(okHttpClient).fold(IO(()))(c => IO(c.connectionPool().evictAll()))
_ <- Option(okHttpClient).fold(IO(()))(c => IO(c.cache().close()))
} yield ()).attempt.unsafeRunSync() match {

} yield ()).timeoutAndForget(StopTimeout).attempt.unsafeRunSync() match {
case Left(value) =>
logger.warn(s"[$sinkName]There was an error stopping the EmsSinkTask", value)
logger.warn(s"[{}] There was an error stopping the EmsSinkTask: {}", sinkName, value)
case Right(_) =>
}
writerManager = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ final class RecordTransformer(
),
)
v <- IO.fromEither(DataConverter.apply(transformedValue))
_ <- IO(logger.info("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation))
_ <- IO(logger.debug("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation))
value <- obfuscation.fold(IO.pure(v)) { o =>
IO.fromEither(v.obfuscate(o).leftMap(FailedObfuscationException))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.scalatest.matchers.should.Matchers
import java.io.File
import java.net.URL
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Try
Expand Down Expand Up @@ -71,6 +72,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
embedKafkaMetadata = true,
useInMemoryFileSystem = false,
allowNullsAsPks = false,
sinkPutTimeout = FiniteDuration(288000L, TimeUnit.MILLISECONDS),
)

test(s"parse the configuration from properties") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
embedKafkaMetadata = false,
useInMemoryFileSystem = false,
allowNullsAsPks = false,
sinkPutTimeout = 4.minutes,
)
val config = Map(
ENDPOINT_KEY -> sinkConfig.url.toString,
Expand All @@ -93,6 +94,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
FALLBACK_VARCHAR_LENGTH_KEY -> sinkConfig.fallbackVarCharLengths.map(_.toString).orNull,
OBFUSCATION_TYPE_KEY -> "fix",
OBFUSCATED_FIELDS_KEY -> "b.x",
SINK_PUT_TIMEOUT_KEY -> 4.minutes.toMillis.toString,
)
task.start(config.asJava)

Expand Down
38 changes: 38 additions & 0 deletions src/e2e/scala/com/celonis/kafka/connect/ems/ErrorPolicyTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.celonis.kafka.connect.ems.testcontainers.connect.EmsConnectorConfigur
import com.celonis.kafka.connect.ems.testcontainers.scalatest.KafkaConnectContainerPerSuite
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnectionCut
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withConnector
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.connect.withParquetUploadLatency
import com.celonis.kafka.connect.ems.testcontainers.scalatest.fixtures.mockserver.withMockResponse
import org.mockserver.verify.VerificationTimes
import org.scalatest.funsuite.AnyFunSuite
Expand Down Expand Up @@ -121,4 +122,41 @@ class ErrorPolicyTests extends AnyFunSuite with KafkaConnectContainerPerSuite wi
}
}
}

test("honours connect.ems.sink.put.timeout.ms") {

  val sourceTopic = randomTopicName()
  val emsTable    = randomEmsTable()

  // Connector under test: RETRY error policy with an aggressively small put
  // timeout (250 ms). Commit thresholds (COMMIT_RECORDS_KEY = 1) force an
  // upload attempt on the very first record so the timeout path is exercised
  // immediately.
  val emsConnector = new EmsConnectorConfiguration("ems")
    .withConfig(TOPICS_KEY, sourceTopic)
    .withConfig(ENDPOINT_KEY, proxyServerUrl)
    .withConfig(AUTHORIZATION_KEY, "AppKey key")
    .withConfig(TARGET_TABLE_KEY, emsTable)
    .withConfig(COMMIT_RECORDS_KEY, 1)
    .withConfig(COMMIT_SIZE_KEY, 1000000L)
    .withConfig(COMMIT_INTERVAL_KEY, 3600000)
    .withConfig(TMP_DIRECTORY_KEY, "/tmp/")
    .withConfig(ERROR_POLICY_KEY, "RETRY")
    .withConfig(SINK_PUT_TIMEOUT_KEY, "250")

  withConnector(emsConnector) {
    withParquetUploadLatency(1.second) { // trigger SINK_PUT_TIMEOUT by injecting a 1 second latency
      sendDummyAvroRecord(sourceTopic)

      // Wait (up to 30s) for the task's log output to report the put timeout;
      // "timed out after 250" matches the PutTimeoutException message rendered
      // with the configured 250 ms timeout.
      val consumer = new WaitingConsumer
      kafkaConnectContainer.followOutput(consumer, OutputType.STDOUT)
      consumer.waitUntil(
        (frame: OutputFrame) => frame.getUtf8String.contains("timed out after 250"),
        30,
        TimeUnit.SECONDS,
      )
    }

    // A put timeout is rethrown as a ConnectException, so the task must end up
    // FAILED rather than being retried under the RETRY policy.
    eventually(timeout(60 seconds)) {
      val status = kafkaConnectClient.getConnectorStatus(emsConnector.name)
      status.tasks.head.state should be("FAILED")
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,24 @@ import com.celonis.kafka.connect.ems.testcontainers.connect.KafkaConnectClient
import eu.rekawek.toxiproxy.Proxy
import eu.rekawek.toxiproxy.model.ToxicDirection

import scala.concurrent.duration.FiniteDuration

object connect {

/** Runs `testCode` while a toxiproxy "latency" toxic delays upstream traffic
  * by `latency`, simulating a slow Parquet upload. The toxic is always removed
  * afterwards, even if the test body throws.
  */
def withParquetUploadLatency(
  latency: FiniteDuration,
)(testCode: => Unit,
)(
  implicit
  proxy: Proxy) = {
  val toxicName = "LATENCY_UPSTREAM"
  proxy.toxics().latency(toxicName, ToxicDirection.UPSTREAM, latency.toMillis)
  try testCode
  finally proxy.toxics().get(toxicName).remove()
}

def withConnectionCut(
testCode: => Any,
)(
Expand Down

0 comments on commit 8f4ffd2

Please sign in to comment.