Skip to content

Commit

Permalink
Bump netty to remove vulnerabilities (#56)
Browse files Browse the repository at this point in the history
* Bump netty to remove vulnerabilities

* Add validation to fallback varchar length config
  • Loading branch information
nicmart authored Oct 10, 2023
1 parent 8e7ec1a commit ad77023
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ object EmsSinkConfig {
PropertiesHelper.getInt(props, FALLBACK_VARCHAR_LENGTH_KEY) match {
case Some(value) =>
if (value <= 0) error(FALLBACK_VARCHAR_LENGTH_KEY, FALLBACK_VARCHAR_LENGTH_DOC)
else if (value > FALLBACK_VARCHAR_LENGTH_MAX)
error(FALLBACK_VARCHAR_LENGTH_KEY, FALLBACK_VARCHAR_LENGTH_DOC)
else value.some.asRight[String]
case None => None.asRight
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ object EmsSinkConfigConstants {

val FALLBACK_VARCHAR_LENGTH_KEY = s"$CONNECTOR_PREFIX.data.fallback.varchar.length"
val FALLBACK_VARCHAR_LENGTH_DOC =
"Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS"
"Optional parameter representing the STRING (VARCHAR) length when the schema is created in EMS. Must be greater than 0 and smaller or equal than 65000"
val FALLBACK_VARCHAR_LENGTH_DEFAULT: Integer = null
val FALLBACK_VARCHAR_LENGTH_MAX: Integer = 65000

val DEBUG_KEEP_TMP_FILES_KEY: String = s"$CONNECTOR_PREFIX.debug.keep.parquet.files"
val DEBUG_KEEP_TMP_FILES_DOC: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.celonis.kafka.connect.ems.sink
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.COMMIT_RECORDS_KEY
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.COMMIT_SIZE_KEY
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.FALLBACK_VARCHAR_LENGTH_MAX
import org.apache.kafka.connect.errors.ConnectException
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -74,4 +75,24 @@ class EmsSinkConfiguratorTest extends AnyFunSuite with Matchers {
val thrown = the[ConnectException] thrownBy emsSinkConfigurator.getEmsSinkConfig(props)
thrown.getMessage should include regex "^.*Uploading the data to EMS requires a record count greater than 0.*$"
}

test(s"throws exception when $FALLBACK_VARCHAR_LENGTH_MAX is too big") {
val props = Map(
"name" -> "ems",
EmsSinkConfigConstants.ENDPOINT_KEY -> "https://celonis.cloud",
EmsSinkConfigConstants.AUTHORIZATION_KEY -> "AppKey key",
EmsSinkConfigConstants.TARGET_TABLE_KEY -> "target-table",
EmsSinkConfigConstants.COMMIT_RECORDS_KEY -> "1",
EmsSinkConfigConstants.COMMIT_SIZE_KEY -> "1000000",
EmsSinkConfigConstants.COMMIT_INTERVAL_KEY -> "3600000",
EmsSinkConfigConstants.TMP_DIRECTORY_KEY -> "/tmp/",
EmsSinkConfigConstants.ERROR_POLICY_KEY -> "CONTINUE",
EmsSinkConfigConstants.FALLBACK_VARCHAR_LENGTH_KEY -> "65001",
).asJava

val thrown = the[ConnectException] thrownBy emsSinkConfigurator.getEmsSinkConfig(props)
thrown.getMessage should include regex "^.*Must be greater than 0 and smaller or equal than 65000.*$"
}
}


2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Dependencies {
val hadoopVersion = "3.3.4"
val woodstockVersion = "5.4.0"

val nettyVersion = "4.1.89.Final"
val nettyVersion = "4.1.97.Final"

val nimbusJoseJwtVersion = "9.22"

Expand Down

0 comments on commit ad77023

Please sign in to comment.