forked from anshukumar045/ScalaCollections
-
Notifications
You must be signed in to change notification settings - Fork 0
/
KafkAkkaConsumerApp.scala
63 lines (48 loc) · 2.45 KB
/
KafkAkkaConsumerApp.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.kanshu.datastructures
object KafkAkkaConsumerApp {
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, Materializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object KafkaAkkaConsumerApp extends App{
implicit val system: ActorSystem = ActorSystem("KafkaConsumerExample")
implicit val materializer: Materializer = Materializer.matFromSystem(system)
// Define Kafka consumer settings
val consumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("<broker>") // Kafka broker address
.withGroupId("my-group") // Consumer group
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Start from the beginning of the topic
// Define the Kafka topic you want to consume from
val kafkaTopic = "<topic>"
// Create a Kafka source
val kafkaSource: Source[ConsumerRecord[String, String], Consumer.Control] =
Consumer.plainSource(consumerSettings, Subscriptions.topics(kafkaTopic))
val processFlow: Flow[ConsumerRecord[String, String], ConsumerRecord[String, String], _] =
Flow[ConsumerRecord[String, String]].map { record =>
println(s"Received value message: ${record.value()}")
record
}
// Sink: Just consuming the message and materializing a future
val sink: Sink[ConsumerRecord[String, String], Future[Done]] =
Sink.foreach(_ => ())
// Create a RunnableGraph connecting source, flow, and sink
val graph: RunnableGraph[Consumer.Control] = kafkaSource.via(processFlow).to(sink)
val control: Consumer.Control = graph.run()
private val pathToDispatchers = "/user/*"
Runtime.getRuntime.addShutdownHook(new Thread() {
() => {
// Gracefully stop the Akka ActorSystem
val termination = system.terminate()
control.shutdown()
system.actorSelection(pathToDispatchers) ! akka.actor.PoisonPill
Await.result(termination, 60.seconds)
}
})
}