Потоковая передача Hadoop с использованием Python - проблема подсчета слов

Опубликовано: 18 Февраля, 2022

Hadoop Streaming - это функция, которая поставляется с Hadoop и позволяет пользователям или разработчикам использовать различные языки для написания программ MapReduce, таких как Python, C ++, Ruby и т. Д. Он поддерживает все языки, которые могут читать из стандартного ввода и записывать в стандартный вывод. Мы будем реализовывать Python с Hadoop Streaming и посмотрим, как это работает. Мы реализуем проблему подсчета слов в Python, чтобы понять Hadoop Streaming. Мы будем создавать mapper.py и reducer.py для выполнения задач сопоставления и сокращения.

Давайте создадим один файл, содержащий несколько слов, которые мы можем сосчитать.

Шаг 1: Создайте файл с именем word_count_data.txt и добавьте в него данные.

 cd Documents / #, чтобы изменить каталог на / Documents
touch word_count_data.txt # touch используется для создания пустого файла    
nano word_count_data.txt # nano - редактор командной строки для редактирования файла    
cat word_count_data.txt # cat используется для просмотра содержимого файла

Шаг 2: Создайте файл mapper.py , реализующий логику сопоставителя. Он будет читать данные из STDIN, разбивать строки на слова и генерировать вывод каждого слова с его индивидуальным счетчиком.

cd Documents/                                   # to change the directory to /Documents
touch mapper.py                    # touch is used to create an empty file    
cat mapper.py                    # cat is used to see the content of the file

Copy the below code to the mapper.py file.

Python3

#!/usr/bin/env python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# reading entire line from STDIN (standard input)
for line in sys.stdin:
    # to remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
      
    # we are looping over the words array and printing the word
    # with the count of 1 to the STDOUT
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        print "%s %s" % (word, 1)

Здесь в программе выше #! известен как shebang и используется для интерпретации сценария. Файл будет запущен с использованием указанной нами команды.

Давайте проверим наш mapper.py локально, работает он нормально или нет.

Синтаксис:

 cat <text_data_file> | python <mapper_code_python_file>

Команда (в моем случае)

 кот word_count_data.txt | python mapper.py

Результат работы картографа показан ниже.

Step 3: Create a reducer.py file that implements the reducer logic. It will read the output of mapper.py from STDIN(standard input) and will aggregate the occurrence of each word and will write the final output to STDOUT. 

cd Documents/                                   # to change the directory to /Documents
touch reducer.py                     # touch is used to create an empty file 

Python3

#!/usr/bin/env python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # slpiting the data on the basis of tab we have provided in mapper.py
    word, count = line.split(" ", 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print "%s %s" % (current_word, current_count)
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print "%s %s" % (current_word, current_count)

Теперь давайте проверим наш код редуктора reducer.py с помощью mapper.py, правильно он работает или нет, с помощью следующей команды.

 кот word_count_data.txt | python mapper.py | sort -k1,1 | python reducer.py

Мы видим, что наш редуктор также отлично работает в нашей локальной системе.

Шаг 4: Теперь давайте запустим все наши демоны Hadoop с помощью следующей команды.

 start-dfs.sh

start-yarn.sh

Теперь создайте каталог word_count_in_python в нашей HDFS в корневом каталоге, в котором будет храниться наш файл word_count_data.txt, с помощью следующей команды.

 hdfs dfs -mkdir / word_count_in_python

Скопируйте word_count_data.txt в эту папку в нашей HDFS с помощью команды copyFromLocal.

Синтаксис для копирования файла из вашей локальной файловой системы в HDFS приведен ниже:

 hdfs dfs -copyFromLocal / путь 1 / путь 2 .... / путь n / пункт назначения

Актуальная команда (в моем случае)

 hdfs dfs -copyFromLocal /home/dikshant/Documents/word_count_data.txt / word_count_in_python

Теперь наш файл данных был успешно отправлен в HDFS. мы можем проверить, отправляет он или нет, используя команду ниже или вручную посетив нашу HDFS.

 hdfs dfs -ls / # вывести список содержимого корневого каталога

hdfs dfs -ls / word_count_in_python # перечислить содержимое каталога / word_count_in_python

Давайте дадим исполняемое разрешение нашим mapper.py и reducer.py с помощью команды ниже.

 cd документы /

chmod 777 mapper.py reducer.py # изменение разрешения на чтение, запись, выполнение для пользователя, группы и других

На изображении ниже мы видим, что мы изменили права доступа к файлу.

Шаг 5: Теперь загрузите последнюю версию jar-файла с потоковой передачей hadoop по этой ссылке. Затем поместите этот файл jar с потоковой передачей Hadoop в место, откуда вы можете легко получить к нему доступ. В моем случае я помещаю его в папку / Documents, где есть файлы mapper.py и reducer.py .

Теперь давайте запустим наши файлы python с помощью утилиты потоковой передачи Hadoop, как показано ниже.

 банка hadoop /home/dikshant/Documents/hadoop-streaming-2.7.3.jar 

> -ввод /word_count_in_python/word_count_data.txt 

> -output / word_count_in_python / output 

> -mapper /home/dikshant/Documents/mapper.py 

> -reducer /home/dikshant/Documents/reducer.py

В приведенной выше команде in -output , мы укажем место в HDFS, где мы хотим сохранить наш вывод. Итак, давайте проверим наш вывод в выходном файле по адресу / word_count_in_python / output / part-00000 в моем случае. Мы можем проверить результаты, вручную просмотрев местоположение в HDFS или с помощью команды cat, как показано ниже.

 hdfs dfs -cat / word_count_in_python / output / part-00000

Основные параметры, которые мы можем использовать с Hadoop Streaming

Вариант

Описание

-маппер Команда, которую нужно запустить в качестве картографа
-редуктор Команда, которую нужно запустить как редуктор
-Вход Путь ввода DFS для шага Map
-выход Каталог вывода DFS для шага Reduce