From ad77023e73126d36d3eaffde08a28f791c2790e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Martini?= Date: Tue, 10 Oct 2023 11:50:24 +0200 Subject: [PATCH] Bump netty to remove vulnerabilities (#56) * Bump netty to remove vulnerabilities * Add validation to fallback varchar length config --- .../connect/ems/config/EmsSinkConfig.scala | 2 ++ .../ems/config/EmsSinkConfigConstants.scala | 3 ++- .../ems/sink/EmsSinkConfiguratorTest.scala | 21 +++++++++++++++++++ project/Dependencies.scala | 2 +- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala index f322f6e1..3d029eef 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfig.scala @@ -68,6 +68,8 @@ object EmsSinkConfig { PropertiesHelper.getInt(props, FALLBACK_VARCHAR_LENGTH_KEY) match { case Some(value) => if (value <= 0) error(FALLBACK_VARCHAR_LENGTH_KEY, FALLBACK_VARCHAR_LENGTH_DOC) + else if (value > FALLBACK_VARCHAR_LENGTH_MAX) + error(FALLBACK_VARCHAR_LENGTH_KEY, FALLBACK_VARCHAR_LENGTH_DOC) else value.some.asRight[String] case None => None.asRight } diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala index f10e97f9..ea017308 100644 --- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala +++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala @@ -87,8 +87,9 @@ object EmsSinkConfigConstants { val FALLBACK_VARCHAR_LENGTH_KEY = s"$CONNECTOR_PREFIX.data.fallback.varchar.length" val FALLBACK_VARCHAR_LENGTH_DOC = - "Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS" + "Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS. Must be greater than 0 and smaller or equal than 65000" val FALLBACK_VARCHAR_LENGTH_DEFAULT: Integer = null + val FALLBACK_VARCHAR_LENGTH_MAX: Integer = 65000 val DEBUG_KEEP_TMP_FILES_KEY: String = s"$CONNECTOR_PREFIX.debug.keep.parquet.files" val DEBUG_KEEP_TMP_FILES_DOC: String = diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkConfiguratorTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkConfiguratorTest.scala index f1613dc6..40a655df 100644 --- a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkConfiguratorTest.scala +++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkConfiguratorTest.scala @@ -18,6 +18,7 @@ package com.celonis.kafka.connect.ems.sink import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.COMMIT_RECORDS_KEY import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.COMMIT_SIZE_KEY +import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.FALLBACK_VARCHAR_LENGTH_MAX import org.apache.kafka.connect.errors.ConnectException import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -74,4 +75,24 @@ class EmsSinkConfiguratorTest extends AnyFunSuite with Matchers { val thrown = the[ConnectException] thrownBy emsSinkConfigurator.getEmsSinkConfig(props) thrown.getMessage should include regex "^.*Uploading the data to EMS requires a record count greater than 0.*$" } + + test(s"throws exception when $FALLBACK_VARCHAR_LENGTH_MAX is too big") { + val props = Map( + "name" -> "ems", + EmsSinkConfigConstants.ENDPOINT_KEY -> "https://celonis.cloud", + EmsSinkConfigConstants.AUTHORIZATION_KEY -> "AppKey key", + EmsSinkConfigConstants.TARGET_TABLE_KEY -> "target-table", + EmsSinkConfigConstants.COMMIT_RECORDS_KEY -> "1", + EmsSinkConfigConstants.COMMIT_SIZE_KEY -> "1000000", + EmsSinkConfigConstants.COMMIT_INTERVAL_KEY -> "3600000", + EmsSinkConfigConstants.TMP_DIRECTORY_KEY -> "/tmp/", + EmsSinkConfigConstants.ERROR_POLICY_KEY -> "CONTINUE", + EmsSinkConfigConstants.FALLBACK_VARCHAR_LENGTH_KEY -> "65001", + ).asJava + + val thrown = the[ConnectException] thrownBy emsSinkConfigurator.getEmsSinkConfig(props) + thrown.getMessage should include regex "^.*Must be greater than 0 and smaller or equal than 65000.*$" + } } + + diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dbcd3c9b..28d69a1d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -71,7 +71,7 @@ object Dependencies { val hadoopVersion = "3.3.4" val woodstockVersion = "5.4.0" - val nettyVersion = "4.1.89.Final" + val nettyVersion = "4.1.97.Final" val nimbusJoseJwtVersion = "9.22"