Spring Boot | How to consume JSON messages using Apache Kafka
Apache Kafka is a stream processing system that lets you send messages between processes, applications, and servers. In this article, we will see how to print JSON messages on the console of a Spring Boot application using Apache Kafka.
To learn how to create a Spring Boot project, refer to this article.
Working Steps:
- Go to Spring Initializr and create a starter project with the following dependency:
- Spring for Apache Kafka
- Open the project in an IDE and sync the dependencies. In this article, we will create a student model and post student details to it. Therefore, create a model class Student. Add the data members, create the constructors, and override the toString method so that received messages print in a readable form. The following is the implementation of the Student class:
Student Model
    // Java program to implement a student class
    public class Student {

        // Data members of the class
        int id;
        String firstName;
        String lastName;

        // Default constructor, used by the JSON deserializer
        public Student() {
        }

        // Parameterized constructor of the student class
        public Student(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        // Setters let Jackson (used by JsonDeserializer) populate
        // the non-public fields when a message is deserialized
        public void setId(int id) { this.id = id; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public void setLastName(String lastName) { this.lastName = lastName; }

        @Override
        public String toString() {
            return "Student{"
                + "id = " + id
                + ", firstName = '" + firstName + "'"
                + ", lastName = '" + lastName + "'"
                + "}";
        }
    }
- Create a new class Config and add the annotations @Configuration and @EnableKafka. Then define ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans typed for the Student class.
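Config class
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @EnableKafka
    @Configuration
    public class Config {

        // Function to establish a connection between
        // the Spring application and the Kafka server
        @Bean
        public ConsumerFactory<String, Student> studentConsumer() {

            // HashMap to store the configurations
            Map<String, Object> map = new HashMap<>();

            // Put the host and port of the Kafka broker in the map
            map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

            // Put the group ID of the consumer in the map
            map.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
            map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

            // Keys are read as strings; values are read as JSON
            // and converted to Student objects
            return new DefaultKafkaConsumerFactory<>(
                map,
                new StringDeserializer(),
                new JsonDeserializer<>(Student.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Student> studentListner() {
            ConcurrentKafkaListenerContainerFactory<String, Student> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(studentConsumer());
            return factory;
        }
    }
- Create a class KafkaService with the @Service annotation. This class will contain the listener method that prints each received message on the console.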
KafkaService Class
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaService {

        // Annotation required to listen for
        // messages from the Kafka server
        @KafkaListener(topics = "JsonTopic",
                       groupId = "id",
                       containerFactory = "studentListner")
        public void publish(Student student) {
            System.out.println("New Entry: " + student);
        }
    }
- Start ZooKeeper and the Kafka server. Now we need to create a new topic with the name JsonTopic. To do so, open a new command prompt window and change directory to the Kafka directory.
- Now create a new topic using the command given below:
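For macOS and Linux:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic
For Windows:
    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic
Note: Kafka 3.0 removed the --zookeeper option from kafka-topics; on those versions, pass --bootstrap-server localhost:9092 instead.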
- Now to run Kafka producer console, use the command below:
For macOS and Linux:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic
For Windows:
    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic JsonTopic
Note: recent Kafka releases deprecate --broker-list in favor of --bootstrap-server localhost:9092.
- Run the application, type a JSON message whose fields match the Student model into the Kafka console producer, and press Enter.
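The listener expects each message to be a single-line JSON object whose keys match the fields of Student (id, firstName, lastName). As a minimal sketch, the following builds such a line so it can be pasted into the console producer. The class StudentJsonLine and its toJson helper are hypothetical names used only for illustration, and the helper does no JSON escaping, so it assumes names without quotes or backslashes.

```java
// Hypothetical helper (not part of the project above): builds a one-line
// JSON payload whose keys match the fields of the Student model.
class StudentJsonLine {

    // Concatenates the three fields into a JSON object on a single line.
    // Assumption: the names contain no characters that need JSON escaping.
    static String toJson(int id, String firstName, String lastName) {
        return "{\"id\":" + id
            + ",\"firstName\":\"" + firstName
            + "\",\"lastName\":\"" + lastName + "\"}";
    }

    public static void main(String[] args) {
        // Prints: {"id":1,"firstName":"John","lastName":"Doe"}
        System.out.println(toJson(1, "John", "Doe"));
    }
}
```

The console producer sends each line as one message, so the whole object has to stay on a single line.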
Output:
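Each received message appears on the application console as "New Entry: " followed by the result of Student.toString(). The sketch below reproduces only that formatting, without a Kafka broker. The class name OutputShape, the sample values, and the single quotes around the names are illustrative assumptions.

```java
// Illustrative only: mimics the listener's println without a Kafka broker.
class OutputShape {

    // Trimmed copy of the Student model so the sketch is self-contained.
    static class Student {
        int id;
        String firstName;
        String lastName;

        Student(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return "Student{"
                + "id = " + id
                + ", firstName = '" + firstName + "'"
                + ", lastName = '" + lastName + "'"
                + "}";
        }
    }

    // Same string the listener method passes to System.out.println.
    static String consoleLine(Student student) {
        return "New Entry: " + student;
    }

    public static void main(String[] args) {
        // Prints: New Entry: Student{id = 1, firstName = 'John', lastName = 'Doe'}
        System.out.println(consoleLine(new Student(1, "John", "Doe")));
    }
}
```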

