Solution/communication kafka #132

Draft
wants to merge 10 commits into master

Conversation

adibwaluya

Created Spring Kafka solution v1 (updated 13.06.2022).

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Waluya does not seem to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.


@baumeister25 left a comment


Hi everyone,

a very detailed documentation! I really appreciate your research, and I think it's very good for your own studies to write this down.

For the solution, I fear it is too detailed. You have to imagine an architect under time pressure who wants to know how to use a technology.
Therefore the focus should be on a description that is as minimal as possible.
Most of our users who come here already have experience with Kafka or at least a basic understanding. Therefore, the detailed description can be skipped and replaced with valuable references to existing sources.

The most important point is that the solution should not focus on the sample application. It should provide an architect or developer with an implementation hint on the three mechanisms that are the focus of this solution (monitoring, tracing and retry). In my eyes, a short explanation (with a link to the official sources) plus a code snippet that shows the implementation would be enough.
If necessary, the configuration of Kafka could also be quite interesting (maybe Tatjana could give you advice on how they've used it).

Spring Kafka, Docker, and Postman are the tools used during the creation of this document. A Docker image is used to run both the Kafka message broker and ZooKeeper. For testing the app, we use Postman to send predefined HTTP requests and examine the expected outputs.

# Introduction and Goals
The application should show how spring-kafka can be implemented within a sample application to leverage Apache Kafka's functionalities.


The goal of the solution should be more generic and not focused on the sample application.
You can point to the sample application as a reference, but it should not be the goal of the solution.

The goal of the solution is to provide guidance on how to use Kafka efficiently with Spring Boot, leveraging the Spring Kafka module.
It should therefore concentrate on the following aspects:

  • configuration
  • monitoring
  • tracing
  • retry

Comment on lines 13 to 54
1. Kafka Message Broker (Produce-Consume Topics)
Kafka is a message broker in part because it allows several consumers to perform different logic focused on a single message. A message broker allows services and applications to exchange messages with one another. The structure of the messages is explicitly specified and independent of the services that send them.

The producer is in charge of sending messages through a linked application to the message broker. Meanwhile, the consumer consumes the messages that have been queued in the message broker. Messages are stored in a queue/topic by the message broker.


2. Streaming
The Kafka Streams API is a component of the Apache Kafka open-source data transformation and enrichment project. Kafka Streams is most commonly used to create real-time streaming data pipelines and applications that react to the data streams. It uses a combination of messaging, storage, and stream processing to store and analyze both historical and real-time data.

As previously stated, Kafka Streams provides an API for developing client applications that alter data in Apache Kafka and then publish the modified data onto a new topic. The client application is where the data is processed.


3. Temporary/Permanent Data Storing
Kafka is a streaming data system that may hold data for a period of time before eliminating it. This means Kafka does not discard messages as soon as the consumer reads them.

Kafka may also store data on permanent storage and replicate data among brokers in a cluster, giving it the aspect of a database. Kafka's data can be stored indefinitely by the user.

4. Logging
Apache Kafka logs are a collection of data segments stored on disk that form the partitions of specific topics. Each Kafka log represents a logical, unique, topic-based partition.

Logging with Apache Kafka makes the transformation of unstructured logging streams into an analyzable and understandable output possible. The output can then be used for detecting potential and ongoing problems as well as better environment monitoring.

5. Retry
Failures in a distributed system may happen, e.g. a failed message process, network errors, or runtime exceptions. Therefore, implementing retry logic is essential.

It is important to note that retries in Kafka can be quickly implemented on the consumer side. This is known as simple blocking retries. To accomplish observable error handling without causing real-time disruption, non-blocking retries and dead letter topics are implemented.
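As an illustration only (a sketch, not part of the original text), non-blocking retries with a dead letter topic could look roughly like this in Spring Kafka; the topic name, group id, retry count, and back-off values are assumptions:

```
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class PaymentListener {

    // Failed messages are redirected to separate retry topics, so the main topic
    // is not blocked; after all attempts are exhausted the record goes to a dead letter topic.
    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "payments", groupId = "payment")
    public void onPayment(String payment) {
        // Throwing an exception here triggers the next (non-blocking) retry.
        process(payment);
    }

    // Called for records that still fail after all retries.
    @DltHandler
    public void onDeadLetter(String payment) {
        System.out.println("Giving up on: " + payment);
    }

    private void process(String payment) {
        // placeholder for the actual business logic
    }
}
```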

6. Data Monitoring (Metrics)
Metrics are measurements that capture a value about the systems at a specific point in time, such as (provide a use case as an example from sample application). They are often collected every second, minute, or other regular interval to track a system's performance over time.

Kafka monitoring metrics measure how well a component performs its role, for example via network latency. It's vital to keep an eye on the health of the Kafka deployment to ensure that the applications that rely on it keep working.
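As an illustration only (a sketch, not part of the original text), the raw consumer metrics of a Spring Kafka listener container could be read like this; the listener id "bookings" is borrowed from the sample listener shown later, everything else is an assumption:

```
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

@Component
public class KafkaMetricsReporter {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaMetricsReporter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Prints the consumer metrics (e.g. records-consumed-rate, records-lag-max)
    // of the listener container with id "bookings".
    public void logConsumerMetrics() {
        Map<String, Map<MetricName, ? extends Metric>> metrics =
                registry.getListenerContainer("bookings").metrics();
        metrics.forEach((clientId, clientMetrics) ->
                clientMetrics.forEach((name, metric) ->
                        System.out.println(clientId + " " + name.name() + " = " + metric.metricValue())));
    }
}
```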

7. Event Sourcing
Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.


8. Tracking (Activity Tracking)
With activity tracking it is possible to track page views, searches, uploads, or other actions users may take. When an event happens in the application, for example a user logs in, information about the event is captured in a tracking record. The record is then placed on a specified Kafka topic.

Kafka-clients have many implementations in different languages. However, this document focuses on Spring Kafka using Java.

The following are the advantages and (or) disadvantages of Kafka.


Even though I like this explicit list of details around Kafka, it should be shortened to a bare minimum about Kafka plus some useful references to detailed information in blogs, documentation, or other web resources.

I would basically describe Kafka as an event streaming platform that stores event messages in distributed logs that form logical topics. It can be extended with a streaming API and various connectors to data sources. The main difference to other message brokers is the fact that event messages are not deleted when they are consumed.

Maybe a simple picture would help the understanding of such a small aspect, together with a link to the official documentation:
(image from https://kafka.apache.org/documentation/#introduction)

Because of the focus of this article I would keep 5. Retry.

Comment on lines 57 to 148
1. Streaming

Advantages
- Transferring or filtering data between topics is easier
- Events can be sent to external systems
- Allows data parallelism, distributed coordination, fault tolerance, and scalability
- supports the creation of lightweight and small microservices
- it's flexible, scalable and fault-tolerant
- Data operations might be stateless, stateful or windowed.
- non-buffering system => streams are processed one at a time


2. Logging

Advantages
- supports many integrations
- a clearly readable data set
- simple Implementation
- provides organizations access to a large number of tools

Disadvantages
- Log messages are maintained for a certain amount of time before being deleted
- During a spike, events can be deleted or lost without the user's knowledge when they time out
- Lack of scalability within a large enterprise environment
- An expanding company will have to increase the number of resources on servers, as well as process more logs

3. Message Broker

Advantages
- Enables communication between several services that may not be
executing at the same time.
- Asynchronous processing improves system performance.
- Ensuring that messages are delivered reliably

Disadvantages
- Increasing the complexity of the system
- Debugging can be difficult
- Learning takes time


4. Retry

Advantages:
- Real time traffic is not disrupted (Batch processing can be unblocked)
- Error handling is observable (metadata can be obtained on the retries)
- Higher real-time throughput can be achieved

Disadvantage:
- Kafka's ordering guarantees for the topic on which non-blocking retries are implemented no longer apply.
- With blocking retries, batch processing is blocked, as failed messages are repeatedly processed in real time (they are re-consumed again and again).

5. Data Monitoring (Metrics)

Advantages:
- Assists in identifying potential bottlenecks and performance concerns before they cause major issues

Disadvantages:
- Kafka lacks a full set of management and monitoring capabilities. This makes enterprise support staff wary about implementing Kafka and maintaining it in the long run.
- This challenge can be overcome by installing third-party, open-source, or commercial graphical tools that offer administrative and monitoring features.

6. Event Sourcing

Advantages:
- Kafka can be used as a natural backbone for event sourcing
Disadvantages:


7. Tracking (Activity Tracking)

Advantages: -


Disadvantages:
- often requires a very high volume of throughput


8. Temporary/Permanent Data Storing

Advantages:
- can be easily integrated with databases and cloud data lake storage
- quite easy to understand
- helps to avoid copying the full dump of one database to another


Disadvantages:
- doesn’t provide arbitrary access lookup to the data
- there isn’t a query language like SQL available to help you access data
- it is not a replacement for databases


I would not keep this long list of advantages and disadvantages.

If you finish the documentation early, a section "Constraints and Alternatives" would be beneficial to describe problems with Kafka and the scenarios for which other solutions, such as RabbitMQ, are a better fit.


# Context and Scope (in progress...)


This section is quite important for you, I guess.

For me the scope is really limited to how you can implement

  • logging
  • monitoring
  • retry

with spring-kafka.

Comment on lines 152 to 153
# Solution Strategy


In the solution strategy chapter I personally would expect a basic understanding of how you would like to implement the three goals.

Monitoring and logging could be done with interceptors on the incoming and outgoing streams that add trace IDs and log the messages.

Furthermore, the retry mechanism from Kafka is used, together with the picture we already know, using dead letter queues (or whatever they are named here).
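A minimal sketch of what such an interceptor could look like, assuming a plain Kafka `ProducerInterceptor` is used (the class name, header name, and log format are illustrative):

```
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical interceptor that adds a trace id header and logs every outgoing record.
public class LoggingTraceInterceptor implements ProducerInterceptor<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingTraceInterceptor.class);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Attach a trace id header if none is present yet.
        if (record.headers().lastHeader("traceId") == null) {
            record.headers().add("traceId", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        }
        LOG.info("Sending to topic {}: {}", record.topic(), record.value());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOG.warn("Send failed", exception);
        }
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

Such an interceptor would be registered via the `interceptor.classes` producer property; the consumer side offers an analogous `ConsumerInterceptor`.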

Comment on lines 154 to 202
# Apache Kafka
Apache Kafka was built to make real-time data available to all the applications that need to use it.

## Use cases

### Publish-Subscribe Messaging

Kafka can be used as a distributed pub/sub messaging system that replaces traditional message brokers like ActiveMQ and RabbitMQ.

### Streams

Kafka can be used to perform real-time computations on event streams.

### Metrics and Monitoring

Kafka can be used to aggregate statistics from distributed applications to produce centralized feeds with real-time metrics.

### Log Aggregation

Kafka can be used as a single source of truth by centralizing logging data across all sources in a distributed system.

## Kafka Concepts

### Kafka and Events

An event is any type of action, incident, or change that's identified or recorded by software or applications. Kafka models events as key/value pairs. Keys are often primitive types like strings or integers. Values are serialized representations of an application domain object or a raw message.

### Kafka Topics

A topic is a log of events. Topics are append-only: new messages always go to the end of the log. The logs are immutable: once something has happened, it is difficult to make it un-happen.
Traditional messaging systems have topics and queues, which store messages temporarily for a short time. The advantage of Kafka topics is that every topic can be configured to expire data after it reaches a certain age, which means the data can be kept for anywhere from a few seconds to indefinitely. The logs are files stored on disk.
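As an illustration only (not from the original text), such a retention period could be configured when declaring a topic with Spring Kafka's `TopicBuilder`; the topic name matches the sample application's `bookings` topic, while the partition/replica counts and the seven-day retention are assumptions:

```
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Hypothetical topic: 3 partitions, 1 replica, events expire after 7 days.
    @Bean
    public NewTopic bookingsTopic() {
        return TopicBuilder.name("bookings")
                .partitions(3)
                .replicas(1)
                .config("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000))
                .build();
    }
}
```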

### Kafka Partitioning

Partitioning takes the single topic log and breaks it into multiple logs, each of which can live on a separate node in the Kafka cluster. This makes Kafka a distributed system and helps it scale. Kafka messages can be given a key; the key guarantees that all events from a given entity will always arrive in order. Without a key, the messages are distributed among all the topic's partitions and no ordering is guaranteed.

### Kafka Brokers

Kafka is composed of a network of containers or machines called brokers, each running the Kafka broker process. Each broker hosts some set of partitions and handles incoming requests to write new events to those partitions or read events from them. Brokers also handle replication of partitions between each other.

### Replication

Partition data is copied automatically from a leader broker to follower brokers in the network. If one node in the cluster dies, another will take its role. This helps availability of data.

### Kafka Producers and Consumers

Producers connect to the cluster and send key/value pairs to a specific topic. Consumers can then connect to the cluster and subscribe to one or more topics, listening for and consuming key/value pairs of Kafka messages. Consuming a message does not destroy it; it can still be read by any other consumer. This differentiates Kafka from traditional message brokers. Many consumers can read from one topic, and applications can be both consumers and producers.
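For illustration (a sketch, not from the original text), a minimal producer and consumer with Spring Kafka might look like this; the topic name, group id, and String payload are assumptions:

```
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Minimal sketch of a producer and a consumer with Spring Kafka.
@Component
public class OrderMessaging {

    private final KafkaTemplate<String, String> template;

    public OrderMessaging(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // Producer: the key (here the order id) keeps all events of one entity in the same partition.
    public void publish(String orderId, String payload) {
        template.send("orders", orderId, payload);
    }

    // Consumer: reading the message does not remove it from the topic,
    // so other consumer groups can still read the same events.
    @KafkaListener(topics = "orders", groupId = "order-consumers")
    public void onOrder(String payload) {
        System.out.println("Received: " + payload);
    }
}
```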

#### Source: https://developer.confluent.io/what-is-apache-kafka/


I'm not sure if this is necessary for the solution. Maybe the use cases could be the input for a "Constraints and Alternatives" section where you list the use cases (as bullet points) with the hint that these are common use cases for Kafka.

I would expect that whoever reads this article is comfortable using Kafka. If not, they should find references to good documentation here.

Comment on lines 204 to 423
```
@KafkaListener(id = "bookings", topics = "bookings", groupId = "ship")
public void onBookingEvent(Booking booking) throws ShipNotFoundException, BookingAlreadyConfirmedException {
    LOG.info("Received: {}", booking);
    shipComponentLogic.confirmBooking(booking);
}
```
The booking can be confirmed through the `confirmBooking` function, which can be found in the `ShipComponentLogic` class. First, it checks whether the status of the newly created booking is already confirmed; if so, a `BookingAlreadyConfirmedException` is thrown. If the booking is newly created, the ship to be booked is checked for enough available containers. If there are not enough available containers, the booking is cancelled (`BookingStatus.CANCELED`); otherwise it is confirmed (`BookingStatus.CONFIRMED`), and the confirmation is afterwards sent to the `ship-bookings` topic.
```
@Transactional(rollbackFor = {BookingAlreadyConfirmedException.class})
public void confirmBooking(Booking booking) throws BookingAlreadyConfirmedException, ShipNotFoundException {
    Ship ship = shipRepository.findById(booking.getShipId()).orElseThrow(() -> new ShipNotFoundException(booking.getShipId()));
    LOG.info("Found: {}", ship);

    if (booking.getBookingStatus() == BookingStatus.CONFIRMED) {
        throw new BookingAlreadyConfirmedException(booking.getId());
    } else if (booking.getBookingStatus().equals(BookingStatus.REQUESTED)) {
        if (booking.getContainerCount() < ship.getAvailableContainers()) {
            ship.setAvailableContainers(ship.getAvailableContainers() - booking.getContainerCount());
            booking.updateBookingStatus(BookingStatus.CONFIRMED);
            shipRepository.save(ship);
        } else {
            booking.updateBookingStatus(BookingStatus.CANCELED);
        }
    }

    template.send("ship-bookings", booking.getId(), booking);
    LOG.info("Sent: {}", booking);
}
```

The confirmation is finally consumed by the `booking-service`, where the booking is processed and its status is updated to `CONFIRMED`.
```
@KafkaListener(id = "ship-bookings", topics = "ship-bookings", groupId = "booking")
public void listenBooking(Booking booking) throws BookingNotFoundException {
    LOG.info("Received message: {}", booking.toString());
    bookingComponentBusinessLogic.processBooking(booking);
}
```

#### Logging
As the logging tool, SLF4J (Simple Logging Facade for Java) is used. It is an abstraction layer over different Java logging frameworks, such as Log4j2 or Logback.

There are two common patterns for accessing loggers: declaring the logger as a static final field, or declaring it as an instance variable; slf4j.org discusses the trade-offs between the two. For example, in `ShipComponentLogic`, the logger is declared as a static final field as follows:

```
private static final Logger LOG = LoggerFactory.getLogger(ShipComponentLogic.class);
```

In `BookingComponentMessagingGateway`, the logger is defined as an instance variable as follows:
```
private final Logger LOG = LoggerFactory.getLogger(getClass());
```
Further comparison and explanation between these two different declarations can be found in https://www.slf4j.org/faq.html#declared_static.

SLF4J standardizes the logging levels, which differ between the particular implementations. The usage of SLF4J is straightforward yet adaptable, allowing for better readability and performance.

The logging levels used in SLF4J are *TRACE*, *DEBUG*, *INFO*, *WARN*, and *ERROR*. The *FATAL* logging level (introduced in Log4j) was dropped in SLF4J because it is redundant:
>The Marker interface, part of the org.slf4j package, renders the FATAL level largely redundant.
> -- <cite>https://www.slf4j.org/faq.html#fatal</cite>

It was also dropped because a logging framework should not be the place to decide when to terminate an application (https://www.baeldung.com/slf4j-with-log4j2-logback).
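For illustration (a small sketch that is not part of the original document), the remaining SLF4J levels are used like this:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LevelDemo {

    private static final Logger LOG = LoggerFactory.getLogger(LevelDemo.class);

    public void demo() {
        LOG.trace("Very fine-grained diagnostic output");
        LOG.debug("Information that is useful while debugging");
        LOG.info("Normal application events");
        LOG.warn("Something unexpected happened, but the application can continue");
        LOG.error("A failure that needs attention", new RuntimeException("example"));
    }
}
```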


#### Tracing
In a microservice architecture, tracing is implemented to monitor applications and to help identify where errors or failures occur that may cause poor performance. In our application, which contains several services, it is necessary to trace invocations from one service to another, either directly or through a Kafka message broker.

In this sample application, tracing is supported by Spring Cloud Sleuth. Sleuth seamlessly interfaces with logging frameworks such as SLF4J and Logback to add unique IDs that aid in tracking and diagnosing issues using logs. Before it can be used, its dependency must be declared in the `build.gradle` file as follows:
```
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.2"
    }
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-sleuth'
}
```
Once added to the application, Spring Cloud Sleuth automatically formats the logs to contain a traceId and spanId. Below you can see the logs from when the application has just started and when a customer has just booked a ship.

```
2022-06-13 22:01:16.243 INFO [shipkafka,bd2025db4480982a,bd2025db4480982a] 14272 --- [nio-8080-exec-5] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1655150476243
```
```
2022-06-13 22:01:16.430 INFO [shipkafka,bd2025db4480982a,cfcad1e77644ff13] 14272 --- [ bookings-0-C-1] c.d.s.s.api.ShipRestController : Received: Booking(createdOn=Mon Jun 13 22:01:16 CEST 2022, id=12, lastUpdatedOn=Mon Jun 13 22:01:16 CEST 2022, containerCount=2, shipId=4, bookingStatus=REQUESTED, version=0)
```
The additional core information from Spring Sleuth in a normal log line follows the format:
**[application name, traceId, spanId]**

- **Application name** - the name set in the properties file/settings.gradle file. It can be used to aggregate logs from multiple instances of the same application.
- **traceId** - unique identifier for a specific job, request, or action. It is the same across all microservices.
- **spanId** - unique identifier that tracks a unit of work. It is assigned to each operation and can therefore vary depending on which request is performed.


For the first span in a trace, the traceId and spanId **will be the same** by default.

Further reading: https://www.baeldung.com/spring-cloud-sleuth-single-application#:~:text=TraceId%20%E2%80%93%20This%20is%20an%20id,that%20consists%20of%20multiple%20steps.

#### Retry (in progress...)


This would fit well into the sample application documentation.

In the solution, the documentation should ultimately be limited to the "how" in a generic way.

  1. How to do monitoring: -> Here it should only be a code snippet with a short explanation
  2. How to do logging: -> Here the same
  3. How to use the retry mechanism: -> Code snippet if necessary (or just the configuration parts explained)

@adibwaluya
Author

Thank you for your feedback. We just edited the solution (removed unnecessary elaborations and added some useful references). Please kindly re-review; we're looking forward to further feedback.
