博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce的手机流量统计的案例
阅读量:4983 次
发布时间:2019-06-12

本文共 12916 字,大约阅读时间需要 43 分钟。

一.程序需要的材料

文件中各字段以制表符分隔。从0开始计数,第1个字段是手机号;第6、7、8、9个字段是要统计的流量相关字段,依次为上行数据包数、下行数据包数、上行总流量、下行总流量。

文件内容(每行一条记录。注意:程序解析的原始文件每行开头还有一个时间戳字段,见代码中注释给出的元数据示例;下面展示的内容缺少该字段):

13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
13823070001    20-7C-8F-70-68-1F:CMCC    120.196.100.99            6    3    360    180    200
13600217502    00-1F-64-E2-E8-B1:CMCC    120.196.100.55            18    138    1080    186852    200

二.程序:

1 package mapreducejob;  2   3 /**  4  * 老师给的元数据信息如下:  5  *  1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200  6  *    1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200  7  *    1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200  8  *  第六个字段是上行数据包数.  9  *  第七个字段是下行数据包数 10  *  第八个是上行总流量 11  *  第九个是下行总流量 12  */ 13  14 import java.io.DataInput; 15 import java.io.DataOutput; 16 import java.io.IOException; 17  18 import org.apache.hadoop.conf.Configuration; 19 import org.apache.hadoop.fs.Path; 20 import org.apache.hadoop.io.LongWritable; 21 import org.apache.hadoop.io.Text; 22 import org.apache.hadoop.io.Writable; 23 import org.apache.hadoop.mapreduce.Job; 24 import org.apache.hadoop.mapreduce.Mapper; 25 import org.apache.hadoop.mapreduce.Reducer; 26 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 27 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 28  29 public class TrafficApp { 30  31     public static void main(String[] args) throws Exception { 32         Job job = Job.getInstance(new Configuration(), 33                 TrafficApp.class.getSimpleName()); 34         job.setJarByClass(TrafficApp.class);// 通过jar包运行. 35  36         FileInputFormat.setInputPaths(job, args[0]);// 数据输入,指定数据源 37  38         job.setMapperClass(MyMapper.class);// 给job设置map 39         job.setMapOutputKeyClass(Text.class); 40         job.setMapOutputValueClass(TrafficWritable.class); 41  42         job.setReducerClass(MyReducer.class); 43         job.setOutputKeyClass(Text.class); 44         job.setOutputValueClass(TrafficWritable.class); 45  46         FileOutputFormat.setOutputPath(job, new Path(args[1])); 47  48         job.waitForCompletion(true);// 在集群中运行 49     } 50  51     public static class MyMapper extends 52             Mapper
{ 53 // 这四个参数分别是
54 // k1代表的是字节的偏移量,v1是原始数据. k2是手机号.v2是每一次的通话流量 55 Text k2 = new Text();// new 一个作为k2手机号. 56 TrafficWritable v2 = new TrafficWritable();// new 一个作为v2 57 58 @Override 59 protected void map( 60 LongWritable key, 61 Text value, 62 Mapper
.Context context) 63 throws IOException, InterruptedException { 64 String line = value.toString(); 65 String[] splited = line.split("\t");// 以制表符作为拆分符得到一个字节数组. 66 // 通过原始数据文件可以看到这个里面有11个字段,所以这个拆分的数组长度为11 67 k2.set(splited[1]);// k2是手机号,在这个数组中是第二个. 68 v2.set(splited[6], splited[7], splited[8], splited[9]);// v2是代表四个流量 69 // 对应这个被拆分数组的第6,7,8,9个. 70 context.write(k2, v2); 71 } 72 73 } 74 75 public static class MyReducer extends 76 Reducer
{ 77 // 四个参数分别是
78 // k2是手机号,v2是流量TrafficWritable k3是手机号,v3是流量汇总. 79 TrafficWritable v3 = new TrafficWritable(); 80 81 @Override 82 protected void reduce( 83 Text k2, 84 Iterable
v2s, 85 Reducer
.Context context) 86 throws IOException, InterruptedException { 87 // reduce方法的第一个形参是k2,第二个形参是v2s,第三个形参是一个context上下文 88 // v2s是流量集合.我们在reduce方法中要做的就是把v2汇总起来变成v3. 89 long t1 = 0L; 90 long t2 = 0L; 91 long t3 = 0L; 92 long t4 = 0L; 93 for (TrafficWritable v2 : v2s) { 94 t1 += v2.t1; 95 t2 += v2.t2; 96 t3 += v2.t3; 97 t4 += v2.t4; 98 } 99 v3.set(t1, t2, t3, t4);//构造v3100 context.write(k2, v3);101 }102 103 }104 105 /**106 * 针对流量设置一个流量类. 第六个字段是上行数据包数. 第七个字段是下行数据包数. 第八个是上行总流量. 第九个是下行总流量107 *108 */109 static class TrafficWritable implements Writable {110 // 这个类是流量统计类,这个类包含了该手机号的上传和下载的流量111 // 在MapReduce中的键值对中代表的是v3,有四列组成.112 long t1;113 long t2;114 long t3;115 long t4;116 117 // 再搞一个无产的构造函数,否则容易出错118 public TrafficWritable() {119 }120 121 public void set(long t1, long t2, long t3, long t4) {122 // 赋值的方法,这个地方是传入的long类型.123 this.t1 = t1;124 this.t2 = t2;125 this.t3 = t3;126 this.t4 = t4;127 }128 129 public void set(String t1, String t2, String t3, String t4) {130 // 赋值的方法,这个地方是传入的String类型.131 this.t1 = Long.parseLong(t1);132 this.t2 = Long.parseLong(t2);133 this.t3 = Long.parseLong(t3);134 this.t4 = Long.parseLong(t4);135 }136 137 public void readFields(DataInput in) throws IOException {138 // 四列都通过in.readLong()读进来.139 this.t1 = in.readLong();140 this.t2 = in.readLong();141 this.t3 = in.readLong();142 this.t4 = in.readLong();143 }144 145 public void write(DataOutput out) throws IOException {146 // 这个对象有四列,必须要把四列都给写出去.147 out.writeLong(t1);148 out.writeLong(t2);149 out.writeLong(t3);150 out.writeLong(t4);151 }152 153 public String toString() {154 // 在Reduce阶段会用到这个方法,否则输出的是哈希编码155 return this.t1 + "\t" + this.t2 + "\t" + this.t3 + "\t" + this.t4;156 }157 }158 159 }

//===============================================================

代码二:

1 package mapreduce;  2   3 import java.io.DataInput;  4 import java.io.DataOutput;  5 import java.io.IOException;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.fs.Path;  9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.Writable; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Partitioner; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18  19 public class TrafficApp { 20     public static void main(String[] args) throws Exception { 21         Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName()); 22         job.setJarByClass(TrafficApp.class); 23          24         FileInputFormat.setInputPaths(job, args[0]); 25          26         job.setMapperClass(TrafficMapper.class); 27         job.setMapOutputKeyClass(Text.class); 28         job.setMapOutputValueClass(TrafficWritable.class); 29          30         job.setNumReduceTasks(2);//设定Reduce的数量为2 31         job.setPartitionerClass(TrafficPartitioner.class);//设定一个Partitioner的类. 32         /* 33          *Partitioner是如何实现不同的Map输出分配到不同的Reduce中? 34          *在不适用指定的Partitioner时,有 一个默认的Partitioner. 35          *就是HashPartitioner.  36          *其只有一行代码,其意思就是过来的key,不管是什么,模numberReduceTasks之后 返回值就是reduce任务的编号. 37          *numberReduceTasks的默认值是1.  任何一个数模1(取余数)都是0.  38          *这个地方0就是取编号为0的Reduce.(Reduce从0开始编号.)  39          */ 40          41         job.setReducerClass(TrafficReducer.class); 42         job.setOutputKeyClass(Text.class); 43         job.setOutputValueClass(TrafficWritable.class); 44          45         FileOutputFormat.setOutputPath(job, new Path(args[1])); 46         job.waitForCompletion(true); 47     } 48      49     public static class TrafficPartitioner extends Partitioner
{
//k2,v2 50 51 @Override 52 public int getPartition(Text key, TrafficWritable value,int numPartitions) { 53 long phoneNumber = Long.parseLong(key.toString()); 54 return (int)(phoneNumber%numPartitions); 55 } 56 57 } 58 59 60 /** 61 * 第一个参数是LongWritable类型是文本一行数据开头的字节数 62 * 第二个参数是文本中的一行数据 Text类型 63 * 第三个参数是要输出的手机号 Text类型 64 * 第四个参数是需要我们自定义的流量类型TrafficWritable 65 * @author ABC 66 * 67 */ 68 public static class TrafficMapper extends Mapper
{ 69 Text k2 = new Text(); 70 TrafficWritable v2 = null; 71 @Override 72 protected void map(LongWritable key,Text value, Mapper
.Context context) 73 throws IOException, InterruptedException { 74 String line = value.toString(); 75 String[] splited = line.split("\t"); 76 77 k2.set(splited[1]);//这个值对应的是手机号码 78 v2 = new TrafficWritable(splited[6], splited[7], splited[8], splited[9]); 79 context.write(k2, v2); 80 } 81 82 } 83 84 public static class TrafficReducer extends Reducer
{ 85 @Override 86 protected void reduce(Text k2,Iterable
v2s, 87 Reducer
.Context context) 88 throws IOException, InterruptedException { 89 //遍历v2s 流量都这个集合里面 90 long t1 = 0L; 91 long t2 = 0L; 92 long t3 = 0L; 93 long t4 = 0L; 94 95 for (TrafficWritable v2 : v2s) { 96 t1 += v2.getT1(); 97 t2 += v2.getT2(); 98 t3 += v2.getT3(); 99 t4 += v2.getT4();100 }101 TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);102 context.write(k2, v3);103 }104 }105 106 public static class TrafficWritable implements Writable{107 private long t1;108 private long t2;109 private long t3;110 private long t4;111 //写两个构造方法,一个是有参数的构造方法,一个是无参数的构造方法.112 //必须要有 一个无参数的构造方法,否则程序运行会报错.113 114 public TrafficWritable(){115 super();116 }117 118 public TrafficWritable(long t1, long t2, long t3, long t4) {119 super();120 this.t1 = t1;121 this.t2 = t2;122 this.t3 = t3;123 this.t4 = t4;124 }125 //在程序中读取文本穿过来的都是字符串,所以再搞一个字符串类型的构造方法126 public TrafficWritable(String t1, String t2, String t3, String t4) {127 super();128 this.t1 = Long.parseLong(t1);129 this.t2 = Long.parseLong(t2);130 this.t3 = Long.parseLong(t3);131 this.t4 = Long.parseLong(t4);132 }133 134 public void write(DataOutput out) throws IOException {135 //对各个成员变量进行序列化136 out.writeLong(t1);137 out.writeLong(t2);138 out.writeLong(t3);139 out.writeLong(t4);140 }141 142 public void readFields(DataInput in) throws IOException {143 //对成员变量进行反序列化144 this.t1 = in.readLong();145 this.t2 = in.readLong();146 this.t3 = in.readLong();147 this.t4 = in.readLong();148 }149 150 public long getT1() {151 return t1;152 }153 154 public void setT1(long t1) {155 this.t1 = t1;156 }157 158 public long getT2() {159 return t2;160 }161 162 public void setT2(long t2) {163 this.t2 = t2;164 }165 166 public long getT3() {167 return t3;168 }169 170 public void setT3(long t3) {171 this.t3 = t3;172 }173 174 public long getT4() {175 return t4;176 }177 178 public void setT4(long t4) {179 this.t4 = t4;180 }181 182 @Override183 public String toString() {184 return t1 + "\t" + t2 + "\t" + t3 + "\t" + t4 ;185 }186 187 }188 }

 

转载于:https://www.cnblogs.com/DreamDrive/p/6260491.html

你可能感兴趣的文章
Python3 中 configparser 模块解析配置的用法详解
查看>>
新手android环境搭建、debug调试及各种插件安装__图文全解
查看>>
未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序 win2008R2 X64 IIS7.5
查看>>
Diffuse贴图+Lightmap+Ambient
查看>>
矩阵树定理
查看>>
[算法]Evaluate Reverse Polish Notation
查看>>
go语言之进阶篇接口的定义和实现以及接口的继承
查看>>
SmartPhone手机网站的制作
查看>>
自适应全屏与居中算法
查看>>
构建之法阅读笔记(一)
查看>>
帮助你设计的50个自由和新鲜的图标集
查看>>
Glusterfs[转]
查看>>
javascript缩写
查看>>
GA来源分析
查看>>
常用统计指标
查看>>
iOS设置圆角矩形和阴影效果
查看>>
在博客园的第一篇文章,先简单自述一下吧
查看>>
深入了解 Dojo 的服务器推送技术
查看>>
hdu 4284 状态压缩
查看>>
逆向分析技术
查看>>