irpas技术客

MapReduce基础编程之按日期统计及按日期排序_邵奈一

网络投稿 6345

大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。 1、世人称我为:被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员… 2、这几年,我整理了很多IT技术相关的教程给大家,爱生活、爱分享。 3、如果您觉得文章有用,请收藏,转发,评论,并关注我,谢谢! 博客导航跳转(请收藏):邵奈一的技术博客导航 | 公众号 | 微信 | CSDN | 掘金 | 51CTO | 简书 | 微博 |


教程目录 0x00 教程内容0x01 项目准备1. 新建Maven项目2. 需求说明 0x02 编写代码1. 需求1:按日期进行统计2. 需求2:按日期进行排序 0x03 运行代码并观察结果1. 需求1:按日期进行统计2. 需求2:按日期进行排序 0x04 彩蛋1. 打包放到HDFS上去统计 0xFF 总结

0x00 教程内容 项目准备编写代码运行代码并观察结果 0x01 项目准备 1. 新建Maven项目

(1)新建项目 (2)引入Hadoop相关的Jar包

<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.5</version> </dependency>

(3)将数据拷贝到项目中 数据示例:

Nehru,2016-01-01 Dane,2016-01-01 Walter,2016-01-01 Gloria,2016-01-01 Clarke,2016-01-01 Madeline,2016-01-01 Kevyn,2016-01-01

数据说明: 1、文件名是: user_login.txt 2、字段只有两个,一个是 名字 、一个是 登录的日期 3、分隔符是 , 号

2. 需求说明

(1)需求1:按日期进行统计,其实就是统计某一天,一共有多少人登录 (2)需求2:按日期进行排序,其实就是按登录人数的低到高进行排序

0x02 编写代码 1. 需求1:按日期进行统计

完整代码如下:

package com.shaonaiyi.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class dailyAccessCount { public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 按逗号进行分割 String array[] = line.split(","); // 将日期作为key String keyOutput = array[1]; // 输出格式:(日期, 1) context.write(new Text(keyOutput), one); } } public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 定义统计结果result private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 定义累加器,初始值为0 int sum = 0; // 遍历将map传递过来的相同日期所对应的1进行累加 for (IntWritable val : values) { sum += val.get(); } // 给统计结果result设值 result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { // 参数小于2个时报错并提示内容 System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = new Job(conf, "Daily Access Count"); job.setJarByClass(dailyAccessCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 2. 需求2:按日期进行排序

完整代码如下:

package com.shaonaiyi.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class accessTimesSort { public static class MyMapper extends Mapper<Object, Text, IntWritable, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); // 按tab键作为分隔符 String array[] = lines.split("\t"); // 将访问次数作为key int keyOutput = Integer.parseInt(array[1]); // 将日期作为value String valueOutput = array[0]; context.write(new IntWritable(keyOutput), new Text(valueOutput)); } } public static class MyReducer extends Reducer<IntWritable, Text, Text, IntWritable> { public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { // 对于IntWritable类型的key,MapReduce会默认进行升序排序 context.write(value, key); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = new Job(conf, "Access Time Sort"); job.setJarByClass(accessTimesSort.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

说明: 1、如果key为IntWritable类型,MapReduce会默认进行升序排序; 2、如果key为Text类型,MapReduce会默认按照字典顺序对字符串排序。

0x03 运行代码并观察结果 1. 需求1:按日期进行统计

(1)需求1传递参数 然后输入参数两个参数: (2)结果

2. 需求2:按日期进行排序

(1)需求1传递参数 (2)结果

0x04 彩蛋 1. 打包放到HDFS上去统计

(1)将数据放到HDFS的 / 路径 (2)将项目达成jar包,比如此处为 hadoop-1.0.jar (3)执行命令 格式为:

hadoop jar xxx.jar main方法的类 统计的文件路径 输出结果的路径

执行命令为:

hadoop jar target/hadoop-1.0.jar com.shaonaiyi.mapreduce.dailyAccessCount /user_login.txt /output

统计结果其实已经有了:

0xFF 总结 本文章对MapReduce进行基础的学习想要学习更多大数据相关内容,请关注我!

邵奈一 原创不易,如转载请标明出处,教育是一生的事业。



1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #公众号 #微信