Передача данных JSON в тему Kafka с использованием Rest Proxy

Опубликовано: 6 Октября, 2022

В этом POC описывается процедура передачи данных формата JSON в тему Kafka с использованием прокси-сервера Kafka REST, который предоставляет интерфейс RESTful для кластера Kafka.

Предпосылки:

Прежде чем приступить к этой процедуре, убедитесь, что:

  • Административный доступ к работающей виртуальной машине Kafka, и эта виртуальная машина должна иметь возможность подключения, как описано в предварительных требованиях для загрузки.
  • Определите и запишите имя хоста и порт Zoo-Keeper.
  • Определите и запишите имя хоста и порт брокера Kafka.
  • Определите и запишите имя хоста и порт прокси-сервера Kafka Rest.

Примечание. Эта процедура предполагает, что вы установили дистрибутив Apache Kafka. Если вы используете другой дистрибутив Kafka, вам может потребоваться настроить некоторые команды в этой процедуре.

Здесь, в этом случае использования, мы настроили имена хостов и порты со следующими

  • локальный хост для остальных прокси: 8082
  • локальный хост зоопарка: 2182
  • локальный хост бутстрап-сервера: 9095

Процедура передачи данных JSON в тему Kafka:

Шаг 1: Войдите на хост в виртуальной машине Kafka.

$ cd kafka_2.12-2.4.0  /*if this directory does not exit, Use ls command to view the folder and copy/paste the existing folder*/ 

Чтобы перечислить все темы, которые присутствуют в этих темах кафки, используйте приведенную ниже команду cmd

$ bin/kafka-topics.sh --list --zookeeper localhost:2182  /*To check/verify and to display all the topics*/  

Шаг 2: Создайте тему Kafka. Здесь создайте тему с именем «topic-test-1» с одним разделом и только одной репликой:

Например:

$ bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic topic-test-1

$ bin/kafka-topics.sh --list --zookeeper localhost:2182  /*To verify or to list out the created topic*/

Шаг 3: Создайте файл JSON. Создайте файл с именем sample-json-data.json в редакторе по вашему выбору.

Например:

$ vi sample-json-data.json

затем вставьте текст в формате json и добавьте его в файл, а затем сохраните файл и выйдите

Например:

{
 "first_name": "Tom",
 "last_name": "Cruze",
 "email": "cruze@gmail.com",
 "gender": "Male",
 "ip_address": "1.2.3.4"
}

Шаг 4. Передача содержимого json-файла производителю консоли Kafka.

$ bin/kafka-console-producer.sh --broker-list localhost:9095 --topic topic-test-1 < sample-json-data.json

Шаг 5. Чтобы убедиться, что производитель консоли Kafka опубликовал сообщения в теме, запустив потребителя консоли Kafka.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning

Шаг 6. Потоковая передача содержимого другого файла JSON производителю консоли Kafka.

Например:

$ vi sample.json

затем вставьте текст в формате JSON и добавьте его в файл, а затем сохраните файл и выйдите

{ “cust_id”: 1313131, “month”: 12, “expenses”: 1313.13 }

{ “cust_id”: 3535353, “month”: 11, “expenses”: 761.35 }

{ “cust_id”: 7979797, “month”: 10, “expenses”: 4489.00 }

{ “cust_id”: 7979797, “month”: 11, “expenses”: 18.72 }

{ “cust_id”: 3535353, “month”: 10, “expenses”: 6001.94 }

{ “cust_id”: 7979797, “month”: 12, “expenses”: 173.18 }

{ “cust_id”: 1313131, “month”: 10, “expenses”: 492.83 }

{ “cust_id”: 3535353, “month”: 12, “expenses”: 81.12 }

{ “cust_id”: 1313131, “month”: 11, “expenses”: 368.27 }

Кафка REST-прокси:

Прокси-сервер Kafka REST предоставляет интерфейс RESTful для кластера Kafka. Это упрощает создание и использование сообщений, просмотр состояния кластера и выполнение административных действий без использования собственного протокола или клиентов Kafka.

Чтобы получить список тем, используйте curl

$ curl "http://localhost:8082/topics"

Чтобы получить информацию по одной теме

$ curl http://localhost:8082/topics/<menction topic name>

Например:

$ curl "http://localhost:8082/topics/topic-test-1"

Шаг 1: Создание сообщения с использованием JSON со значением в теме

Например, для создания сообщения с использованием JSON со значением '{"month": 12}' в тему тема-тест-1.

$ curl -X POST -H “Content-Type: application/vnd.kafka.json.v2+json”

-H “Accept: application/vnd.kafka.v2+json”

–data ‘{“records”:[{“value”:{“month”: 12}}]}’ “http://localhost:8082/topics/topic-test-1”

/*Expected output from preceding command*/

{

“offsets”:[{“partition”:0,”offset”:16,”error_code”:null,”error”:null}],”key_schema_id”:null,”value_schema_id”:null

}

Чтобы убедиться, что производитель консоли Kafka опубликовал сообщения в теме, запустив потребитель консоли Kafka

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning

Шаг 2. Создайте потребителя для данных JSON, начиная с начала темы.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" 
--data "{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}"   http://localhost:8082/consumers/my_json_consumer
/* Expected output from preceding command*/
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
OR
"base_uri":"http://rest-proxy:8082/consumers/my_json_consumer/instances/my_consumer_instance"  
}  

Шаг 3: Войдите и подпишитесь на тему.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data "{"topics":["topic-test-1"]}" 
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
/* Expected output from preceding command*/
# No content in response

Шаг 4: Чтобы использовать некоторые данные, используя базовый URL-адрес в первом ответе.

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" 
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Дополнительные шаги:

Шаг 1: Наконец, закрыть потребителя с помощью DELETE, чтобы он покинул группу и очистил свои ресурсы.

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" 
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
/* Expected output from preceding command*/
# No content in response

Шаг 2: проверьте экземпляр потребителя, используя следующую команду

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" 
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
/* Expected output from preceding command*/
{ “error_code”: 40403, “message”: “Consumer instance not found.” }