Апач Кафка Продюсер

Опубликовано: 9 Января, 2023

Kafka Producers собираются записывать данные в темы, а темы состоят из разделов. Теперь производители в Kafka будут автоматически знать, к какому брокеру и разделу писать, на основе вашего сообщения, и в случае сбоя брокера Kafka в вашем кластере производители автоматически восстановятся после него, что делает Kafka устойчивым и что делает Kafka таким хорошим и используемым Cегодня. Итак, если мы посмотрим на диаграмму, чтобы иметь данные в наших разделах тем, у нас будет производитель слева, отправляющий данные в каждый из разделов наших тем.

Итак, откуда производитель знает, как отправить данные в раздел темы? Для этого мы можем использовать Message Keys . Таким образом, наряду со значением сообщения мы можем выбрать отправку ключа сообщения, и этот ключ может быть любым, что вы хотите, это может быть строка, это может быть любое число, и получается, что если вы не отправите ключ, ключ имеет значение null, тогда данные будут отправлены в циклическом режиме, чтобы сделать это очень простым. Это означает, что ваше первое сообщение будет отправлено в раздел 0, затем ваше второе сообщение будет отправлено в раздел 1, затем в раздел 2 и так далее. Вот почему он называется циклическим перебором, но если вы отправите ключ вместе со своим сообщением, все сообщения с одним и тем же ключом всегда будут отправляться в один и тот же раздел . Так что это очень важное свойство Kafka, потому что это означает, что вам нужен порядок для определенного поля.
Например, если у вас есть автомобили, и вы хотите получить все GPS-позиции для этого конкретного автомобиля, вам необходимо убедиться, что ваш ключ сообщения установлен в качестве уникального идентификатора для вашего автомобиля, т.е. carID , и так в нашем примере автомобильного GPS. которые мы обсуждали в этой статье « Темы, разделы и смещения в Apache Kafka», нам нужно выбрать ключ сообщения, равный carID , чтобы у нас были все позиции автомобиля для этого конкретного автомобиля в порядке как часть одного и того же раздел.

Note: Please refer to the Topic Example that has been discussed in this article, Topics, Partitions, and Offsets in Apache Kafka, so that you can understand which example we are discussing here. 

Итак, второй пример снова, если у нас производитель отправляет данные в 2 раздела, а ключ - carID, тогда carID_123 всегда будет идти в разделе 0, carID_234 также всегда будет идти в разделе 0, а carID_345 и carID_456 всегда будут идти в разделе 1. Идея здесь снова в том, что вы никогда не найдете данные carID_123 в разделе 1 из-за этого ключевого свойства, которое мы только что упомянули.

Пример производителя Apache Kafka

В этом примере мы обсудим , как мы можем создавать сообщения в темах Kafka с помощью Spring Boot . Кратко говоря о Spring Boot, это одна из самых популярных и наиболее часто используемых сред языка программирования Java. Это платформа на основе микросервисов, и создание готового к работе приложения с использованием Spring Boot занимает очень меньше времени. Spring Boot упрощает создание автономных приложений на основе Spring производственного уровня, которые можно « просто запустить ». Итак, начнем с реализации.

Prerequisite: Make sure you have installed Apache Kafka in your local machine. Refer to this article How to Install and Run Apache Kafka on Windows?

Шаг 1: Перейдите по этой ссылке https://start.spring.io/ и создайте проект Spring Boot. Добавьте следующие зависимости в ваш проект Spring Boot.

  • Весенняя сеть
  • Весна для Apache Kafka

Шаг 2: Теперь давайте создадим класс контроллера с именем DemoController .

Java




// Java Program to Illustrate Controller Class
 
package com.amiya.kafka.apachekafkaproducer;
 
// Importing required classes
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
 
// Annotation
@RestController
 
// Class
public class DemoController {
 
    // Autowiring Kafka Template
    @Autowired KafkaTemplate<String, String> kafkaTemplate;
 
    private static final String TOPIC = "NewTopic";
 
    // Publish messages using the GetMapping
    @GetMapping("/publish/{message}")
    public String publishMessage(@PathVariable("message")
                                 final String message)
    {
 
        // Sending the message
        kafkaTemplate.send(TOPIC, message);
 
        return "Published Successfully";
    }
}

Шаг 3. Теперь нам нужно сделать следующие вещи, чтобы публиковать сообщения в темах Kafka с помощью Spring Boot.

  1. Запустите сервер Apache Zookeeper.
  2. Запустите сервер Apache Kafka
  3. Слушайте сообщения, поступающие из новых тем

Запустите сервер Apache Zookeeper с помощью этой команды.

C:kafka>.inwindowszookeeper-server-start.bat .configzookeeper.properties

Точно так же запустите сервер Apache Kafka с помощью этой команды

C:kafka>.inwindowskafka-server-start.bat .configserver.properties

Выполните следующую команду, чтобы прослушать сообщения, поступающие из новых тем.

C:kafka>.inwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic NewTopic --from-beginning

Шаг 4: Теперь запустите приложение весенней загрузки. Убедитесь, что вы изменили номер порта в файле application.properties.

server.port=8081

Давайте запустим загрузочное приложение Spring внутри файла ApacheKafkaProducerApplication.

Шаг 5: Просмотрите этот URL-адрес и передайте свое сообщение после файла /publish/.

http://localhost:8081/publish/GeeksforGeeks

Поскольку мы передали здесь «GeeksforGeeks», вы можете видеть, что в ответ мы получили «Успешно опубликовано». И в режиме реального времени вы также можете увидеть, что сообщение было опубликовано на сервере. Потоковое сообщение находится в режиме реального времени.

Точно так же, если мы передали здесь «Hello World», вы можете увидеть, что в ответ мы получили «Успешно опубликовано». И в режиме реального времени вы также можете увидеть, что сообщение было опубликовано на сервере.