云计算实验:Java MapReduce编程

作者:wow_awsl_qwq 时间:2021-10-08 14:25:52 

实验题目:

MapReduce 编程

实验内容:

本实验利用 Hadoop 提供的 Java API 进行 MapReduce 编程。

实验目标:

  • 掌握MapReduce编程。

  • 理解MapReduce原理。

【实验作业】简单流量统计

有如下这样的日志文件:

13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200

该日志文件记录了每个手机用户在一段时间内的网络流量信息,具体字段含义为:

手机号码 MAC地址 IP地址 域名 上行流量(字节数) 下行流量(字节数) 套餐类型
根据以上日志,统计出每个手机用户在该时间段内的总流量(上行流量+下行流量),统计结果的格式为:

手机号码 字节数量

实验结果:

云计算实验:Java MapReduce编程

实验代码:

WcMap.java


import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
       @Override
       protected void map(LongWritable key, Text value, Context context)
           throws IOException, InterruptedException {
                   String str = value.toString();
                   String[] words = StringUtils.split(str," ",10);
                   int i=0;
                   for(String word : words){
                       if(i==words.length-2||i==words.length-3)
                       context.write(new Text(words[0]), new LongWritable(Integer.parseInt(word)));
                       i++;
                   }
           }
       }

WcReduce.java


import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
   @Override
   protected void reduce(Text key, Iterable<LongWritable> values,Context context)
           throws IOException, InterruptedException {
       long count = 0;
       for(LongWritable value : values){
           count += value.get();
       }
       context.write(key, new LongWritable(count));
   }
}

WcRunner.java


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class WcRunner{
   public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       Configuration conf = new Configuration();
       Job job = Job.getInstance(conf);

job.setJarByClass(WcRunner.class);

job.setMapperClass(WcMap.class);
       job.setReducerClass(WcReduce.class);

job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(LongWritable.class);

job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(LongWritable.class);

Scanner sc = new Scanner(System.in);
       System.out.print("inputPath:");
       String inputPath = sc.next();
       System.out.print("outputPath:");
       String outputPath = sc.next();

try {
           FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
           Path hdfsPath = new Path(outputPath);
           fs0.copyFromLocalFile(new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),new Path("/mapreduce/WordCount/input/1.txt"));
           if(fs0.delete(hdfsPath,true)){
               System.out.println("Directory "+ outputPath +" has been deleted successfully!");
           }
       }catch(Exception e) {
           e.printStackTrace();
       }
       FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
       FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
       job.waitForCompletion(true);
       try {
           FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
           Path srcPath = new Path(outputPath+"/part-r-00000");

FSDataInputStream is = fs.open(srcPath);
           System.out.println("Results:");
           while(true) {
               String line = is.readLine();
               if(line == null) {
                   break;
               }
               System.out.println(line);
           }
           is.close();
       }catch(Exception e) {
           e.printStackTrace();
       }
   }
}

【实验作业】索引倒排输出行号

在索引倒排实验中,我们可以得到每个单词分布在哪些文件中,以及在每个文件中出现的次数,修改以上实现,在输出的倒排索引结果中可以得到每个单词在每个文件中的具体行号信息。输出结果的格式如下:
单词 文件名:行号,文件名:行号,文件名:行号

实验结果:

MapReduce 在 3.txt 的第一行出现了两次,所以有两个 1。

云计算实验:Java MapReduce编程


import java.io.*;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<Object,Text,Text,Text>{
   private Text keyInfo = new Text();
   private Text valueInfo = new Text();
   private FileSplit split;
   int num=0;

public void map(Object key,Text value,Context context)
           throws IOException,InterruptedException{
       num++;
       split = (FileSplit)context.getInputSplit();
       StringTokenizer itr = new StringTokenizer(value.toString());
       while(itr.hasMoreTokens()){
           keyInfo.set(itr.nextToken()+" "+split.getPath().getName().toString());
           valueInfo.set(num+"");
           context.write(keyInfo,valueInfo);
       }
   }
}


