Программа MapReduce - Анализ погодных данных для анализа жарких и холодных дней
Здесь мы напишем программу Map-Reduce для анализа наборов данных о погоде, чтобы понять ее программную модель обработки данных. Датчики погоды собирают информацию о погоде по всему миру в большом объеме данных журнала. Эти данные о погоде частично структурированы и ориентированы на записи.
Эти данные хранятся в строчно-ориентированном формате ASCII, где каждая строка представляет собой отдельную запись. В каждой строке есть много полей, таких как долгота, широта, максимальная и минимальная дневная температура, среднесуточная температура и т. Д. Для простоты мы сосредоточимся на основном элементе, то есть температуре. Мы будем использовать данные Национальных центров экологической информации (NCEI). Он содержит огромное количество исторических данных о погоде, которые мы можем использовать для анализа данных.
Постановка задачи:
Анализ погодных данных Фэрбенкса, Аляска, чтобы найти холодные и жаркие дни с помощью MapReduce Hadoop.
Шаг 1:
Мы можем скачать набор данных по этой ссылке для разных городов в разные годы. выберите год по вашему выбору и выберите любой из текстовых файлов данных для анализа. В моем случае я выбрал набор данных CRND0103-2020-AK_Fairbanks_11_NE.txt для анализа жарких и холодных дней в Фэрбенксе, Аляска.
Мы можем получить информацию о данных из файла README.txt, доступного на сайте NCEI.
Шаг 2:
Ниже приведен пример нашего набора данных, где столбец 6 и столбец 7 показывают максимальную и минимальную температуру соответственно.
Шаг 3:
Make a project in Eclipse with below steps:
- First Open Eclipse -> then select File -> New -> Java Project ->Name it MyProject -> then select use an execution environment -> choose JavaSE-1.8 then next -> Finish.
- In this Project Create Java class with name MyMaxMin -> then click Finish
- Copy the below source code to this MyMaxMin java class
// importing Libraries
import
java.io.IOException;
import
java.util.Iterator;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
import
org.apache.hadoop.conf.Configuration;
public
class
MyMaxMin {
// Mapper
/*MaxTemperatureMapper class is static
* and extends Mapper abstract class
* having four Hadoop generics type
* LongWritable, Text, Text, Text.
*/
public
static
class
MaxTemperatureMapper
extends
Mapper<LongWritable, Text, Text, Text> {
/**
* @method map
* This method takes the input as a text data type.
* Now leaving the first five tokens, it takes
* 6th token is taken as temp_max and
* 7th token is taken as temp_min. Now
* temp_max > 30 and temp_min < 15 are
* passed to the reducer.
*/
// the data in our data set with
// this value is inconsistent data
public
static
final
int
MISSING =
9999
;
@Override
public
void
map(LongWritable arg0, Text Value, Context context)
throws
IOException, InterruptedException {
// Convert the single row(Record) to
// String and store it in String
// variable name line
String line = Value.toString();
// Check for the empty line
if
(!(line.length() ==
0
)) {
// from character 6 to 14 we have
// the date in our dataset
String date = line.substring(
6
,
14
);
// similarly we have taken the maximum
// temperature from 39 to 45 characters
float
temp_Max = Float.parseFloat(line.substring(
39
,
45
).trim());
// similarly we have taken the minimum
// temperature from 47 to 53 characters
float
temp_Min = Float.parseFloat(line.substring(
47
,
53
).trim());
// if maximum temperature is
// greater than 30, it is a hot day
if
(temp_Max >
30.0
) {
// Hot day
context.write(
new
Text(
"The Day is Hot Day :"
+ date),
new
Text(String.valueOf(temp_Max)));
}
// if the minimum temperature is
// less than 15, it is a cold day
if
(temp_Min <
15
) {
// Cold day
context.write(
new
Text(
"The Day is Cold Day :"
+ date),
new
Text(String.valueOf(temp_Min)));
}
}
}
}
// Reducer
/*MaxTemperatureReducer class is static
and extends Reducer abstract class
having four Hadoop generics type
Text, Text, Text, Text.
*/
public
static
class
MaxTemperatureReducer
extends
Reducer<Text, Text, Text, Text> {
/**
* @method reduce
* This method takes the input as key and
* list of values pair from the mapper,
* it does aggregation based on keys and
* produces the final context.
*/
public
void
reduce(Text Key, Iterator<Text> Values, Context context)
throws
IOException, InterruptedException {
// putting all the values in
// temperature variable of type String
String temperature = Values.next().toString();
context.write(Key,
new
Text(temperature));
}
}
/**
* @method main
* This method is used for setting
* all the configuration properties.
* It acts as a driver for map-reduce
* code.
*/
public
static
void
main(String[] args)
throws
Exception {
// reads the default configuration of the
// cluster from the configuration XML files
Configuration conf =
new
Configuration();
// Initializing the job with the
// default configuration of the cluster
Job job =
new
Job(conf,
"weather example"
);
// Assigning the driver class name
job.setJarByClass(MyMaxMin.
class
);
// Key type coming out of mapper
job.setMapOutputKeyClass(Text.
class
);
// value type coming out of mapper
job.setMapOutputValueClass(Text.
class
);
// Defining the mapper class name
job.setMapperClass(MaxTemperatureMapper.
class
);
// Defining the reducer class name
job.setReducerClass(MaxTemperatureReducer.
class
);
// Defining input Format class which is
// responsible to parse the dataset
// into a key value pair
job.setInputFormatClass(TextInputFormat.
class
);
// Defining output Format class which is
// responsible to parse the dataset
// into a key value pair
job.setOutputFormatClass(TextOutputFormat.
class
);
// setting the second argument
// as a path in a path variable
Path OutputPath =
new
Path(args[
1
]);
// Configuring the input path
// from the filesystem into the job
FileInputFormat.addInputPath(job,
new
Path(args[
0
]));
// Configuring the output path from
// the filesystem into the job
FileOutputFormat.setOutputPath(job,
new
Path(args[
1
]));
// deleting the context path automatically
// from hdfs so that we don"t have
// to delete it explicitly
OutputPath.getFileSystem(conf).delete(OutputPath);
// exiting the job only if the
// flag value becomes false
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
}
}
- Now we need to add external jar for the packages that we have import. Download the jar package Hadoop Common and Hadoop MapReduce Core according to your Hadoop version.
You can check Hadoop Version:
hadoop version
- Now we add these external jars to our MyProject. Right Click on MyProject -> then select Build Path-> Click on Configue Build Path and select Add External jars…. and add jars from it’s download location then click -> Apply and Close.
- Now export the project as jar file. Right-click on MyProject choose Export.. and go to Java -> JAR file click -> Next and choose your export destination then click -> Next.
choose Main Class as MyMaxMin by clicking -> Browse and then click -> Finish -> Ok.
Step 4:
Start our Hadoop Daemons
start-dfs.sh
start-yarn.sh
Step 5:
Move your dataset to the Hadoop HDFS.
Syntax:
hdfs dfs -put /file_path /destination
In below command / shows the root directory of our HDFS.
hdfs dfs -put /home/dikshant/Downloads/CRND0103-2020-AK_Fairbanks_11_NE.txt /
Check the file sent to our HDFS.
hdfs dfs -ls /
Step 6:
Now Run your Jar File with below command and produce the output in MyOutput File.
Syntax:
hadoop jar /jar_file_location /dataset_location_in_HDFS /output-file_name
Command:
hadoop jar /home/dikshant/Documents/Project.jar /CRND0103-2020-AK_Fairbanks_11_NE.txt /MyOutput
Step 7:
Now Move to localhost:50070/, under utilities select Browse the file system and download part-r-00000 in /MyOutput directory to see result.
Step 8:
See the result in the Downloaded File.
In the above image, you can see the top 10 results showing the cold days. The second column is a day in yyyy/mm/dd format. For Example, 20200101 means
year = 2020 month = 01 Date = 01