Spring Boot | How to consume JSON messages using Apache Kafka

Published: February 4, 2022

Apache Kafka is a stream-processing system that lets you send messages between processes, applications, and servers. In this article, we will see how to consume JSON messages from Apache Kafka and print them on the console of a Spring Boot application.

To learn how to create a Spring Boot project, refer to this article.

Working Steps:

  1. Go to Spring Initializr and create a starter project with the following dependency:
    • Spring for Apache Kafka
  2. Open the project in an IDE and sync the dependencies. In this article, we create a student model that carries the student details. Create a model class Student, add its data members, create the constructors, and override the toString method so that received messages are printed in a readable, JSON-like format. The following is the implementation of the Student class:

    Student Model

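    // Java program to implement a
    // student class

    // Creating a student class
    public class Student {

        // Data members of the class
        private int id;
        private String firstName;
        private String lastName;

        // No-argument constructor, used by the
        // JSON deserializer to create the object
        public Student()
        {
        }

        // Parameterized constructor of
        // the student class
        public Student(int id, String firstName,
                       String lastName)
        {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        // Getters and setters, so that the JSON
        // deserializer (Jackson) can populate the fields
        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }

        // Readable, JSON-like rendering of the object
        @Override
        public String toString()
        {
            return "Student{"
                + "id = " + id
                + ", firstName = \"" + firstName + "\""
                + ", lastName = \"" + lastName + "\""
                + "}";
        }
    }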
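To see what the overridden toString method prints, the minimal sketch below builds one student and prints it. The class name StudentToStringDemo, the helper describe, and the sample values are made up for illustration, and a trimmed copy of the model is nested inside so that the snippet compiles on its own. It assumes the quotes around the names are escaped as \" in toString. The result is a readable, JSON-like rendering rather than strict JSON.

```java
// Illustrative sketch: class name, helper name and sample values are hypothetical.
class StudentToStringDemo {

    // Trimmed copy of the Student model so the snippet is self-contained
    static class Student {
        private final int id;
        private final String firstName;
        private final String lastName;

        Student(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return "Student{"
                + "id = " + id
                + ", firstName = \"" + firstName + "\""
                + ", lastName = \"" + lastName + "\""
                + "}";
        }
    }

    // Returns the text produced by toString for the given values
    static String describe(int id, String firstName, String lastName) {
        return new Student(id, firstName, lastName).toString();
    }

    public static void main(String[] args) {
        // Prints: Student{id = 1, firstName = "John", lastName = "Doe"}
        System.out.println(describe(1, "John", "Doe"));
    }
}
```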

  3. Create a new class Config and annotate it with @Configuration and @EnableKafka. Then define ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans, both parameterized with the Student class.

    Config class

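    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @EnableKafka
    @Configuration
    public class Config {

        // Function to establish a connection
        // between Spring application
        // and Kafka server
        @Bean
        public ConsumerFactory<String, Student>
        studentConsumer()
        {

            // HashMap to store the configurations
            Map<String, Object> map
                = new HashMap<>();

            // Address (host:port) of the Kafka broker
            map.put(ConsumerConfig
                        .BOOTSTRAP_SERVERS_CONFIG,
                    "127.0.0.1:9092");

            // Group ID of the consumer
            map.put(ConsumerConfig
                        .GROUP_ID_CONFIG,
                    "id");

            // Keys are plain strings, values are JSON
            map.put(ConsumerConfig
                        .KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class);
            map.put(ConsumerConfig
                        .VALUE_DESERIALIZER_CLASS_CONFIG,
                    JsonDeserializer.class);

            // Build the factory; each JSON value
            // is deserialized into a Student
            return new DefaultKafkaConsumerFactory<>(
                map, new StringDeserializer(),
                new JsonDeserializer<>(Student.class));
        }

        // Listener container factory referenced
        // by name from the @KafkaListener method
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String,
                                                       Student>
        studentListner()
        {
            ConcurrentKafkaListenerContainerFactory<String,
                                                    Student>
                factory
                = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(studentConsumer());
            return factory;
        }
    }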
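The ConsumerConfig constants used above are names for Kafka's plain string property keys. As a rough illustration, the sketch below builds the same four settings with the literal keys, using only the JDK. The names ConsumerPropsSketch and consumerProps are hypothetical, and in the actual Config class the constants remain the safer choice because the compiler catches typos in them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: ConsumerPropsSketch and consumerProps are hypothetical names.
class ConsumerPropsSketch {

    // Builds the consumer settings with the literal property keys
    // that the ConsumerConfig constants stand for
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // ConsumerConfig.GROUP_ID_CONFIG
        props.put("group.id", "id");
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer",
                  "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print one "key = value" line per setting
        consumerProps().forEach((key, value) ->
            System.out.println(key + " = " + value));
    }
}
```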

  4. Create a class KafkaService with the @Service annotation. This class contains the listener method that prints each received message on the console.

    KafkaService Class

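    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaService {

        // Annotation required to listen for
        // messages from the Kafka server
        @KafkaListener(topics = "JsonTopic",
                       groupId = "id", containerFactory
                                       = "studentListner")
        public void
        publish(Student student)
        {
            // Print the deserialized message
            System.out.println("New Entry: "
                               + student);
        }
    }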
  5. Start ZooKeeper and the Kafka server. Next, we need to create a new topic named JsonTopic. To do so, open a new command prompt window and change to the Kafka installation directory.
  6. Now create a new topic using the command given below:

    For macOS and Linux:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic

    For Windows:
    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic JsonTopic
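As an alternative to the command line, the topic can also be declared from the application itself. The sketch below is one possible way to do that, not part of the original steps: it assumes a reasonably recent Spring for Apache Kafka version that provides TopicBuilder, and that Spring Boot's auto-configured KafkaAdmin can reach the broker (by default it looks at localhost:9092). The names TopicConfig and jsonTopic are hypothetical.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Illustrative sketch: TopicConfig and jsonTopic are hypothetical names.
@Configuration
public class TopicConfig {

    // Declares the topic; the auto-configured KafkaAdmin is
    // expected to create it at startup if it does not exist yet
    @Bean
    public NewTopic jsonTopic() {
        return TopicBuilder.name("JsonTopic")
                .partitions(1)   // same values as the command above
                .replicas(1)
                .build();
    }
}
```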

  7. Now start the Kafka console producer on the same topic, using the command below:

    For macOS and Linux:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic

    For Windows:
    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic JsonTopic

  8. Run the application, then type a JSON message in the Kafka console producer and press Enter.

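    Output: for each message received, the application console prints a line starting with "New Entry: " followed by the student details.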
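The message typed into the producer has to be a single-line JSON object whose keys match the fields of the Student class (id, firstName, lastName). As a small illustration, the sketch below builds such a line with plain string concatenation. The names SampleMessage and toJson and the sample values are hypothetical, and the helper deliberately skips JSON escaping, so it only suits simple names.

```java
// Illustrative sketch: SampleMessage and toJson are hypothetical names.
class SampleMessage {

    // Builds a one-line JSON object whose keys match the Student fields.
    // Simplification: the names are assumed to contain no characters
    // that need JSON escaping (quotes, backslashes, control characters).
    static String toJson(int id, String firstName, String lastName) {
        return "{\"id\": " + id
            + ", \"firstName\": \"" + firstName + "\""
            + ", \"lastName\": \"" + lastName + "\"}";
    }

    public static void main(String[] args) {
        // Prints: {"id": 1, "firstName": "John", "lastName": "Doe"}
        System.out.println(toJson(1, "John", "Doe"));
    }
}
```

Pasting the printed line into the console producer should make the running application print a corresponding "New Entry: " line, provided the deserializer can populate the Student fields.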
