Как найти топ-N записей с помощью MapReduce
Поиск 10 или 20 лучших записей из большого набора данных - это сердце многих систем рекомендаций, а также важный атрибут для анализа данных. Здесь мы обсудим два метода поиска топ-N записей следующим образом.
Метод 1. Сначала давайте определим топ-10 самых просматриваемых фильмов, чтобы понять методы, а затем обобщим его для n записей.
Формат данных:
movie_name и no_of_views (разделены табуляцией)
Используемый подход: Использование TreeMap. Здесь идея состоит в том, чтобы использовать Mappers для поиска локальных топ-10 записей, поскольку может быть много Mappers, работающих параллельно с разными блоками данных файла. И затем все эти 10 лучших локальных записей будут агрегированы в Reducer, где мы найдем 10 лучших глобальных записей для файла.
Пример: Предположим, что файл (30 ТБ) разделен на 3 блока по 10 ТБ каждый, и каждый блок обрабатывается Mapper параллельно, поэтому мы находим 10 лучших записей (локальных) для этого блока. Затем эти данные перемещаются в редуктор, где мы находим актуальные 10 лучших записей из файла movie.txt .
Файл Movie.txt: вы можете просмотреть весь файл, щелкнув здесь
Mapper code:
import java.io.*; import java.util.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; public class top_10_Movies_Mapper extends Mapper<Object, Text, Text, LongWritable> { private TreeMap<Long, String> tmap; @Override public void setup(Context context) throws IOException, InterruptedException { tmap = new TreeMap<Long, String>(); } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // input data format => movie_name // no_of_views (tab seperated) // we split the input data String[] tokens = value.toString().split( " " ); String movie_name = tokens[ 0 ]; long no_of_views = Long.parseLong(tokens[ 1 ]); // insert data into treeMap, // we want top 10 viewed movies // so we pass no_of_views as key tmap.put(no_of_views, movie_name); // we remove the first key-value // if it"s size increases 10 if (tmap.size() > 10 ) { tmap.remove(tmap.firstKey()); } } @Override public void cleanup(Context context) throws IOException, InterruptedException { for (Map.Entry<Long, String> entry : tmap.entrySet()) { long count = entry.getKey(); String name = entry.getValue(); context.write( new Text(name), new LongWritable(count)); } } } |
Объяснение: здесь важно отметить, что мы используем « context.write () » в методе cleanup (), который запускается только один раз в конце жизненного цикла Mapper. Mapper обрабатывает одну пару "ключ-значение" за раз и записывает их как промежуточные выходные данные на локальный диск. Но мы должны обработать весь блок (все пары ключ-значение), чтобы найти top10, перед записью вывода, поэтому мы используем context.write () в cleanup ().
Reducer code:
import java.io.IOException; import java.util.Map; import java.util.TreeMap; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class top_10_Movies_Reducer extends Reducer<Text, LongWritable, LongWritable, Text> { private TreeMap<Long, String> tmap2; @Override public void setup(Context context) throws IOException, InterruptedException { tmap2 = new TreeMap<Long, String>(); } @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // input data from mapper // key values // movie_name [ count ] String name = key.toString(); long count = 0 ; for (LongWritable val : values) { count = val.get(); } // insert data into treeMap, // we want top 10 viewed movies // so we pass count as key tmap2.put(count, name); // we remove the first key-value // if it"s size increases 10 if (tmap2.size() > 10 ) { tmap2.remove(tmap2.firstKey()); } } @Override public void cleanup(Context context) throws IOException, InterruptedException { for (Map.Entry<Long, String> entry : tmap2.entrySet()) { long count = entry.getKey(); String name = entry.getValue(); context.write( new LongWritable(count), new Text(name)); } } } |
Объяснение: Та же логика, что и у mapper. Reducer обрабатывает одну пару ключ-значение за раз и записывает их в качестве окончательного вывода в HDFS. Но мы должны обработать все пары ключ-значение, чтобы найти top10, перед записью вывода, поэтому мы используем cleanup () .
Код драйвера:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Driver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // if less than two paths // provided will show error if (otherArgs.length < 2 ) { System.err.println( "Error: please provide two paths" ); System.exit( 2 ); } Job job = Job.getInstance(conf, "top 10" ); job.setJarByClass(Driver. class ); job.setMapperClass(top_10_Movies_Mapper. class ); job.setReducerClass(top_10_Movies_Reducer. class ); job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(LongWritable. class ); job.setOutputKeyClass(LongWritable. class ); job.setOutputValueClass(Text. class ); FileInputFormat.addInputPath(job, new Path(otherArgs[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 1 ])); System.exit(job.waitForCompletion( true ) ? 0 : 1 ); } } |
Running the jar file:
- We export all the classes as jar files.
- We move our file movie.txt from local file system to /geeksInput in HDFS.
bin/hdfs dfs -put ../Desktop/movie.txt /geeksInput
- We now run the yarn services to run the jar file.
bin/yarn jar jar_file_location package_Name.Driver_classname input_path output_path
Code running:
Output: In ascending order
Method 2: This method is based on the property that output from Mapper is sorted based on key before going to the reducer. Let’s print in descending order this time. Now to do so we just multiply the key with -1 in mapper, so that after sorting higher numbers appears on top (magnitude wise). And now we just print the 10 records removing the -ve sign from keys.
Example: At reducer
Keys After sorting: 23 25 28 ..
If key multiplied with -1
Keys After sorting: -28 -25 -23 ..
Mapper Code:
import
java.io.*;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.mapreduce.Mapper;
public
class
top_10_Movies2_Mapper
extends
Mapper<Object,
Text, LongWritable, Text> {
// data format => movie_name
// no_of_views (tab seperated)
@Override
public
void
map(Object key, Text value,
Context context)
throws
IOException,
InterruptedException
{
String[] tokens = value.toString().split(
" "
);
String movie_name = tokens[
0
];
long
no_of_views = Long.parseLong(tokens[
1
]);
no_of_views = (-
1
) * no_of_views;
context.write(
new
LongWritable(no_of_views),
new
Text(movie_name));
}
}
Reducer Code:
import
java.io.*;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.mapreduce.Reducer;
public
class
top_10_Movies2_Reducer
extends
Reducer<LongWritable,
Text, LongWritable, Text> {
static
int
count;
@Override
public
void
setup(Context context)
throws
IOException,
InterruptedException
{
count =
0
;
}
@Override
public
void
reduce(LongWritable key, Iterable<Text> values,
Context context)
throws
IOException, InterruptedException
{
// key values
//-ve of no_of_views [ movie_name ..]
long
no_of_views = (-
1
) * key.get();
String movie_name =
null
;
for
(Text val : values)
{
movie_name = val.toString();
}
// we just write 10 records as output
if
(count <
10
)
{
context.write(
new
LongWritable(no_of_views),
new
Text(movie_name));
count++;
}
}
}
Explanation: Here, setup() method is the method which runs only once at the beginning
in the life time of a Reducer/Mapper. Since we want to print only 10 records we define the count variable in setup() method.Driver Code:
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.util.GenericOptionsParser;
public
class
Driver {
public
static
void
main(String[] args)
throws
Exception
{
Configuration conf =
new
Configuration();
String[] otherArgs =
new
GenericOptionsParser(conf
, args).getRemainingArgs();
// if less than two paths
// provided will show error
if
(otherArgs.length <
2
)
{
System.err.println(
"Error: please provide two paths"
);
System.exit(
2
);
}
Job job = Job.getInstance(conf,
"top_10 program_2"
);
job.setJarByClass(Driver.
class
);
job.setMapperClass(top_10_Movies2_Mapper.
class
);
job.setReducerClass(top_10_Movies2_Reducer.
class
);
job.setMapOutputKeyClass(LongWritable.
class
);
job.setMapOutputValueClass(Text.
class
);
job.setOutputKeyClass(LongWritable.
class
);
job.setOutputValueClass(Text.
class
);
FileInputFormat.addInputPath(job,
new
Path(otherArgs[
0
]));
FileOutputFormat.setOutputPath(job,
new
Path(otherArgs[
1
]));
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
}
}
Running the jar file: We now run the yarn services to run the jar file
Code running:
Output:
Note: One important thing to observe is that though Method-2 is easy to implement but it is not very efficient compared to Method-1 as we are passing all key-value pairs to reducer i.e. there is a lot of data movement which may lead to bottleneck situations. But in Method-1 we are only passing 10 key-value pairs to the reducer.
Generalizing it for ‘n’ records: Lets modify our second program for some ‘n’ records whose value we may pass at runtime. First some points to observe:
- We make our custom parameter using set() method
configuration_object.set(String name, String value)
- This value can be accessed in any Mapper/Reducer by using get() method
Configuration conf = context.getConfiguration(); // we will store value in String variable String value = conf.get(String name);
Mapper code: The Mapper code will remain same as we are not using the value there.
Reducer code: Here we make some changes in the setup() method.
import
java.io.IOException;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.conf.Configuration;
public
class
top_n_Reducer
extends
Reducer<LongWritable,
Text, LongWritable, Text> {
static
int
count;
@Override
public
void
setup(Context context)
throws
IOException,
InterruptedException
{
Configuration conf = context.getConfiguration();
// we will use the value passed in myValue at runtime
String param = conf.get(
"myValue"
);
// converting the String value to integer
count = Integer.parseInt(param);
}
@Override
public
void
reduce(LongWritable key, Iterable<Text> values,
Context context)
throws
IOException, InterruptedException
{
long
no_of_views = (-
1
) * key.get();
String movie_name =
null
;
for
(Text val : values) {
movie_name = val.toString();
}
/
- We make our custom parameter using set() method