Распределенный кеш в Hadoop MapReduce
Фреймворк Hadoop MapReduce предоставляет возможность кэшировать небольшие или умеренные файлы, доступные только для чтения, такие как текстовые файлы, zip-файлы, jar-файлы и т. Д., И транслировать их на все Datanodes (рабочие узлы), на которых выполняется задание MapReduce. Каждый Datanode получает копию файла (локальную копию), которая отправляется через распределенный кэш . По завершении работы эти файлы удаляются из узлов данных.
Зачем кешировать файл?
Есть некоторые файлы, которые требуются заданиям MapReduce, поэтому вместо того, чтобы каждый раз читать из HDFS (увеличивая время поиска и задержку) , скажем, 100 раз (если запущено 100 картографов), мы просто отправляем копию файла на все Datanode один раз. .
Давайте посмотрим на пример, в котором мы считаем слова из lyrics.txt, кроме слов, присутствующих в stopWords.txt . Вы можете найти эти файлы здесь.
Предпосылки:
1. Скопируйте оба файла из локальной файловой системы в HDFS.
bin / hdfs dfs -put ../Desktop/lyrics.txt / geeksInput // этот файл будет кеширован bin / hdfs dfs -put ../Desktop/stopWords.txt / cached_Geeks
2. Получите адрес сервера NameNode. Поскольку доступ к файлу должен осуществляться через URI (унифицированный идентификатор ресурса), нам нужен этот адрес. Его можно найти в core-site.xml
Hadoop_Home_dir / etc / hadoop / core-site.xml
На моем ПК это hdfs: // localhost: 9000, он может отличаться на вашем ПК.
Mapper Code:
package word_count_DC; import java.io.*; import java.util.*; import java.net.URI; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class Cached_Word_Count extends Mapper<LongWritable, Text, Text, LongWritable> { ArrayList<String> stopWords = null ; public void setup(Context context) throws IOException, InterruptedException { stopWords = new ArrayList<String>(); URI[] cacheFiles = context.getCacheFiles(); if (cacheFiles != null && cacheFiles.length > 0 ) { try { String line = "" ; // Create a FileSystem object and pass the // configuration object in it. The FileSystem // is an abstract base class for a fairly generic // filesystem. All user code that may potentially // use the Hadoop Distributed File System should // be written to use a FileSystem object. FileSystem fs = FileSystem.get(context.getConfiguration()); Path getFilePath = new Path(cacheFiles[ 0 ].toString()); // We open the file using FileSystem object, // convert the input byte stream to character // streams using InputStreamReader and wrap it // in BufferedReader to make it more efficient BufferedReader reader = new BufferedReader( new InputStreamReader(fs.open(getFilePath))); while ((line = reader.readLine()) != null ) { String[] words = line.split( " " ); for ( int i = 0 ; i < words.length; i++) { // add the words to ArrayList stopWords.add(words[i]); } } } catch (Exception e) { System.out.println( "Unable to read the File" ); System.exit( 1 ); } } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String words[] = value.toString().split( " " ); for ( int i = 0 ; i < words.length; i++) { // removing all special symbols // and converting it to lowerCase String temp = words[i].replaceAll( "[?, "()]" , "" ).toLowerCase(); // if not present in ArrayList we write if (!stopWords.contains(temp)) { context.write( new Text(temp), new LongWritable( 1 )); } } } } |
Reducer Code:
package word_count_DC; import java.io.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; public class Cached_Reducer extends Reducer<Text, LongWritable, Text, LongWritable> { public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0 ; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } } |
Driver Code:
package word_count_DC; import java.io.*; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Driver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2 ) { System.err.println( "Error: Give only two paths for <input> <output>" ); System.exit( 1 ); } Job job = Job.getInstance(conf, "Distributed Cache" ); job.setJarByClass(Driver. class ); job.setMapperClass(Cached_Word_Count. class ); job.setReducerClass(Cached_Reducer. class ); job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(LongWritable. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(LongWritable. class ); try { // the complete URI(Uniform Resource // Identifier) file path in Hdfs } catch (Exception e) { System.out.println( "File Not Added" ); System.exit( 1 ); } FileInputFormat.addInputPath(job, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(args[ 1 ])); // throws ClassNotFoundException, so handle it System.exit(job.waitForCompletion( true ) ? 0 : 1 ); } } |
Как выполнить код?
- Экспортируйте проект в виде файла jar и скопируйте на рабочий стол Ubuntu как распределенныйExample.jar.
- Запустите свои службы Hadoop. Зайдите внутрь hadoop_home_dir и введите тип терминала
sbin / start-all.sh
- Запустите файл jar
bin/yarn jar jar_file_path packageName.Driver_Class_Name inputFilePath outputFilePath
bin/yarn jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput
Выход:
// напечатает слова, начинающиеся с t bin / hdfs dfs -cat / geeksOutput / part * | grep ^ t
В не выход, мы можем наблюдать , нет того или слов , которые мы хотели игнорировать.