Skip to content

Commit

Permalink
Move kafka value context specific classes into package (#33)
Browse files Browse the repository at this point in the history
* Remove unused imports
* flatMap doesn't make sense in this context, removed.
* Cleanup after testing in node ingestor
  • Loading branch information
mtps authored Oct 28, 2022
1 parent fc563d0 commit 9cc0d94
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
26 changes: 20 additions & 6 deletions cli/src/main/kotlin/tech/figure/kafka/cli/Main.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package tech.figure.kafka.cli

import ch.qos.logback.classic.Level
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel
import java.time.OffsetDateTime
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.cli.ArgParser
import kotlinx.cli.ArgType
Expand All @@ -13,6 +11,7 @@ import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
Expand All @@ -25,6 +24,10 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import tech.figure.kafka.context.acking
import tech.figure.kafka.context.withValue
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel

@OptIn(ObsoleteCoroutinesApi::class)
fun main(args: Array<String>) {
Expand All @@ -36,8 +39,10 @@ fun main(args: Array<String>) {
parser.parse(args)

val commonProps = mapOf<String, Any>(
CommonClientConfigs.GROUP_ID_CONFIG to group + OffsetDateTime.now().minute,
CommonClientConfigs.GROUP_ID_CONFIG to UUID.randomUUID().toString(),
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to broker,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 100,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
Expand All @@ -46,6 +51,7 @@ fun main(args: Array<String>) {

log {
"org.apache.kafka".level = Level.WARN
"tech.figure.kafka.coroutines.channels.KafkaConsumerChannel".level = Level.INFO
}

val log = logger("main")
Expand Down Expand Up @@ -108,8 +114,16 @@ fun main(args: Array<String>) {
val i = AtomicInteger(0)
ticker.receiveAsFlow().onEach {
log.info("ticker")
producer.send(ProducerRecord(source, dest, "test-${i.getAndIncrement()}"))
producer.send(ProducerRecord(source, "test-${i.getAndIncrement()}", "testing"))
}.collect()
}

launch(Dispatchers.IO) {
incoming.receiveAsFlow()
.map { it.map { it.withValue(it.offset) } }
.acking()
.onEach { log.info("committed offset:$it") }
.collect()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package tech.figure.kafka.context

import tech.figure.kafka.records.AckedConsumerRecord

/**
 * Pairs an already-acknowledged consumer record with an arbitrary derived value.
 *
 * @param K the record key type.
 * @param V the record value type.
 * @param T the type of the derived payload carried alongside the record.
 * @property record the acked Kafka consumer record this value was derived from.
 * @property value the derived payload.
 */
data class AckedConsumerRecordValue<K, V, T>(val record: AckedConsumerRecord<K, V>, val value: T) {
    /**
     * Transforms the carried payload while keeping the underlying acked record.
     *
     * @param block mapping applied to the current payload.
     * @return a new [AckedConsumerRecordValue] holding the same record and the mapped payload.
     */
    fun <R> map(block: (T) -> R): AckedConsumerRecordValue<K, V, R> {
        val mapped = block(value)
        return AckedConsumerRecordValue(record, mapped)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tech.figure.kafka.context

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import tech.figure.kafka.records.UnAckedConsumerRecord

fun <K, V, T> UnAckedConsumerRecord<K, V>.withValue(t: T) = UnAckedConsumerRecordValue(this, t)

/**
 * Acknowledges every record in each emitted batch, mapping the stream of un-acked
 * record/value pairs into a stream of acked record/value pairs.
 *
 * Each element's [UnAckedConsumerRecordValue.ack] is invoked in order within its batch.
 *
 * @return a flow emitting the same batches with every record acknowledged.
 */
fun <K, V, T> Flow<List<UnAckedConsumerRecordValue<K, V, T>>>.acking(): Flow<List<AckedConsumerRecordValue<K, V, T>>> =
    map { batch -> batch.map { unacked -> unacked.ack() } }

/**
 * Pairs a not-yet-acknowledged consumer record with an arbitrary derived value.
 *
 * @param K the record key type.
 * @param V the record value type.
 * @param T the type of the derived payload carried alongside the record.
 * @property record the un-acked Kafka consumer record this value was derived from.
 * @property value the derived payload.
 */
data class UnAckedConsumerRecordValue<K, V, T>(val record: UnAckedConsumerRecord<K, V>, val value: T) {
    /**
     * Acknowledges the underlying record and carries the payload forward.
     *
     * @return an [AckedConsumerRecordValue] wrapping the acked record and the same payload.
     */
    suspend fun ack(): AckedConsumerRecordValue<K, V, T> {
        val acked = record.ack()
        return AckedConsumerRecordValue(acked, value)
    }

    /**
     * Transforms the carried payload while keeping the underlying un-acked record.
     *
     * @param block mapping applied to the current payload.
     * @return a new [UnAckedConsumerRecordValue] holding the same record and the mapped payload.
     */
    fun <R> map(block: (T) -> R): UnAckedConsumerRecordValue<K, V, R> {
        val mapped = block(value)
        return UnAckedConsumerRecordValue(record, mapped)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.record.TimestampType

interface KafkaRecord<K, V> {
companion object {
fun <K, V> wrapping(consumerRecord: ConsumerRecord<K, V>): KafkaRecord<K, V> {
return tech.figure.kafka.records.wrapping(consumerRecord)
}
}

val topic: String
val partition: Int
val offset: Long
Expand Down

0 comments on commit 9cc0d94

Please sign in to comment.