Распределенный кеш в Hadoop MapReduce
Фреймворк Hadoop MapReduce предоставляет возможность кэшировать небольшие или умеренные файлы, доступные только для чтения, такие как текстовые файлы, zip-файлы, jar-файлы и т. Д., И транслировать их на все Datanodes (рабочие узлы), на которых выполняется задание MapReduce. Каждый Datanode получает копию файла (локальную копию), которая отправляется через распределенный кэш . По завершении работы эти файлы удаляются из узлов данных.
Зачем кешировать файл?
Есть некоторые файлы, которые требуются заданиям MapReduce, поэтому вместо того, чтобы каждый раз читать из HDFS (увеличивая время поиска и задержку) , скажем, 100 раз (если запущено 100 картографов), мы просто отправляем копию файла на все Datanode один раз. .
Давайте посмотрим на пример, в котором мы считаем слова из lyrics.txt, кроме слов, присутствующих в stopWords.txt . Вы можете найти эти файлы здесь.
Предпосылки:
1. Скопируйте оба файла из локальной файловой системы в HDFS.
bin / hdfs dfs -put ../Desktop/lyrics.txt / geeksInput // этот файл будет кеширован bin / hdfs dfs -put ../Desktop/stopWords.txt / cached_Geeks
2. Получите адрес сервера NameNode. Поскольку доступ к файлу должен осуществляться через URI (унифицированный идентификатор ресурса), нам нужен этот адрес. Его можно найти в core-site.xml
Hadoop_Home_dir / etc / hadoop / core-site.xml

На моем ПК это hdfs: // localhost: 9000, он может отличаться на вашем ПК.
Mapper Code:
package word_count_DC;  import java.io.*;import java.util.*;import java.net.URI;  import org.apache.hadoop.io.Text;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;  public class Cached_Word_Count extends Mapper<LongWritable,                                 Text, Text, LongWritable> {      ArrayList<String> stopWords = null;      public void setup(Context context) throws IOException,                                      InterruptedException    {        stopWords = new ArrayList<String>();          URI[] cacheFiles = context.getCacheFiles();          if (cacheFiles != null && cacheFiles.length > 0)         {            try {                  String line = "";                 // Create a FileSystem object and pass the                // configuration object in it. The FileSystem               // is an abstract base class for a fairly generic               // filesystem. All user code that may potentially                // use the Hadoop Distributed File System should               // be written to use a FileSystem object.                FileSystem fs = FileSystem.get(context.getConfiguration());                Path getFilePath = new Path(cacheFiles[0].toString());                  // We open the file using FileSystem object,                 // convert the input byte stream to character                // streams using InputStreamReader and wrap it                 // in BufferedReader to make it more efficient                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(getFilePath)));                  while ((line = reader.readLine()) != null)                 {                    String[] words = line.split(" ");                      for (int i = 0; i < words.length; i++)                     {                        // add the words to ArrayList                        stopWords.add(words[i]);                     }                }            }              catch (Exception e)            {                System.out.println("Unable to read the File");                System.exit(1);            }        }    }      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException    {        String words[] = value.toString().split(" ");          for (int i = 0; i < words.length; i++)         {              // removing all special symbols             // and converting it to lowerCase            String temp = words[i].replaceAll("[?, "()]", "").toLowerCase();              // if not present in ArrayList we write            if (!stopWords.contains(temp))             {                context.write(new Text(temp), new LongWritable(1));            }        }    }} | 
Reducer Code:
package word_count_DC;  import java.io.*;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Reducer;  public class Cached_Reducer extends Reducer<Text,               LongWritable, Text, LongWritable> {      public void reduce(Text key, Iterable<LongWritable> values,        Context context) throws IOException, InterruptedException    {        long sum = 0;          for (LongWritable val : values)        {            sum += val.get();        }          context.write(key, new LongWritable(sum));    }} | 
Driver Code:
package word_count_DC;  import java.io.*;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;  public class Driver {      public static void main(String[] args) throws IOException,                  InterruptedException, ClassNotFoundException    {          Configuration conf = new Configuration();        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();          if (otherArgs.length != 2)         {            System.err.println("Error: Give only two paths for <input> <output>");            System.exit(1);        }          Job job = Job.getInstance(conf, "Distributed Cache");          job.setJarByClass(Driver.class);        job.setMapperClass(Cached_Word_Count.class);        job.setReducerClass(Cached_Reducer.class);          job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(LongWritable.class);          job.setOutputKeyClass(Text.class);        job.setOutputValueClass(LongWritable.class);          try {              // the complete URI(Uniform Resource             // Identifier) file path in Hdfs        }        catch (Exception e) {            System.out.println("File Not Added");            System.exit(1);        }          FileInputFormat.addInputPath(job, new Path(args[0]));          FileOutputFormat.setOutputPath(job, new Path(args[1]));          // throws ClassNotFoundException, so handle it        System.exit(job.waitForCompletion(true) ? 0 : 1);     }} | 
Как выполнить код?
- Экспортируйте проект в виде файла jar и скопируйте на рабочий стол Ubuntu как распределенныйExample.jar.
 -  Запустите свои службы Hadoop. Зайдите внутрь hadoop_home_dir и введите тип терминала
sbin / start-all.sh
 - Запустите файл jar
bin/yarn jar jar_file_path packageName.Driver_Class_Name inputFilePath outputFilePath
bin/yarn jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput

Выход:
// напечатает слова, начинающиеся с t bin / hdfs dfs -cat / geeksOutput / part * | grep ^ t

В не выход, мы можем наблюдать , нет того или слов , которые мы хотели игнорировать.