Распределенный кеш в Hadoop MapReduce

Опубликовано: 18 Февраля, 2022

Фреймворк Hadoop MapReduce предоставляет возможность кэшировать небольшие или умеренные файлы, доступные только для чтения, такие как текстовые файлы, zip-файлы, jar-файлы и т. Д., И транслировать их на все Datanodes (рабочие узлы), на которых выполняется задание MapReduce. Каждый Datanode получает копию файла (локальную копию), которая отправляется через распределенный кэш . По завершении работы эти файлы удаляются из узлов данных.

Зачем кешировать файл?

Есть некоторые файлы, которые требуются заданиям MapReduce, поэтому вместо того, чтобы каждый раз читать из HDFS (увеличивая время поиска и задержку) , скажем, 100 раз (если запущено 100 картографов), мы просто отправляем копию файла на все Datanode один раз. .

Давайте посмотрим на пример, в котором мы считаем слова из lyrics.txt, кроме слов, присутствующих в stopWords.txt . Вы можете найти эти файлы здесь.

Предпосылки:

1. Скопируйте оба файла из локальной файловой системы в HDFS.

bin / hdfs dfs -put ../Desktop/lyrics.txt / geeksInput

// этот файл будет кеширован
bin / hdfs dfs -put ../Desktop/stopWords.txt / cached_Geeks

2. Получите адрес сервера NameNode. Поскольку доступ к файлу должен осуществляться через URI (унифицированный идентификатор ресурса), нам нужен этот адрес. Его можно найти в core-site.xml

Hadoop_Home_dir / etc / hadoop / core-site.xml

На моем ПК это hdfs: // localhost: 9000, он может отличаться на вашем ПК.

Mapper Code:

package word_count_DC;
  
import java.io.*;
import java.util.*;
import java.net.URI;
  
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
  
public class Cached_Word_Count extends Mapper<LongWritable, 
                                Text, Text, LongWritable> {
  
    ArrayList<String> stopWords = null;
  
    public void setup(Context context) throws IOException, 
                                     InterruptedException
    {
        stopWords = new ArrayList<String>();
  
        URI[] cacheFiles = context.getCacheFiles();
  
        if (cacheFiles != null && cacheFiles.length > 0
        {
            try {
  
                String line = "";
  
               // Create a FileSystem object and pass the 
               // configuration object in it. The FileSystem
               // is an abstract base class for a fairly generic
               // filesystem. All user code that may potentially 
               // use the Hadoop Distributed File System should
               // be written to use a FileSystem object.
                FileSystem fs = FileSystem.get(context.getConfiguration());
                Path getFilePath = new Path(cacheFiles[0].toString());
  
                // We open the file using FileSystem object, 
                // convert the input byte stream to character
                // streams using InputStreamReader and wrap it 
                // in BufferedReader to make it more efficient
                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(getFilePath)));
  
                while ((line = reader.readLine()) != null
                {
                    String[] words = line.split(" ");
  
                    for (int i = 0; i < words.length; i++) 
                    {
                        // add the words to ArrayList
                        stopWords.add(words[i]); 
                    }
                }
            }
  
            catch (Exception e)
            {
                System.out.println("Unable to read the File");
                System.exit(1);
            }
        }
    }
  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String words[] = value.toString().split(" ");
  
        for (int i = 0; i < words.length; i++) 
        {
  
            // removing all special symbols 
            // and converting it to lowerCase
            String temp = words[i].replaceAll("[?, "()]", "").toLowerCase();
  
            // if not present in ArrayList we write
            if (!stopWords.contains(temp)) 
            {
                context.write(new Text(temp), new LongWritable(1));
            }
        }
    }
}

Reducer Code:

package word_count_DC;
  
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
  
public class Cached_Reducer extends Reducer<Text, 
              LongWritable, Text, LongWritable> {
  
    public void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException
    {
        long sum = 0;
  
        for (LongWritable val : values)
        {
            sum += val.get();
        }
  
        context.write(key, new LongWritable(sum));
    }
}

Driver Code:

package word_count_DC;
  
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
  
public class Driver {
  
    public static void main(String[] args) throws IOException, 
                 InterruptedException, ClassNotFoundException
    {
  
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  
        if (otherArgs.length != 2
        {
            System.err.println("Error: Give only two paths for <input> <output>");
            System.exit(1);
        }
  
        Job job = Job.getInstance(conf, "Distributed Cache");
  
        job.setJarByClass(Driver.class);
        job.setMapperClass(Cached_Word_Count.class);
        job.setReducerClass(Cached_Reducer.class);
  
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
  
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
  
        try {
  
            // the complete URI(Uniform Resource 
            // Identifier) file path in Hdfs
            job.addCacheFile(new URI("hdfs://localhost:9000/cached_Geeks/stopWords.txt"));
        }
        catch (Exception e) {
            System.out.println("File Not Added");
            System.exit(1);
        }
  
        FileInputFormat.addInputPath(job, new Path(args[0]));
  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
        // throws ClassNotFoundException, so handle it
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    }
}

Как выполнить код?

  1. Экспортируйте проект в виде файла jar и скопируйте на рабочий стол Ubuntu как распределенныйExample.jar.
  2. Запустите свои службы Hadoop. Зайдите внутрь hadoop_home_dir и введите тип терминала
    sbin / start-all.sh
    
  3. Запустите файл jar

    bin/yarn jar jar_file_path packageName.Driver_Class_Name inputFilePath outputFilePath

    bin/yarn jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput

    Выход:

    // напечатает слова, начинающиеся с t
    
    bin / hdfs dfs -cat / geeksOutput / part * | grep ^ t
    

    В не выход, мы можем наблюдать , нет того или слов , которые мы хотели игнорировать.