Апач Кафка потребительский

Опубликовано: 9 Января, 2023

Kafka Consumers привыкла читать данные из темы и помнить, что тема снова идентифицируется по ее имени. Таким образом, потребители достаточно умны, и они будут знать, из какого брокера читать и из каких разделов читать. А в случае сбоев брокера потребители знают, как восстанавливаться, и это снова хорошее свойство Apache Kafka. Теперь данные для потребителей будут считываться по порядку в каждом разделе . Теперь, пожалуйста, обратитесь к изображению ниже. Итак, если мы посмотрим на Потребителя, потребляющего из Topic-A/Partition-0 , то он сначала прочитает сообщение 0, затем 1, затем 2, затем 3, вплоть до сообщения 11. Если другой потребитель читает из двух разделов, например Раздел-1 и Раздел-2, будет читать оба раздела по порядку. Это может быть с ними одновременно, но внутри раздела данные будут считываться по порядку, но между разделами, у нас нет возможности сказать, какой из них будет считан первым или вторым, и поэтому нет упорядочивание по разделам в Apache Kafka .

Таким образом, наши потребители Kafka будут читать наши сообщения от Kafka, которые состоят из байтов, поэтому потребителю потребуется десериализатор , чтобы указать, как преобразовать эти байты обратно в некоторые объекты или данные, и они будут использоваться для ключа и значение сообщения. Итак, у нас есть наш ключ и наше значение, и они оба являются двоичными полями и байтами, поэтому мы будем использовать KeyDeserializer типа IntegerDeserializer , чтобы преобразовать это в int и получить обратно число 123 для ключевых объектов, а затем мы будем использовать StringDeserializer чтобы преобразовать байты в строку и прочитать значение объекта обратно в строку «hello world». Пожалуйста, обратитесь к изображению ниже.

Итак, как мы видим здесь, выбор правильного десериализатора очень важен, потому что, если вы не выберете правильный, вы можете не получить правильные данные в конце. Итак, некоторые общие десериализаторы приведены ниже.

  • Строка (включая JSON, если ваши данные соседние)I
  • Целое число и число с плавающей запятой
  • Avro и Protobuf для расширенных типов данных

Пример потребителя Apache Kafka

В этом примере мы обсудим , как мы можем использовать сообщения из тем Kafka с помощью Spring Boot . Кратко говоря о Spring Boot, это одна из самых популярных и наиболее часто используемых сред языка программирования Java. Это платформа на основе микросервисов, и создание готового к работе приложения с использованием Spring Boot занимает очень меньше времени. Spring Boot упрощает создание автономных приложений на основе Spring производственного уровня, которые можно « просто запустить ». Итак, начнем с реализации.

Prerequisite: Make sure you have installed Apache Kafka in your local machine. Refer to this article How to Install and Run Apache Kafka on Windows?

Шаг 1: Перейдите по этой ссылке и создайте проект Spring Boot. Добавьте зависимость « Spring для Apache Kafka » в свой проект Spring Boot.

Шаг 2: Создайте файл конфигурации с именем KafkaConfig . Ниже приведен код файла KafkaConfig.java .

Java




// Java Program to Illustrate Kafka Configuration
  
package com.amiya.kafka.apachekafkaconsumer.config;
  
// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  
// Annotations
@EnableKafka
@Configuration
  
// Class
public class KafkaConfig {
  
    @Bean
    public ConsumerFactory<String, String> consumerFactory()
    {
  
        // Creating a Map of string-object pairs
        Map<String, Object> config = new HashMap<>();
  
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
  
        return new DefaultKafkaConsumerFactory<>(config);
    }
  
    // Creating a Listener
    public ConcurrentKafkaListenerContainerFactory
    concurrentKafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<
            String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Шаг 3. Создайте потребительский файл с именем KafkaConsumer.

Java




// Java Program to Illustrate Kafka Consumer
  
package com.amiya.kafka.apachekafkaconsumer.consumer;
  
// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
@Component
  
// Class
public class KafkaConsumer {
  
    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id")
  
    // Method
    public void
    consume(String message)
    {
        // Print statement
        System.out.println("message = " + message);
    }
}

Шаг 4. Теперь нам нужно сделать следующие вещи, чтобы использовать сообщения из тем Kafka с помощью Spring Boot.

  • Запустите сервер Apache Zookeeper.
  • Запустите сервер Apache Kafka
  • Отправляйте сообщения из Kafka Topics

Запустите сервер Apache Zookeeper с помощью этой команды.

C:kafka>.inwindowszookeeper-server-start.bat .configzookeeper.properties

Точно так же запустите сервер Apache Kafka с помощью этой команды

C:kafka>.inwindowskafka-server-start.bat .configserver.properties

Выполните следующую команду, чтобы отправить сообщения из тем Kafka.

C:kafka>.inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

Шаг 5: Теперь запустите приложение весенней загрузки. Убедитесь, что вы изменили номер порта в файле application.properties.

server.port=8081

Давайте запустим загрузочное приложение Spring внутри файла ApacheKafkaConsumerApplication.

Вывод: в выводе вы можете увидеть, когда вы отправляете сообщение из Kafka Topics, оно отображается на консоли в режиме реального времени.