Spring Boot | How to Consume JSON Messages Using Apache Kafka
Apache Kafka is a stream processing system that lets you send messages between processes, applications, and servers. In this article, we will see how to print JSON messages on the console of a Spring Boot application using Apache Kafka.
To learn how to create a Spring Boot project, refer to this article.
Working Steps:
- Go to Spring Initializr and create a starter project with the following dependency:
- Spring for Apache Kafka
- Open the project in an IDE and sync the dependencies. In this article, we create a student model and post student details to it. Therefore, create a model class Student. Add the data members, create the constructors, and override the toString method so that received messages are printed in a readable, JSON-like form. The following is the implementation of the Student class:
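Student Model
// Java program to implement a
// student class
public class Student {

    // Data members of the class
    private int id;
    private String firstName;
    private String lastName;

    // No-argument constructor, used by the
    // JSON deserializer to create the object
    public Student()
    {
    }

    // Parameterized constructor of
    // the student class
    public Student(int id, String firstName, String lastName)
    {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // Getters and setters: Jackson only binds JSON properties to
    // non-public fields through accessors like these
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    @Override
    public String toString()
    {
        return "Student{"
            + "id = " + id
            + ", firstName = '" + firstName + "'"
            + ", lastName = '" + lastName + "'"
            + "}";
    }
}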
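The no-argument constructor in the model matters because a property-based JSON binder typically creates an empty object first and then fills it in one property at a time. The following is a rough, JDK-only sketch of that idea, assuming the model exposes public setter methods. The class BindingSketch, its bind helper, and the trimmed Student copy are hypothetical illustrations, not the code that Jackson or Spring Kafka actually runs.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: a much simplified picture of how a JSON binder
// can populate a bean through its no-arg constructor and its setters.
class BindingSketch {

    // Trimmed copy of the model; the setters are an assumption of this sketch
    public static class Student {
        private int id;
        private String firstName;
        private String lastName;

        public Student() { }

        public void setId(int id) { this.id = id; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public void setLastName(String lastName) { this.lastName = lastName; }

        @Override
        public String toString() {
            return "Student{id = " + id + ", firstName = '" + firstName
                + "', lastName = '" + lastName + "'}";
        }
    }

    // Creates an instance via the no-arg constructor, then calls setXxx
    // for every entry whose key matches a one-argument public setter.
    static <T> T bind(Class<T> type, Map<String, Object> properties) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : properties.entrySet()) {
                String name = entry.getKey();
                String setter = "set" + Character.toUpperCase(name.charAt(0))
                    + name.substring(1);
                for (Method method : type.getMethods()) {
                    if (method.getName().equals(setter)
                            && method.getParameterCount() == 1) {
                        method.invoke(target, entry.getValue());
                    }
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not bind " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the parsed JSON object {"id": 1, "firstName": "John", "lastName": "Doe"}
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("id", 1);
        parsed.put("firstName", "John");
        parsed.put("lastName", "Doe");

        // prints: Student{id = 1, firstName = 'John', lastName = 'Doe'}
        System.out.println(bind(Student.class, parsed));
    }
}
```

Keys with no matching setter are silently skipped in this sketch, which is why a model without accessors can end up with default field values.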
- Create a new class Config and annotate it with @Configuration and @EnableKafka. Then declare two beans, a ConsumerFactory and a ConcurrentKafkaListenerContainerFactory, both typed to the Student class.
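Config Class
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class Config {

    // Function to establish a connection
    // between Spring application
    // and Kafka server
    @Bean
    public ConsumerFactory<String, Student> studentConsumer()
    {
        // HashMap to store the configurations
        Map<String, Object> map = new HashMap<>();

        // put the host and port of the Kafka broker in the map
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        // put the group ID of consumer in the map
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "id");

        // keys are plain strings, values are JSON
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // build the factory with a JSON deserializer that
        // converts each message value into a Student
        return new DefaultKafkaConsumerFactory<>(
            map,
            new StringDeserializer(),
            new JsonDeserializer<>(Student.class));
    }

    // Listener container factory referenced by name
    // from the @KafkaListener annotation
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Student> studentListner()
    {
        ConcurrentKafkaListenerContainerFactory<String, Student> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(studentConsumer());
        return factory;
    }
}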
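For orientation, the ConsumerConfig constants used above are plain string keys from the Kafka consumer configuration. The sketch below builds the same settings with the literal property names, using only the JDK. The class name ConsumerSettingsSketch and its consumerSettings method are hypothetical, and the deserializers are given by class name rather than by class object, purely to keep the example free of library imports.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: the consumer settings from the Config class,
// written with the literal Kafka property names instead of the constants.
class ConsumerSettingsSketch {

    static Map<String, Object> consumerSettings() {
        Map<String, Object> map = new HashMap<>();

        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        map.put("bootstrap.servers", "127.0.0.1:9092");

        // ConsumerConfig.GROUP_ID_CONFIG
        map.put("group.id", "id");

        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        map.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        map.put("value.deserializer",
            "org.springframework.kafka.support.serializer.JsonDeserializer");

        return map;
    }

    public static void main(String[] args) {
        // Print every setting; HashMap iteration order is not guaranteed
        consumerSettings().forEach(
            (key, value) -> System.out.println(key + " = " + value));
    }
}
```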
- Create a class KafkaService with the @Service annotation. This class contains the listener method that prints each received message on the console.
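KafkaService Class
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    // Annotation required to listen to
    // the messages from the Kafka server;
    // containerFactory is the name of the
    // factory bean declared in Config
    @KafkaListener(topics = "JsonTopic",
                   groupId = "id",
                   containerFactory = "studentListner")
    public void publish(Student student)
    {
        System.out.println("New Entry: " + student);
    }
}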
- Start ZooKeeper and the Kafka server. Next, create a new topic named JsonTopic, the topic the listener subscribes to. To do so, open a new command prompt window and change to the Kafka installation directory.
- Create the topic using the command given below. The --zookeeper option shown here was removed in Kafka 3.0; on newer versions, pass --bootstrap-server localhost:9092 instead.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic // for Mac and Linux
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic // for Windows
- To run the Kafka console producer, use the command below:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic // for Mac and Linux
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic JsonTopic // for Windows
- Run the application, then type a JSON message whose property names match the fields of the Student class in the Kafka producer console and press Enter.
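As a concrete illustration of such a message, the sketch below formats one line of JSON that could be typed into the producer console. It assumes the JSON property names are id, firstName, and lastName, matching the fields of Student. The class MessageSketch, its toJson helper, and the sample values are hypothetical.

```java
// Hypothetical helper: builds one line of JSON for a student, suitable for
// pasting into the Kafka console producer.
class MessageSketch {

    // Plain string concatenation; no escaping is done, so this is only
    // suitable for simple sample values without quotes or backslashes.
    static String toJson(int id, String firstName, String lastName) {
        return "{\"id\": " + id
            + ", \"firstName\": \"" + firstName + "\""
            + ", \"lastName\": \"" + lastName + "\"}";
    }

    public static void main(String[] args) {
        // prints: {"id": 1, "firstName": "John", "lastName": "Doe"}
        System.out.println(toJson(1, "John", "Doe"));
    }
}
```

The console producer sends each line as one message, so the whole JSON object has to stay on a single line.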
Output: the listener prints each received message on the application console, prefixed with "New Entry: ".