diff --git a/build.sbt b/build.sbt
index 2400fda..7e48e55 100644
--- a/build.sbt
+++ b/build.sbt
@@ -26,7 +26,6 @@ lazy val root = (project in file("."))
       "org.scalatest" %% "scalatest" % "3.2.19",
       "com.typesafe" % "config" % "1.4.3",
       "io.gatling" % "gatling-jsonpath" % "3.11.5",
-      "com.lihaoyi" %% "requests" % "0.9.0",
       "com.lihaoyi" %% "os-lib" % "0.10.3",
       "ch.qos.logback" % "logback-classic" % "1.5.6" % Runtime,
       "dev.zio" %% "zio" % zioVersion,
diff --git a/docker-compose.yml b/docker-compose.yml
index 6fab859..4fe79d3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -77,7 +77,7 @@ services:
     environment:
       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
       CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
-      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://primary-ksqldb-server:8088"
+      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
       CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
       CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
       CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
@@ -87,10 +87,10 @@ services:
       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
       PORT: 9021
-  primary-ksqldb-server:
+  ksqldb-server:
     image: confluentinc/cp-ksqldb-server:${CONFLUENT_KAFKA_VERSION}
-    hostname: primary-ksqldb-server
-    container_name: primary-ksqldb-server
+    hostname: ksqldb-server
+    container_name: ksqldb-server
     depends_on:
       - kafka
      - schema-registry
@@ -107,8 +107,6 @@ services:
       KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
       KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
   create-topics:
-    profiles:
-      - test
     image: confluentinc/cp-enterprise-kafka:${CONFLUENT_KAFKA_VERSION}
     volumes:
       - ./src/test/resources/scripts:/opt/kapoeira/scripts
@@ -126,8 +124,6 @@ services:
         echo Creating subjects... && \
         ./createSchemas.sh'"
   create-streams:
-    profiles:
-      - test
     image: confluentinc/cp-ksqldb-cli:${CONFLUENT_KAFKA_VERSION}
     container_name: create-streams
     volumes:
@@ -142,7 +138,7 @@ services:
       - -f
       - /opt/kapoeira/scripts/createStreams.ksql
       - --
-      - http://primary-ksqldb-server:8088
+      - http://ksqldb-server:8088
   kapoeira:
     profiles:
       - test
diff --git a/src/main/scala/com/lectra/kapoeira/domain/RecordRead.scala b/src/main/scala/com/lectra/kapoeira/domain/RecordRead.scala
index 38d821b..7ae6990 100644
--- a/src/main/scala/com/lectra/kapoeira/domain/RecordRead.scala
+++ b/src/main/scala/com/lectra/kapoeira/domain/RecordRead.scala
@@ -21,7 +21,6 @@ package com.lectra.kapoeira.domain
 import com.lectra.kapoeira.domain
 import com.lectra.kapoeira.domain.Services.ReadHeaders
 
-import java.nio.charset.StandardCharsets
 import scala.util.Try
 
 /** The record read from the cucumber Datatable directly, or a File.
@@ -29,7 +28,7 @@ import scala.util.Try
 final case class RecordRead(
     topicAlias: String,
     key: String,
-    value: Array[Byte],
+    value: String,
     headers: Map[String, Any]
 )
 
@@ -54,11 +53,11 @@ trait RecordData[T] {
 
 /** Facilitate import of all implicits.
   */
 trait RecordReadImplicits
-    extends InterpotaleImplicits
+    extends InterpolateImplicits
     with RecordDataImplicits
     with RecordDataFromFileImplicits
 
-trait InterpotaleImplicits {
+trait InterpolateImplicits {
   implicit val interpolateString: Interpolate[String] =
     (t: String, ctx: BackgroundContext) => ctx.substituteVariablesIn(t)
@@ -70,8 +69,7 @@ trait InterpotaleImplicits {
         ctx
           .substituteVariablesIn(t.key),
         ctx
-          .substituteVariablesIn(new String(t.value))
-          .getBytes(StandardCharsets.UTF_8),
+          .substituteVariablesIn(t.value),
         interpolateMap(t.headers, ctx)
       )
   }
@@ -123,7 +121,7 @@ trait RecordDataFromFileImplicits {
         domain.RecordRead(
           t.topicAlias,
           columns(0),
-          columns(1).getBytes(StandardCharsets.UTF_8),
+          columns(1),
           Try(columns(2))
             .map(headersString => readHeaders.readHeaders(headersString))
             .getOrElse(Map.empty)
@@ -141,7 +139,7 @@ trait RecordDataFromFileImplicits {
           RecordRead(
             t.topicAlias,
             t.key,
-            line.getBytes(StandardCharsets.UTF_8),
+            line,
             Map.empty
           )
         })
@@ -159,7 +157,7 @@ trait RecordDataFromFileImplicits {
           RecordRead(
             t.topicAlias,
            t.key,
-            line.getBytes(StandardCharsets.UTF_8),
+            line,
            Map.empty
          )
        )
@@ -179,7 +177,7 @@ trait RecordDataImplicits {
     override def read(t: KeyValueRecord): RecordRead = RecordRead(
       t.topicAlias,
       t.key,
-      t.value.getBytes(StandardCharsets.UTF_8),
+      t.value,
       t.headers
     )
   }
diff --git a/src/main/scala/com/lectra/kapoeira/glue/package.scala b/src/main/scala/com/lectra/kapoeira/glue/package.scala
index 63d30ef..05b6a29 100644
--- a/src/main/scala/com/lectra/kapoeira/glue/package.scala
+++ b/src/main/scala/com/lectra/kapoeira/glue/package.scala
@@ -25,6 +25,7 @@ import com.lectra.kapoeira.domain._
 import com.lectra.kapoeira.kafka.KapoeiraConsumer._
 import com.lectra.kapoeira.kafka.{KapoeiraAdmin, KapoeiraConsumer}
 import com.typesafe.scalalogging.LazyLogging
+import io.confluent.kafka.schemaregistry.json.jackson.Jackson
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
@@ -123,7 +124,7 @@ package object glue extends LazyLogging with RecordReadImplicits {
     }
   }
 
-  private[glue] val objectMapper = new ObjectMapper()
+  private[glue] val objectMapper = Jackson.newObjectMapper();
 
   implicit class RecordReadOps(recordRead: RecordRead) {
     def jsonHeaders: Try[Map[String, Array[Byte]]] = Try(
diff --git a/src/main/scala/com/lectra/kapoeira/kafka/DataType.scala b/src/main/scala/com/lectra/kapoeira/kafka/DataType.scala
index 3fc698b..313f164 100644
--- a/src/main/scala/com/lectra/kapoeira/kafka/DataType.scala
+++ b/src/main/scala/com/lectra/kapoeira/kafka/DataType.scala
@@ -18,34 +18,26 @@
  */
 package com.lectra.kapoeira.kafka
 import com.fasterxml.jackson.databind.JsonNode
-import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
-import io.confluent.kafka.serializers.json.{KafkaJsonSchemaDeserializer, KafkaJsonSchemaSerializer}
+import io.confluent.kafka.serializers.KafkaAvroDeserializer
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
 import org.apache.avro.generic.GenericData
-import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization.StringDeserializer
 
 sealed trait DataType[A] {
   type DeserializerT
-  type SerializerT
   val classDeserializer: Class[DeserializerT]
-  val classSerializer: Class[SerializerT]
 }
 trait AvroType[T] extends DataType[T] {
   type DeserializerT = KafkaAvroDeserializer
-  type SerializerT = KafkaAvroSerializer
   val classDeserializer: Class[KafkaAvroDeserializer] = classOf[KafkaAvroDeserializer]
-  val classSerializer: Class[KafkaAvroSerializer] = classOf[KafkaAvroSerializer]
 }
 case object StringType extends DataType[String] {
   type DeserializerT = StringDeserializer
-  type SerializerT = StringSerializer
   val classDeserializer: Class[StringDeserializer] = classOf[StringDeserializer]
-  val classSerializer: Class[StringSerializer] = classOf[StringSerializer]
 }
 case object JsonType extends DataType[JsonNode] {
   type DeserializerT = KafkaJsonSchemaDeserializer[JsonNode]
-  type SerializerT = KafkaJsonSchemaSerializer[JsonNode]
   val classDeserializer: Class[KafkaJsonSchemaDeserializer[JsonNode]] = classOf[KafkaJsonSchemaDeserializer[JsonNode]]
-  val classSerializer: Class[KafkaJsonSchemaSerializer[JsonNode]] = classOf[KafkaJsonSchemaSerializer[JsonNode]]
 }
 object DataType {
   implicit val avroType: DataType[Any] = new AvroType[Any] {}
diff --git a/src/main/scala/com/lectra/kapoeira/kafka/KapoeiraProducer.scala b/src/main/scala/com/lectra/kapoeira/kafka/KapoeiraProducer.scala
index 63824cf..b1b40d8 100644
--- a/src/main/scala/com/lectra/kapoeira/kafka/KapoeiraProducer.scala
+++ b/src/main/scala/com/lectra/kapoeira/kafka/KapoeiraProducer.scala
@@ -18,131 +18,56 @@
  */
 package com.lectra.kapoeira.kafka
 
-import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import com.lectra.kapoeira.Config._
-import com.lectra.kapoeira.domain.SubjectFormat.{Avro, Json}
 import com.lectra.kapoeira.domain._
 import com.lectra.kapoeira.glue.RecordReadOps
+import com.lectra.kapoeira.kafka.SchemaRegistry._
 import com.typesafe.scalalogging.LazyLogging
-import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaUtils}
-import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils
-import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
 import org.apache.kafka.clients.producer._
-import requests.RequestAuth
-import zio.{Scope, Task, ZIO}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import zio.{Task, ZIO}
 
 import java.util.Properties
-import scala.util.{Failure, Try}
 
 object KapoeiraProducer extends LazyLogging {
 
-  private val requestAuth : RequestAuth = {
-    if (KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE=="USER_INFO") {
-      (KAFKA_SCHEMA_REGISTRY_BASIC_KEY, KAFKA_SCHEMA_REGISTRY_BASIC_SECRET)
-    } else RequestAuth.Empty
-  }
-
-  private def serializeJson(subject: SubjectConfig, bytes: Array[Byte]): JsonNode = {
-    val schemaString =
-      requests
-        .get(
-          url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/latest/schema",
-          auth = requestAuth,
-          verifySslCerts = false
-        ).text()
-    val value = new String(bytes)
-    val mapper = new ObjectMapper()
-    val schemaJson = mapper.readTree(schemaString)
-    val valueJson: JsonNode = mapper.readTree(value)
-    JsonSchemaUtils.envelope(schemaJson, valueJson)
-  }
-
-  private def serializeAvro(subject: SubjectConfig, bytes: Array[Byte]): GenericData.Record = {
-
-    val schemaVersions =
-      requests
-        .get(
-          url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions",
-          auth = requestAuth,
-          verifySslCerts = false
-        ).text()
-    val versions: Array[String] = schemaVersions.replace("[", "").replace("]", "").split(",")
-
-    val init: Try[GenericData.Record] = Failure[GenericData.Record](new Exception(s"No schema version found for subject ${subject.name}"))
-
-    versions.foldRight(init) { (version, acc) =>
-      if (acc.isFailure) {
-        val schemaString =
-          requests
-            .get(
-              url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/$version/schema",
-              auth = requestAuth,
-              verifySslCerts = false
-            ).text()
-        val parser = new Schema.Parser()
-        val schema = parser.parse(schemaString)
-        Try(AvroSchemaUtils
-          .toObject(new String(bytes), new AvroSchema(schema))
-          .asInstanceOf[GenericData.Record])
-      }
-      else {
-        acc
-      }
-    }.get
-
+  private val producerProperties = {
+    val kafkaParams = new Properties()
+    kafkaProducerProperties.foreach { case (key, value) =>
+      kafkaParams.put(key, value)
+    }
+    // specific options
+    kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)
+    kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)
+    kafkaParams.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    kafkaParams.put(ProducerConfig.ACKS_CONFIG, "1")
+    kafkaParams.put(ProducerConfig.RETRIES_CONFIG, "0")
+    kafkaParams
   }
 
-  private def producer[K: DataType, V: DataType](topicConfig: TopicConfig): ZIO[
-    Any with Scope,
-    Throwable,
-    KafkaProducer[Any, Any]
-  ] = {
-    ZIO
-      .acquireRelease(ZIO.attempt {
-        val kafkaParams = new Properties()
-        kafkaProducerProperties.foreach { case (key, value) =>
-          kafkaParams.put(key, value)
+  private val producerZIO = ZIO.acquireRelease(ZIO.attempt {
+    new KafkaProducer[Array[Byte], Array[Byte]](producerProperties)
+  }) { producer =>
+    ZIO
+      .attempt {
+        producer.flush()
+        producer.close()
       }
-        // specific options
-        kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, implicitly[DataType[K]].classSerializer)
-        kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, implicitly[DataType[V]].classSerializer)
-        kafkaParams.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-        kafkaParams.put(ProducerConfig.ACKS_CONFIG, "1")
-        kafkaParams.put(ProducerConfig.RETRIES_CONFIG, "0")
-        kafkaParams.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, "true")
-        new KafkaProducer[Any, Any](kafkaParams)
-      }) { producer =>
-        ZIO
-          .attempt {
-            producer.flush()
-            producer.close()
-          }
-          .catchAll(err => ZIO.succeed(err.printStackTrace()))
-      }
-  }
+      .catchAll(err => ZIO.succeed(err.printStackTrace()))
+  }
 
-  private def produce[K, V](
-      producer: KafkaProducer[K, V],
-      topic: String,
-      key: K,
-      headers: Map[String, Array[Byte]],
-      recordValue: V
-  ): Task[Unit] = ZIO.async[Any, Throwable, Unit] { case callback =>
-    val record = new ProducerRecord[K, V](topic, key, recordValue)
+  private def produce(
+      producer: KafkaProducer[Array[Byte], Array[Byte]],
+      topic: String,
+      headers: Map[String, Array[Byte]],
+      key: Array[Byte],
+      recordValue: Array[Byte]
+  ): Task[Unit] = ZIO.async[Any, Throwable, Unit] { callback =>
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, key, recordValue)
     headers.foreach { case (k, v) => record.headers().add(k, v) }
-    producer.send(
-      record,
-      new Callback {
-        override def onCompletion(
-            metadata: RecordMetadata,
-            exception: Exception
-        ): Unit = callback(ZIO.unit)
-      }
-    )
+    producer.send(record, (_: RecordMetadata, _: Exception) => callback(ZIO.unit))
   }
 
   def run(
@@ -153,60 +78,9 @@ object KapoeiraProducer extends LazyLogging {
   ): Task[Unit] = {
     for {
       headers <- ZIO.fromTry(record.jsonHeaders)
-      resource = ((keySubjectConfig, valueSubjectConfig) match {
-        case (Some(keySubConf), Some(valueSubConf)) =>
-          (keySubConf.format, valueSubConf.format) match {
-            case (Avro, Avro) => producer[GenericData.Record, GenericData.Record] _
-            case (Avro, Json) => producer[GenericData.Record, JsonNode] _
-            case (Json, Avro) => producer[JsonNode, GenericData.Record] _
-            case (Json, Json) => producer[JsonNode, JsonNode] _
-          }
-        case (None, Some(valueSubConf)) =>
-          valueSubConf.format match {
-            case Avro => producer[String, GenericData.Record] _
-            case Json => producer[String, JsonNode] _
-          }
-        case (Some(keySubConf), None) =>
-          keySubConf.format match {
-            case Avro => producer[GenericData.Record, String] _
-            case Json => producer[JsonNode, String] _
-          }
-        case _ => producer[String, String] _
-      })(topicConfig)
-      _ <- ZIO.scoped(resource.flatMap { producer =>
-        val keyParsed = keySubjectConfig
-          .map(subject =>
-            subject.format match {
-              case SubjectFormat.Avro => serializeAvro(subject, record.key.getBytes())
-              case SubjectFormat.Json => serializeJson(subject, record.key.getBytes())
-            }
-          )
-          .getOrElse(record.key)
-        val valueParsed = valueSubjectConfig
-          .map(subject =>
-            subject.format match {
-              case SubjectFormat.Avro => serializeAvro(subject, record.value)
-              case SubjectFormat.Json => serializeJson(subject, record.value)
-            }
-          )
-          .getOrElse(new String(record.value))
-        produce(producer, topicConfig.topicName, keyParsed, headers, valueParsed)
-      })
+      key = serialize(keySubjectConfig, topicConfig, record.key, isKey = true)
+      value = serialize(valueSubjectConfig, topicConfig, record.value, isKey = false)
+      _ <- ZIO.scoped(producerZIO.flatMap(produce(_, topicConfig.topicName, headers, key, value)))
     } yield ()
   }
-
-  object CustomCallback extends Callback {
-    override def onCompletion(
-        metadata: RecordMetadata,
-        exception: Exception
-    ): Unit =
-      if (exception == null) {
-        logger.debug(
-          s"PRODUCER (async) - partition=${metadata.partition()} - offset=${metadata.offset()}"
-        )
-      } else {
-        logger.error(exception.getMessage)
-      }
-  }
-
 }
diff --git a/src/main/scala/com/lectra/kapoeira/kafka/SchemaRegistry.scala b/src/main/scala/com/lectra/kapoeira/kafka/SchemaRegistry.scala
new file mode 100644
index 0000000..08a9d92
--- /dev/null
+++ b/src/main/scala/com/lectra/kapoeira/kafka/SchemaRegistry.scala
@@ -0,0 +1,99 @@
+package com.lectra.kapoeira.kafka
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.lectra.kapoeira.Config.{KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE, KAFKA_SCHEMA_REGISTRY_BASIC_KEY, KAFKA_SCHEMA_REGISTRY_BASIC_SECRET, KAFKA_SCHEMA_REGISTRY_URL}
+import com.lectra.kapoeira.domain.SubjectFormat.{Avro, Json}
+import com.lectra.kapoeira.domain.{SubjectConfig, TopicConfig}
+import io.confluent.kafka.schemaregistry.SchemaProvider
+import io.confluent.kafka.schemaregistry.avro.{AvroSchema, AvroSchemaProvider, AvroSchemaUtils}
+import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClientConfig}
+import io.confluent.kafka.schemaregistry.json.jackson.Jackson
+import io.confluent.kafka.schemaregistry.json.{JsonSchemaProvider, JsonSchemaUtils}
+import io.confluent.kafka.serializers.json.{KafkaJsonSchemaSerializer, KafkaJsonSchemaSerializerConfig}
+import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer}
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.kafka.common.serialization.StringSerializer
+
+import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava}
+import scala.util.{Failure, Try}
+
+object SchemaRegistry {
+
+  private val KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE_USER_INFO = "USER_INFO"
+
+  private val schemaProviderList: java.util.List[SchemaProvider] = java.util.List.of(new AvroSchemaProvider, new JsonSchemaProvider)
+  private val schemaRegistryClient = new CachedSchemaRegistryClient(
+    KAFKA_SCHEMA_REGISTRY_URL,
+    AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
+    schemaProviderList,
+    if (KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE==KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE_USER_INFO) {
+      Map[String, Any](
+        SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE -> KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE_USER_INFO,
+        SchemaRegistryClientConfig.USER_INFO_CONFIG -> s"$KAFKA_SCHEMA_REGISTRY_BASIC_KEY:$KAFKA_SCHEMA_REGISTRY_BASIC_SECRET"
+      ).asJava
+    } else Map.empty[String, Any].asJava
+  )
+
+  // STRING
+  private val kafkaStringSerializerConfig = Map.empty[String, Any].asJava
+  private val kafkaStringKeySerializer = new StringSerializer
+  kafkaStringKeySerializer.configure(kafkaStringSerializerConfig, true)
+  private val kafkaStringValueSerializer = new StringSerializer
+  kafkaStringValueSerializer.configure(kafkaStringSerializerConfig, false)
+
+  // AVRO
+  private val kafkaAvroSerializerConfig = Map[String, Any](
+    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> KAFKA_SCHEMA_REGISTRY_URL,
+    AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS -> "true"
+  ).asJava
+  private val kafkaAvroKeySerializer = new KafkaAvroSerializer(schemaRegistryClient)
+  kafkaAvroKeySerializer.configure(kafkaAvroSerializerConfig, true)
+  private val kafkaAvroValueSerializer = new KafkaAvroSerializer(schemaRegistryClient)
+  kafkaAvroValueSerializer.configure(kafkaAvroSerializerConfig, false)
+
+  // JSON
+  private val kafkaJsonSerializerConfig = Map[String, Any](
+    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> KAFKA_SCHEMA_REGISTRY_URL,
+    AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS -> "true",
+    KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA -> "true"
+  ).asJava
+  private val kafkaJsonKeySerializer = new KafkaJsonSchemaSerializer[JsonNode](schemaRegistryClient)
+  kafkaJsonKeySerializer.configure(kafkaJsonSerializerConfig, true)
+  private val kafkaJsonValueSerializer = new KafkaJsonSchemaSerializer[JsonNode](schemaRegistryClient)
+  kafkaJsonValueSerializer.configure(kafkaJsonSerializerConfig, false)
+
+  private val objectMapper = Jackson.newObjectMapper();
+
+  private def toJsonObject(subjectName: String, input: String): JsonNode = {
+    val schemaString = schemaRegistryClient.getLatestSchemaMetadata(subjectName).getSchema
+    val schemaJson = objectMapper.readTree(schemaString)
+    val valueJson: JsonNode = objectMapper.readTree(input)
+    JsonSchemaUtils.envelope(schemaJson, valueJson)
+  }
+
+  private def toAvroObject(subjectName: String, input: String): GenericData.Record = {
+    val versions: Seq[Integer] = schemaRegistryClient.getAllVersions(subjectName).asScala.toSeq
+    val init: Try[GenericData.Record] = Failure[GenericData.Record](new Exception(s"No schema version found for subject $subjectName"))
+    versions.foldRight(init) { (version, acc) =>
+      if (acc.isFailure) {
+        val schemaString = schemaRegistryClient.getByVersion(subjectName, version, false).getSchema
+        val parser = new Schema.Parser()
+        val schema = parser.parse(schemaString)
+        Try(AvroSchemaUtils
+          .toObject(input, new AvroSchema(schema))
+          .asInstanceOf[GenericData.Record])
+      } else acc
+    }.get
+  }
+
+  private[kafka] def serialize(subjectConfig: Option[SubjectConfig], topicConfig: TopicConfig, input: String, isKey: Boolean) = (subjectConfig, isKey) match {
+    case (Some(SubjectConfig(subjectName, _, Avro)), true) => kafkaAvroKeySerializer.serialize(topicConfig.topicName, toAvroObject(subjectName, input))
+    case (Some(SubjectConfig(subjectName, _, Avro)), false) => kafkaAvroValueSerializer.serialize(topicConfig.topicName, toAvroObject(subjectName, input))
+    case (Some(SubjectConfig(subjectName, _, Json)), true) => kafkaJsonKeySerializer.serialize(topicConfig.topicName, toJsonObject(subjectName, input))
+    case (Some(SubjectConfig(subjectName, _, Json)), false) => kafkaJsonValueSerializer.serialize(topicConfig.topicName, toJsonObject(subjectName, input))
+    case (_, true) => kafkaStringKeySerializer.serialize(topicConfig.topicName, input)
+    case _ => kafkaStringValueSerializer.serialize(topicConfig.topicName, input)
+  }
+
+}
diff --git a/src/test/resources/features/consumer-avro-full-key-value.feature b/src/test/resources/features/consumer-avro-full-key-value.feature
index a7254a0..14519dc 100644
--- a/src/test/resources/features/consumer-avro-full-key-value.feature
+++ b/src/test/resources/features/consumer-avro-full-key-value.feature
@@ -1,4 +1,4 @@
-Feature: consumer-avro-key-value
+Feature: consumer-avro-full-key-value
 
   Background:
     Given subject
diff --git a/src/test/scala/com/lectra/kapoeira/domain/WhenStepsSpec.scala b/src/test/scala/com/lectra/kapoeira/domain/WhenStepsSpec.scala
index f661449..b00f56e 100644
--- a/src/test/scala/com/lectra/kapoeira/domain/WhenStepsSpec.scala
+++ b/src/test/scala/com/lectra/kapoeira/domain/WhenStepsSpec.scala
@@ -49,7 +49,7 @@ object WhenStepsSpec extends ZIOSpecDefault {
             WhenStep.empty,
             List(
               0 -> List(
-                RecordRead(aTopicAlias, aKey, aValue.getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, aValue, Map.empty)
               )
             )
           )
@@ -83,10 +83,10 @@ object WhenStepsSpec extends ZIOSpecDefault {
             WhenStep.empty,
             List(
               0 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue1".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue2".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue3".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue4".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue1", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue2", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue3", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue4", Map.empty)
               )
             )
           )
@@ -127,12 +127,12 @@ object WhenStepsSpec extends ZIOSpecDefault {
             WhenStep.empty,
             List(
               1 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue1".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue2".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue1", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue2", Map.empty)
               ),
               2 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue3".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue4".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue3", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue4", Map.empty)
               )
             )
           )
@@ -176,17 +176,17 @@ object WhenStepsSpec extends ZIOSpecDefault {
             WhenStep.empty,
             List(
               1 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue1".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue2".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue1", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue2", Map.empty)
              ),
              2 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue3".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue4".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue3", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue4", Map.empty)
              )
            )
          ).addStepOnLastBatch(
            kafkaStubb.producer().run(
-              RecordRead(aTopicAlias, aKey, "aValue5".getBytes, Map.empty),
+              RecordRead(aTopicAlias, aKey, "aValue5", Map.empty),
              backgroundContext.inputTopicConfigs.apply(aTopicAlias),
              None,
              None
@@ -233,12 +233,12 @@ object WhenStepsSpec extends ZIOSpecDefault {
            WhenStep.empty,
            List(
              1 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue1".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue2".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue1", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue2", Map.empty)
              ),
              2 -> List(
-                RecordRead(aTopicAlias, aKey, "aValue3".getBytes, Map.empty),
-                RecordRead(aTopicAlias, aKey, "aValue4".getBytes, Map.empty)
+                RecordRead(aTopicAlias, aKey, "aValue3", Map.empty),
+                RecordRead(aTopicAlias, aKey, "aValue4", Map.empty)
              )
            )
          )
diff --git a/src/test/scala/com/lectra/kapoeira/glue/InterpotaleTest.scala b/src/test/scala/com/lectra/kapoeira/glue/InterpolateTest.scala
similarity index 92%
rename from src/test/scala/com/lectra/kapoeira/glue/InterpotaleTest.scala
rename to src/test/scala/com/lectra/kapoeira/glue/InterpolateTest.scala
index bc21514..21419c8 100644
--- a/src/test/scala/com/lectra/kapoeira/glue/InterpotaleTest.scala
+++ b/src/test/scala/com/lectra/kapoeira/glue/InterpolateTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.matchers.should.Matchers
 
 import java.nio.charset.StandardCharsets
 
-object InterpotaleSpec extends Properties("String") {
+object InterpolateTest extends Properties("String") {
   val ctx = new BackgroundContext
 
   property("identity when context is empty") = forAll { (a: String) =>
@@ -34,7 +34,7 @@ object InterpotaleSpec extends Properties("String") {
   }
 }
 
-class InterpotaleTest extends AnyFlatSpec with Matchers {
+class InterpolateTest extends AnyFlatSpec with Matchers {
   behavior of "interpolate"
   it should "interpolate variables of a string" in {
     val ctx = new BackgroundContext
@@ -60,7 +60,7 @@ class InterpotaleTest extends AnyFlatSpec with Matchers {
     val recordRead = RecordRead(
       "${key1}Topic",
       "${key2}Key",
-      "${key1}Value".getBytes(StandardCharsets.UTF_8),
+      "${key1}Value",
       templateHeaders
     )
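
A minimal, self-contained sketch (not part of the patch) of the producer-side design this change moves to: keys and values are serialized to Array[Byte] up front (in this patch, by the Schema Registry serializers in SchemaRegistry.scala), and a single byte-array producer sends the pre-serialized record. The broker address, topic name, and payloads below are placeholders, not values taken from the project.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer

object ByteArrayProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
  // The producer only ever deals in raw bytes; any schema-aware serialization
  // happens before the record reaches it.
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)
  props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
  props.put(ProducerConfig.ACKS_CONFIG, "1")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val record = new ProducerRecord[Array[Byte], Array[Byte]](
    "some-topic",                               // placeholder topic
    "some-key".getBytes("UTF-8"),               // pre-serialized key
    """{"field":"value"}""".getBytes("UTF-8")   // pre-serialized value
  )
  // Same SAM-style callback as in KapoeiraProducer.produce.
  producer.send(record, (metadata: RecordMetadata, exception: Exception) =>
    if (exception == null) println(s"written to partition ${metadata.partition()} offset ${metadata.offset()}")
    else exception.printStackTrace()
  )
  producer.flush()
  producer.close()
}

One consequence of this split is that there is a single producer configuration regardless of subject format, instead of one generic producer per key/value serializer combination as in the removed code.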