Апач Кафка потребительский
Kafka Consumers привыкла читать данные из темы и помнить, что тема снова идентифицируется по ее имени. Таким образом, потребители достаточно умны, и они будут знать, из какого брокера читать и из каких разделов читать. А в случае сбоев брокера потребители знают, как восстанавливаться, и это снова хорошее свойство Apache Kafka. Теперь данные для потребителей будут считываться по порядку в каждом разделе . Теперь, пожалуйста, обратитесь к изображению ниже. Итак, если мы посмотрим на Потребителя, потребляющего из Topic-A/Partition-0 , то он сначала прочитает сообщение 0, затем 1, затем 2, затем 3, вплоть до сообщения 11. Если другой потребитель читает из двух разделов, например Раздел-1 и Раздел-2, будет читать оба раздела по порядку. Это может быть с ними одновременно, но внутри раздела данные будут считываться по порядку, но между разделами, у нас нет возможности сказать, какой из них будет считан первым или вторым, и поэтому нет упорядочивание по разделам в Apache Kafka .
Таким образом, наши потребители Kafka будут читать наши сообщения от Kafka, которые состоят из байтов, поэтому потребителю потребуется десериализатор , чтобы указать, как преобразовать эти байты обратно в некоторые объекты или данные, и они будут использоваться для ключа и значение сообщения. Итак, у нас есть наш ключ и наше значение, и они оба являются двоичными полями и байтами, поэтому мы будем использовать KeyDeserializer типа IntegerDeserializer , чтобы преобразовать это в int и получить обратно число 123 для ключевых объектов, а затем мы будем использовать StringDeserializer чтобы преобразовать байты в строку и прочитать значение объекта обратно в строку «hello world». Пожалуйста, обратитесь к изображению ниже.
Итак, как мы видим здесь, выбор правильного десериализатора очень важен, потому что, если вы не выберете правильный, вы можете не получить правильные данные в конце. Итак, некоторые общие десериализаторы приведены ниже.
- Строка (включая JSON, если ваши данные соседние)I
- Целое число и число с плавающей запятой
- Avro и Protobuf для расширенных типов данных
Пример потребителя Apache Kafka
В этом примере мы обсудим , как мы можем использовать сообщения из тем Kafka с помощью Spring Boot . Кратко говоря о Spring Boot, это одна из самых популярных и наиболее часто используемых сред языка программирования Java. Это платформа на основе микросервисов, и создание готового к работе приложения с использованием Spring Boot занимает очень меньше времени. Spring Boot упрощает создание автономных приложений на основе Spring производственного уровня, которые можно « просто запустить ». Итак, начнем с реализации.
Prerequisite: Make sure you have installed Apache Kafka in your local machine. Refer to this article How to Install and Run Apache Kafka on Windows?
Шаг 1: Перейдите по этой ссылке и создайте проект Spring Boot. Добавьте зависимость « Spring для Apache Kafka » в свой проект Spring Boot.
Шаг 2: Создайте файл конфигурации с именем KafkaConfig . Ниже приведен код файла KafkaConfig.java .
Java
// Java Program to Illustrate Kafka Configuration package com.amiya.kafka.apachekafkaconsumer.config; // Importing required classes import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; // Annotations @EnableKafka @Configuration // Class public class KafkaConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { // Creating a Map of string-object pairs Map<String, Object> config = new HashMap<>(); // Adding the Configuration config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092" ); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id" ); config.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); config.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); return new DefaultKafkaConsumerFactory<>(config); } // Creating a Listener public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory< String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } } |
Шаг 3. Создайте потребительский файл с именем KafkaConsumer.
Java
// Java Program to Illustrate Kafka Consumer package com.amiya.kafka.apachekafkaconsumer.consumer; // Importing required classes import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component // Class public class KafkaConsumer { @KafkaListener (topics = "NewTopic" , groupId = "group_id" ) // Method public void consume(String message) { // Print statement System.out.println( "message = " + message); } } |
Шаг 4. Теперь нам нужно сделать следующие вещи, чтобы использовать сообщения из тем Kafka с помощью Spring Boot.
- Запустите сервер Apache Zookeeper.
- Запустите сервер Apache Kafka
- Отправляйте сообщения из Kafka Topics
Запустите сервер Apache Zookeeper с помощью этой команды.
C:kafka>.inwindowszookeeper-server-start.bat .configzookeeper.properties
Точно так же запустите сервер Apache Kafka с помощью этой команды
C:kafka>.inwindowskafka-server-start.bat .configserver.properties
Выполните следующую команду, чтобы отправить сообщения из тем Kafka.
C:kafka>.inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic
Шаг 5: Теперь запустите приложение весенней загрузки. Убедитесь, что вы изменили номер порта в файле application.properties.
server.port=8081
Давайте запустим загрузочное приложение Spring внутри файла ApacheKafkaConsumerApplication.
Вывод: в выводе вы можете увидеть, когда вы отправляете сообщение из Kafka Topics, оно отображается на консоли в режиме реального времени.