feat: rework producer & serializer
Signed-off-by: Mehdi Rebiai <[email protected]>
mrebiai committed Aug 14, 2024
1 parent b48fd69 commit a1db5c1
Showing 10 changed files with 175 additions and 216 deletions.
1 change: 0 additions & 1 deletion build.sbt
@@ -26,7 +26,6 @@ lazy val root = (project in file("."))
       "org.scalatest" %% "scalatest" % "3.2.19",
       "com.typesafe" % "config" % "1.4.3",
       "io.gatling" % "gatling-jsonpath" % "3.11.5",
-      "com.lihaoyi" %% "requests" % "0.9.0",
       "com.lihaoyi" %% "os-lib" % "0.10.3",
       "ch.qos.logback" % "logback-classic" % "1.5.6" % Runtime,
       "dev.zio" %% "zio" % zioVersion,
14 changes: 5 additions & 9 deletions docker-compose.yml
@@ -77,7 +77,7 @@ services:
     environment:
       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
       CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
-      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://primary-ksqldb-server:8088"
+      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
       CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
       CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
       CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: USER_INFO
@@ -87,10 +87,10 @@ services:
       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
       PORT: 9021
-  primary-ksqldb-server:
+  ksqldb-server:
     image: confluentinc/cp-ksqldb-server:${CONFLUENT_KAFKA_VERSION}
-    hostname: primary-ksqldb-server
-    container_name: primary-ksqldb-server
+    hostname: ksqldb-server
+    container_name: ksqldb-server
     depends_on:
       - kafka
       - schema-registry
@@ -107,8 +107,6 @@
       KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
       KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
   create-topics:
-    profiles:
-      - test
     image: confluentinc/cp-enterprise-kafka:${CONFLUENT_KAFKA_VERSION}
     volumes:
       - ./src/test/resources/scripts:/opt/kapoeira/scripts
@@ -126,8 +124,6 @@
         echo Creating subjects... && \
         ./createSchemas.sh'"
   create-streams:
-    profiles:
-      - test
     image: confluentinc/cp-ksqldb-cli:${CONFLUENT_KAFKA_VERSION}
     container_name: create-streams
     volumes:
@@ -142,7 +138,7 @@
       - -f
       - /opt/kapoeira/scripts/createStreams.ksql
       - --
-      - http://primary-ksqldb-server:8088
+      - http://ksqldb-server:8088
   kapoeira:
     profiles:
       - test
18 changes: 8 additions & 10 deletions src/main/scala/com/lectra/kapoeira/domain/RecordRead.scala
@@ -21,15 +21,14 @@ package com.lectra.kapoeira.domain
 import com.lectra.kapoeira.domain
 import com.lectra.kapoeira.domain.Services.ReadHeaders

-import java.nio.charset.StandardCharsets
 import scala.util.Try

 /** The record read from the cucumber Datatable directly, or a File.
   */
 final case class RecordRead(
     topicAlias: String,
     key: String,
-    value: Array[Byte],
+    value: String,
     headers: Map[String, Any]
 )

@@ -54,11 +53,11 @@ trait RecordData[T] {
 /** Facilitate import of all implicits.
   */
 trait RecordReadImplicits
-    extends InterpotaleImplicits
+    extends InterpolateImplicits
     with RecordDataImplicits
     with RecordDataFromFileImplicits

-trait InterpotaleImplicits {
+trait InterpolateImplicits {

   implicit val interpolateString: Interpolate[String] =
     (t: String, ctx: BackgroundContext) => ctx.substituteVariablesIn(t)
@@ -70,8 +69,7 @@ trait InterpotaleImplicits {
       ctx
         .substituteVariablesIn(t.key),
       ctx
-        .substituteVariablesIn(new String(t.value))
-        .getBytes(StandardCharsets.UTF_8),
+        .substituteVariablesIn(t.value),
       interpolateMap(t.headers, ctx)
     )
   }
@@ -123,7 +121,7 @@ trait RecordDataFromFileImplicits {
         domain.RecordRead(
           t.topicAlias,
           columns(0),
-          columns(1).getBytes(StandardCharsets.UTF_8),
+          columns(1),
           Try(columns(2))
             .map(headersString => readHeaders.readHeaders(headersString))
             .getOrElse(Map.empty)
@@ -141,7 +139,7 @@ trait RecordDataFromFileImplicits {
         RecordRead(
           t.topicAlias,
           t.key,
-          line.getBytes(StandardCharsets.UTF_8),
+          line,
           Map.empty
         )
       })
@@ -159,7 +157,7 @@ trait RecordDataFromFileImplicits {
         RecordRead(
           t.topicAlias,
           t.key,
-          line.getBytes(StandardCharsets.UTF_8),
+          line,
           Map.empty
         )
       )
@@ -179,7 +177,7 @@ trait RecordDataImplicits {
     override def read(t: KeyValueRecord): RecordRead = RecordRead(
       t.topicAlias,
       t.key,
-      t.value.getBytes(StandardCharsets.UTF_8),
+      t.value,
       t.headers
     )
   }
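
Net effect of this file: RecordRead now carries its value as a String until serialization time, so variable interpolation works on text directly instead of round-tripping through UTF-8 byte arrays. A minimal sketch of the new shape, using only the names visible in the diff (the placeholder syntax and header content below are illustrative):

// Sketch only: a RecordRead built with the new String-valued payload.
val record = RecordRead(
  topicAlias = "topic_in",
  key = "key_${uuid}",
  value = """{"id": "${uuid}", "qty": 1}""",
  headers = Map("origin" -> "kapoeira")
)
// Interpolate[RecordRead] now substitutes variables in record.value as plain text;
// conversion to bytes is deferred to the reworked producer/serializer.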
3 changes: 2 additions & 1 deletion src/main/scala/com/lectra/kapoeira/glue/package.scala
@@ -25,6 +25,7 @@ import com.lectra.kapoeira.domain._
 import com.lectra.kapoeira.kafka.KapoeiraConsumer._
 import com.lectra.kapoeira.kafka.{KapoeiraAdmin, KapoeiraConsumer}
 import com.typesafe.scalalogging.LazyLogging
+import io.confluent.kafka.schemaregistry.json.jackson.Jackson
 import org.apache.kafka.clients.admin.AdminClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
@@ -123,7 +124,7 @@ package object glue extends LazyLogging with RecordReadImplicits {
     }
   }

-  private[glue] val objectMapper = new ObjectMapper()
+  private[glue] val objectMapper = Jackson.newObjectMapper();

   implicit class RecordReadOps(recordRead: RecordRead) {
     def jsonHeaders: Try[Map[String, Array[Byte]]] = Try(
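The mapper swap above replaces a bare Jackson ObjectMapper with Jackson.newObjectMapper() from Confluent's schema-registry JSON module, presumably to align header/value parsing with the mapper configuration used by the JSON Schema support. A hedged sketch of how such a mapper can back a header-parsing helper like jsonHeaders; the HeaderJson object and parseHeaders are illustrative, not part of this commit:

object HeaderJson {
  import com.fasterxml.jackson.databind.JsonNode
  import io.confluent.kafka.schemaregistry.json.jackson.Jackson
  import scala.jdk.CollectionConverters._
  import scala.util.Try

  private val objectMapper = Jackson.newObjectMapper()

  // Illustrative: parse a JSON object of headers into the Map[String, Array[Byte]]
  // shape that jsonHeaders returns.
  def parseHeaders(json: String): Try[Map[String, Array[Byte]]] = Try {
    val node: JsonNode = objectMapper.readTree(json)
    node.fields().asScala
      .map(e => e.getKey -> e.getValue.asText().getBytes("UTF-8"))
      .toMap
  }
}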
14 changes: 3 additions & 11 deletions src/main/scala/com/lectra/kapoeira/kafka/DataType.scala
@@ -18,34 +18,26 @@
  */
 package com.lectra.kapoeira.kafka
 import com.fasterxml.jackson.databind.JsonNode
-import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
-import io.confluent.kafka.serializers.json.{KafkaJsonSchemaDeserializer, KafkaJsonSchemaSerializer}
+import io.confluent.kafka.serializers.KafkaAvroDeserializer
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer
 import org.apache.avro.generic.GenericData
-import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization.StringDeserializer

 sealed trait DataType[A] {
   type DeserializerT
-  type SerializerT
   val classDeserializer: Class[DeserializerT]
-  val classSerializer: Class[SerializerT]
 }
 trait AvroType[T] extends DataType[T] {
   type DeserializerT = KafkaAvroDeserializer
-  type SerializerT = KafkaAvroSerializer
   val classDeserializer: Class[KafkaAvroDeserializer] = classOf[KafkaAvroDeserializer]
-  val classSerializer: Class[KafkaAvroSerializer] = classOf[KafkaAvroSerializer]
 }
 case object StringType extends DataType[String] {
   type DeserializerT = StringDeserializer
-  type SerializerT = StringSerializer
   val classDeserializer: Class[StringDeserializer] = classOf[StringDeserializer]
-  val classSerializer: Class[StringSerializer] = classOf[StringSerializer]
 }
 case object JsonType extends DataType[JsonNode] {
   type DeserializerT = KafkaJsonSchemaDeserializer[JsonNode]
-  type SerializerT = KafkaJsonSchemaSerializer[JsonNode]
   val classDeserializer: Class[KafkaJsonSchemaDeserializer[JsonNode]] = classOf[KafkaJsonSchemaDeserializer[JsonNode]]
-  val classSerializer: Class[KafkaJsonSchemaSerializer[JsonNode]] = classOf[KafkaJsonSchemaSerializer[JsonNode]]
 }
 object DataType {
   implicit val avroType: DataType[Any] = new AvroType[Any] {}
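
With the serializer half of the type class removed, a DataType now only names the deserializer used when consuming; producing is handled by the reworked serializer code elsewhere in this commit (files not shown above). A hedged sketch of how the remaining classDeserializer can feed consumer configuration; the helper below is an assumption for illustration, not code from the commit:

object ConsumerConfigSketch {
  import java.util.Properties
  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.StringDeserializer

  // Illustrative only: derive consumer properties from a DataType's deserializer class.
  def consumerProps[A](valueType: DataType[A], bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueType.classDeserializer.getName)
    props
  }
}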
