1.程序需要的材料
文件中各个字段的含义,其中第6,7,8,9是要统计的流量相关的字段.
文件内容:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
二.程序:
1 package mapreducejob; 2 3 /** 4 * 老师给的元数据信息如下: 5 * 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 6 * 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 7 * 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 8 * 第六个字段是上行数据包数. 9 * 第七个字段是下行数据包数 10 * 第八个是上行总流量 11 * 第九个是下行总流量 12 */ 13 14 import java.io.DataInput; 15 import java.io.DataOutput; 16 import java.io.IOException; 17 18 import org.apache.hadoop.conf.Configuration; 19 import org.apache.hadoop.fs.Path; 20 import org.apache.hadoop.io.LongWritable; 21 import org.apache.hadoop.io.Text; 22 import org.apache.hadoop.io.Writable; 23 import org.apache.hadoop.mapreduce.Job; 24 import org.apache.hadoop.mapreduce.Mapper; 25 import org.apache.hadoop.mapreduce.Reducer; 26 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 27 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 28 29 public class TrafficApp { 30 31 public static void main(String[] args) throws Exception { 32 Job job = Job.getInstance(new Configuration(), 33 TrafficApp.class.getSimpleName()); 34 job.setJarByClass(TrafficApp.class);// 通过jar包运行. 35 36 FileInputFormat.setInputPaths(job, args[0]);// 数据输入,指定数据源 37 38 job.setMapperClass(MyMapper.class);// 给job设置map 39 job.setMapOutputKeyClass(Text.class); 40 job.setMapOutputValueClass(TrafficWritable.class); 41 42 job.setReducerClass(MyReducer.class); 43 job.setOutputKeyClass(Text.class); 44 job.setOutputValueClass(TrafficWritable.class); 45 46 FileOutputFormat.setOutputPath(job, new Path(args[1])); 47 48 job.waitForCompletion(true);// 在集群中运行 49 } 50 51 public static class MyMapper extends 52 Mapper{ 53 // 这四个参数分别是 和 54 // k1代表的是字节的偏移量,v1是原始数据. k2是手机号.v2是每一次的通话流量 55 Text k2 = new Text();// new 一个作为k2手机号. 56 TrafficWritable v2 = new TrafficWritable();// new 一个作为v2 57 58 @Override 59 protected void map( 60 LongWritable key, 61 Text value, 62 Mapper .Context context) 63 throws IOException, InterruptedException { 64 String line = value.toString(); 65 String[] splited = line.split("\t");// 以制表符作为拆分符得到一个字节数组. 66 // 通过原始数据文件可以看到这个里面有11个字段,所以这个拆分的数组长度为11 67 k2.set(splited[1]);// k2是手机号,在这个数组中是第二个. 68 v2.set(splited[6], splited[7], splited[8], splited[9]);// v2是代表四个流量 69 // 对应这个被拆分数组的第6,7,8,9个. 70 context.write(k2, v2); 71 } 72 73 } 74 75 public static class MyReducer extends 76 Reducer { 77 // 四个参数分别是 78 // k2是手机号,v2是流量TrafficWritable k3是手机号,v3是流量汇总. 79 TrafficWritable v3 = new TrafficWritable(); 80 81 @Override 82 protected void reduce( 83 Text k2, 84 Iterable v2s, 85 Reducer .Context context) 86 throws IOException, InterruptedException { 87 // reduce方法的第一个形参是k2,第二个形参是v2s,第三个形参是一个context上下文 88 // v2s是流量集合.我们在reduce方法中要做的就是把v2汇总起来变成v3. 89 long t1 = 0L; 90 long t2 = 0L; 91 long t3 = 0L; 92 long t4 = 0L; 93 for (TrafficWritable v2 : v2s) { 94 t1 += v2.t1; 95 t2 += v2.t2; 96 t3 += v2.t3; 97 t4 += v2.t4; 98 } 99 v3.set(t1, t2, t3, t4);//构造v3100 context.write(k2, v3);101 }102 103 }104 105 /**106 * 针对流量设置一个流量类. 第六个字段是上行数据包数. 第七个字段是下行数据包数. 第八个是上行总流量. 第九个是下行总流量107 *108 */109 static class TrafficWritable implements Writable {110 // 这个类是流量统计类,这个类包含了该手机号的上传和下载的流量111 // 在MapReduce中的键值对中代表的是v3,有四列组成.112 long t1;113 long t2;114 long t3;115 long t4;116 117 // 再搞一个无产的构造函数,否则容易出错118 public TrafficWritable() {119 }120 121 public void set(long t1, long t2, long t3, long t4) {122 // 赋值的方法,这个地方是传入的long类型.123 this.t1 = t1;124 this.t2 = t2;125 this.t3 = t3;126 this.t4 = t4;127 }128 129 public void set(String t1, String t2, String t3, String t4) {130 // 赋值的方法,这个地方是传入的String类型.131 this.t1 = Long.parseLong(t1);132 this.t2 = Long.parseLong(t2);133 this.t3 = Long.parseLong(t3);134 this.t4 = Long.parseLong(t4);135 }136 137 public void readFields(DataInput in) throws IOException {138 // 四列都通过in.readLong()读进来.139 this.t1 = in.readLong();140 this.t2 = in.readLong();141 this.t3 = in.readLong();142 this.t4 = in.readLong();143 }144 145 public void write(DataOutput out) throws IOException {146 // 这个对象有四列,必须要把四列都给写出去.147 out.writeLong(t1);148 out.writeLong(t2);149 out.writeLong(t3);150 out.writeLong(t4);151 }152 153 public String toString() {154 // 在Reduce阶段会用到这个方法,否则输出的是哈希编码155 return this.t1 + "\t" + this.t2 + "\t" + this.t3 + "\t" + this.t4;156 }157 }158 159 }
//===============================================================
代码二:
1 package mapreduce; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.Writable; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Partitioner; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 19 public class TrafficApp { 20 public static void main(String[] args) throws Exception { 21 Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName()); 22 job.setJarByClass(TrafficApp.class); 23 24 FileInputFormat.setInputPaths(job, args[0]); 25 26 job.setMapperClass(TrafficMapper.class); 27 job.setMapOutputKeyClass(Text.class); 28 job.setMapOutputValueClass(TrafficWritable.class); 29 30 job.setNumReduceTasks(2);//设定Reduce的数量为2 31 job.setPartitionerClass(TrafficPartitioner.class);//设定一个Partitioner的类. 32 /* 33 *Partitioner是如何实现不同的Map输出分配到不同的Reduce中? 34 *在不适用指定的Partitioner时,有 一个默认的Partitioner. 35 *就是HashPartitioner. 36 *其只有一行代码,其意思就是过来的key,不管是什么,模numberReduceTasks之后 返回值就是reduce任务的编号. 37 *numberReduceTasks的默认值是1. 任何一个数模1(取余数)都是0. 38 *这个地方0就是取编号为0的Reduce.(Reduce从0开始编号.) 39 */ 40 41 job.setReducerClass(TrafficReducer.class); 42 job.setOutputKeyClass(Text.class); 43 job.setOutputValueClass(TrafficWritable.class); 44 45 FileOutputFormat.setOutputPath(job, new Path(args[1])); 46 job.waitForCompletion(true); 47 } 48 49 public static class TrafficPartitioner extends Partitioner{ //k2,v2 50 51 @Override 52 public int getPartition(Text key, TrafficWritable value,int numPartitions) { 53 long phoneNumber = Long.parseLong(key.toString()); 54 return (int)(phoneNumber%numPartitions); 55 } 56 57 } 58 59 60 /** 61 * 第一个参数是LongWritable类型是文本一行数据开头的字节数 62 * 第二个参数是文本中的一行数据 Text类型 63 * 第三个参数是要输出的手机号 Text类型 64 * 第四个参数是需要我们自定义的流量类型TrafficWritable 65 * @author ABC 66 * 67 */ 68 public static class TrafficMapper extends Mapper { 69 Text k2 = new Text(); 70 TrafficWritable v2 = null; 71 @Override 72 protected void map(LongWritable key,Text value, Mapper .Context context) 73 throws IOException, InterruptedException { 74 String line = value.toString(); 75 String[] splited = line.split("\t"); 76 77 k2.set(splited[1]);//这个值对应的是手机号码 78 v2 = new TrafficWritable(splited[6], splited[7], splited[8], splited[9]); 79 context.write(k2, v2); 80 } 81 82 } 83 84 public static class TrafficReducer extends Reducer { 85 @Override 86 protected void reduce(Text k2,Iterable v2s, 87 Reducer .Context context) 88 throws IOException, InterruptedException { 89 //遍历v2s 流量都这个集合里面 90 long t1 = 0L; 91 long t2 = 0L; 92 long t3 = 0L; 93 long t4 = 0L; 94 95 for (TrafficWritable v2 : v2s) { 96 t1 += v2.getT1(); 97 t2 += v2.getT2(); 98 t3 += v2.getT3(); 99 t4 += v2.getT4();100 }101 TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);102 context.write(k2, v3);103 }104 }105 106 public static class TrafficWritable implements Writable{107 private long t1;108 private long t2;109 private long t3;110 private long t4;111 //写两个构造方法,一个是有参数的构造方法,一个是无参数的构造方法.112 //必须要有 一个无参数的构造方法,否则程序运行会报错.113 114 public TrafficWritable(){115 super();116 }117 118 public TrafficWritable(long t1, long t2, long t3, long t4) {119 super();120 this.t1 = t1;121 this.t2 = t2;122 this.t3 = t3;123 this.t4 = t4;124 }125 //在程序中读取文本穿过来的都是字符串,所以再搞一个字符串类型的构造方法126 public TrafficWritable(String t1, String t2, String t3, String t4) {127 super();128 this.t1 = Long.parseLong(t1);129 this.t2 = Long.parseLong(t2);130 this.t3 = Long.parseLong(t3);131 this.t4 = Long.parseLong(t4);132 }133 134 public void write(DataOutput out) throws IOException {135 //对各个成员变量进行序列化136 out.writeLong(t1);137 out.writeLong(t2);138 out.writeLong(t3);139 out.writeLong(t4);140 }141 142 public void readFields(DataInput in) throws IOException {143 //对成员变量进行反序列化144 this.t1 = in.readLong();145 this.t2 = in.readLong();146 this.t3 = in.readLong();147 this.t4 = in.readLong();148 }149 150 public long getT1() {151 return t1;152 }153 154 public void setT1(long t1) {155 this.t1 = t1;156 }157 158 public long getT2() {159 return t2;160 }161 162 public void setT2(long t2) {163 this.t2 = t2;164 }165 166 public long getT3() {167 return t3;168 }169 170 public void setT3(long t3) {171 this.t3 = t3;172 }173 174 public long getT4() {175 return t4;176 }177 178 public void setT4(long t4) {179 this.t4 = t4;180 }181 182 @Override183 public String toString() {184 return t1 + "\t" + t2 + "\t" + t3 + "\t" + t4 ;185 }186 187 }188 }