import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text,Text,Text,Text>{

private Text info = new Text();

public void reduce(Text key,Iterable<Text>values,Context context)
           throws IOException, InterruptedException{
       String  sum = "";
       for(Text value:values){
           sum += value.toString()+" ";
       }

String record = key.toString();
       String[] str = record.split(" ");

key.set(str[0]);
       info.set(str[1]+":"+sum);
       context.write(key,info);
   }
}


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text,Text,Text,Text>{
   private Text result = new Text();
   public void reduce(Text key,Iterable<Text>values,Context context) throws

IOException, InterruptedException{
       String value =new String();
       for(Text value1:values){
           value += value1.toString()+" ; ";
       }
       result.set(value);
       context.write(key,result);
   }
}


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class MyRunner {
   public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(MyRunner.class);

job.setMapperClass(MyMapper.class);
       job.setReducerClass(MyReducer.class);
       job.setCombinerClass(MyCombiner.class);

job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);

job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(Text.class);

Scanner sc = new Scanner(System.in);
       System.out.print("inputPath:");
       String inputPath = sc.next();
       System.out.print("outputPath:");
       String outputPath = sc.next();

try {
           FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
           Path hdfsPath = new Path(outputPath);
           if(fs0.delete(hdfsPath,true)){
               System.out.println("Directory "+ outputPath +" has been deleted successfully!");
           }
       }catch(Exception e) {
           e.printStackTrace();
       }

FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));

FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));

job.waitForCompletion(true);

try {
           FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
           Path srcPath = new Path(outputPath+"/part-r-00000");

FSDataInputStream is = fs.open(srcPath);
           System.out.println("Results:");
           while(true) {
               String line = is.readLine();
               if(line == null) {
                   break;
               }
               System.out.println(line);
           }
           is.close();
       }catch(Exception e) {
           e.printStackTrace();
       }

}
}

来源:https://blog.csdn.net/qq_42641977/article/details/121578883

标签:云计算实验,MapReduce,Java
0
投稿

猜你喜欢

  • java实现简易超市管理系统 附源码下载

    2021-11-05 18:58:30
  • java多线程中的异常处理机制简析

    2021-11-18 01:54:39
  • java阶乘计算获得结果末尾0的个数代码实现

    2022-11-30 00:01:02
  • c# 应用事务的简单实例

    2021-11-24 23:48:34
  • .NET(C#):Emit创建异常处理的方法

    2023-11-05 04:03:02
  • Spring BeanPostProcessor接口使用详解

    2022-12-18 09:19:36
  • Android仿新浪微博分页管理界面(3)

    2023-08-04 19:14:02
  • Java运算符的知识点与代码汇总

    2022-12-05 19:03:36
  • 老生常谈Java 网络编程 —— Socket 详解

    2023-07-12 16:32:54
  • Mybatis之typeAlias配置的3种方式小结

    2023-11-26 16:42:14
  • 基于Freemarker和xml实现Java导出word

    2022-07-11 23:15:12
  • 深入浅出探索Java分布式锁原理

    2021-12-16 11:58:07
  • Java中构造函数,set/get方法和toString方法使用及注意说明

    2021-07-15 13:01:39
  • Android 微信摇一摇功能实现详细介绍

    2023-06-21 21:00:09
  • Java8中CompletableFuture的用法全解

    2023-09-08 15:08:55
  • java实战小技巧之字符串与容器互转详解

    2023-09-04 10:56:01
  • 一篇文章带你深入了解Java封装

    2023-11-20 00:37:45
  • Hibernate批量处理海量数据的方法

    2023-07-30 08:12:04
  • Maven搭建springboot项目的方法步骤

    2022-08-08 09:50:09
  • 深入理解Java设计模式之备忘录模式

    2023-09-20 06:16:43
  • asp之家 软件编程 m.aspxhome.com