Передача данных JSON в тему Kafka с использованием Rest Proxy
В этом POC описывается процедура передачи данных формата JSON в тему Kafka с использованием прокси-сервера Kafka REST, который предоставляет интерфейс RESTful для кластера Kafka.
Предпосылки:
Прежде чем приступить к этой процедуре, убедитесь, что:
- Административный доступ к работающей виртуальной машине Kafka, и эта виртуальная машина должна иметь возможность подключения, как описано в предварительных требованиях для загрузки.
- Определите и запишите имя хоста и порт Zoo-Keeper.
- Определите и запишите имя хоста и порт брокера Kafka.
- Определите и запишите имя хоста и порт прокси-сервера Kafka Rest.
Примечание. Эта процедура предполагает, что вы установили дистрибутив Apache Kafka. Если вы используете другой дистрибутив Kafka, вам может потребоваться настроить некоторые команды в этой процедуре.
Здесь, в этом случае использования, мы настроили имена хостов и порты со следующими
- локальный хост для остальных прокси: 8082
- локальный хост зоопарка: 2182
- локальный хост бутстрап-сервера: 9095
Процедура передачи данных JSON в тему Kafka:
Шаг 1: Войдите на хост в виртуальной машине Kafka.
$ cd kafka_2.12-2.4.0 /*if this directory does not exit, Use ls command to view the folder and copy/paste the existing folder*/
Чтобы перечислить все темы, которые присутствуют в этих темах кафки, используйте приведенную ниже команду cmd
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To check/verify and to display all the topics*/
Шаг 2: Создайте тему Kafka. Здесь создайте тему с именем «topic-test-1» с одним разделом и только одной репликой:
Например:
$ bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic topic-test-1
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To verify or to list out the created topic*/
Шаг 3: Создайте файл JSON. Создайте файл с именем sample-json-data.json в редакторе по вашему выбору.
Например:
$ vi sample-json-data.json
затем вставьте текст в формате json и добавьте его в файл, а затем сохраните файл и выйдите
Например:
{ "first_name": "Tom", "last_name": "Cruze", "email": "cruze@gmail.com", "gender": "Male", "ip_address": "1.2.3.4" }
Шаг 4. Передача содержимого json-файла производителю консоли Kafka.
$ bin/kafka-console-producer.sh --broker-list localhost:9095 --topic topic-test-1 < sample-json-data.json
Шаг 5. Чтобы убедиться, что производитель консоли Kafka опубликовал сообщения в теме, запустив потребителя консоли Kafka.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
Шаг 6. Потоковая передача содержимого другого файла JSON производителю консоли Kafka.
Например:
$ vi sample.json
затем вставьте текст в формате JSON и добавьте его в файл, а затем сохраните файл и выйдите
{ “cust_id”: 1313131, “month”: 12, “expenses”: 1313.13 }
{ “cust_id”: 3535353, “month”: 11, “expenses”: 761.35 }
{ “cust_id”: 7979797, “month”: 10, “expenses”: 4489.00 }
{ “cust_id”: 7979797, “month”: 11, “expenses”: 18.72 }
{ “cust_id”: 3535353, “month”: 10, “expenses”: 6001.94 }
{ “cust_id”: 7979797, “month”: 12, “expenses”: 173.18 }
{ “cust_id”: 1313131, “month”: 10, “expenses”: 492.83 }
{ “cust_id”: 3535353, “month”: 12, “expenses”: 81.12 }
{ “cust_id”: 1313131, “month”: 11, “expenses”: 368.27 }
Кафка REST-прокси:
Прокси-сервер Kafka REST предоставляет интерфейс RESTful для кластера Kafka. Это упрощает создание и использование сообщений, просмотр состояния кластера и выполнение административных действий без использования собственного протокола или клиентов Kafka.
Чтобы получить список тем, используйте curl
$ curl "http://localhost:8082/topics"
Чтобы получить информацию по одной теме
$ curl http://localhost:8082/topics/<menction topic name>
Например:
$ curl "http://localhost:8082/topics/topic-test-1"
Шаг 1: Создание сообщения с использованием JSON со значением в теме
Например, для создания сообщения с использованием JSON со значением '{"month": 12}' в тему тема-тест-1.
$ curl -X POST -H “Content-Type: application/vnd.kafka.json.v2+json”
-H “Accept: application/vnd.kafka.v2+json”
–data ‘{“records”:[{“value”:{“month”: 12}}]}’ “http://localhost:8082/topics/topic-test-1”
/*Expected output from preceding command*/
{
“offsets”:[{“partition”:0,”offset”:16,”error_code”:null,”error”:null}],”key_schema_id”:null,”value_schema_id”:null
}
Чтобы убедиться, что производитель консоли Kafka опубликовал сообщения в теме, запустив потребитель консоли Kafka
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
Шаг 2. Создайте потребителя для данных JSON, начиная с начала темы.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data "{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}" http://localhost:8082/consumers/my_json_consumer /* Expected output from preceding command*/ { "instance_id":"my_consumer_instance", "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance" OR "base_uri":"http://rest-proxy:8082/consumers/my_json_consumer/instances/my_consumer_instance" }
Шаг 3: Войдите и подпишитесь на тему.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data "{"topics":["topic-test-1"]}" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription /* Expected output from preceding command*/ # No content in response
Шаг 4: Чтобы использовать некоторые данные, используя базовый URL-адрес в первом ответе.
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Дополнительные шаги:
Шаг 1: Наконец, закрыть потребителя с помощью DELETE, чтобы он покинул группу и очистил свои ресурсы.
$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance /* Expected output from preceding command*/ # No content in response
Шаг 2: проверьте экземпляр потребителя, используя следующую команду
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records /* Expected output from preceding command*/ { “error_code”: 40403, “message”: “Consumer instance not found.” }