java实现MapReduce对文件进行切分的示例代码

作者:liangzai2048 时间:2023-10-07 21:46:59 

比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?

1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总

java实现MapReduce对文件进行切分的示例代码

例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中

java实现MapReduce对文件进行切分的示例代码

创建MapTask

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MapTask extends Thread {
   //用来接收具体的哪一个文件
   private File file;
   private int flag;

public MapTask(File file, int flag) {
       this.file = file;
       this.flag = flag;
   }

@Override
   public void run() {
       try {
           BufferedReader br = new BufferedReader(new FileReader(file));
           String line;
           HashMap<String, Integer> map = new HashMap<String, Integer>();
           while ((line = br.readLine()) != null) {
               /**
                * 统计班级人数HashMap存储
                */
               String clazz = line.split(",")[4];
               if (!map.containsKey(clazz)) {
                   map.put(clazz, 1);
               } else {
                   map.put(clazz, map.get(clazz) + 1);
               }
           }
           br.close();
           BufferedWriter bw = new BufferedWriter(
                   new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
           Set<Map.Entry<String, Integer>> entries = map.entrySet();
           for (Map.Entry<String, Integer> entry : entries) {
               String key = entry.getKey();
               Integer value = entry.getValue();
               bw.write(key + ":" + value);
               bw.newLine();
           }
           bw.flush();
           bw.close();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

创建Map

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Map {
   public static void main(String[] args) {
       long start = System.currentTimeMillis();
       // 多线程连接池(线程池)
       ExecutorService executorService = Executors.newFixedThreadPool(8);
       // 获取文件列表
       File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
       File[] files = file.listFiles();
       //创建多线程对象
       int flag = 0;
       for (File f : files) {
           //为每一个文件启动一个线程
           MapTask mapTask = new MapTask(f, flag);
           executorService.submit(mapTask);
           flag++;
       }
       executorService.shutdown();
       long end = System.currentTimeMillis();
       System.out.println(end-start);
   }
}

创建ClazzSum

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;

public class ClazzSum {
   public static void main(String[] args) throws Exception {
       long start = System.currentTimeMillis();
       BufferedReader br = new BufferedReader(
               new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
       String line;
       HashMap<String, Integer> map = new HashMap<String, Integer>();
       while ((line = br.readLine()) != null) {
           String clazz = line.split(",")[4];
           if (!map.containsKey(clazz)) {
               map.put(clazz, 1);
           } else {
               map.put(clazz, map.get(clazz) + 1);
           }
       }
       System.out.println(map);
       long end = System.currentTimeMillis();
       System.out.println(end-start);
   }
}

创建split128

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;

public class Split128 {
   public static void main(String[] args) throws Exception {
       BufferedReader br = new BufferedReader(
               new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));

//用作标记文件,也作为文件名称
       int index = 0;
       BufferedWriter bw = new BufferedWriter(
               new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));

ArrayList<String> list = new ArrayList<String>();
       String line;
       //用作累计读取了多少行数据
       int flag = 0;
       int row = 0;
       while ((line = br.readLine()) != null) {
           list.add(line);
           flag++;
           // flag = 140
           if (flag == 140) {// 一个文件读写完成,生成新的文件
               row = 0 + 128 * index;
               for (int i = row; i <= row + 127; i++) {
                   bw.write(list.get(i));
                   bw.newLine();
               }
               bw.flush();
               bw.close();
               /**
                * 生成新的文件
                * 计数清零
                */
               index++;
               flag = 12;
               bw = new BufferedWriter(
                       new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
           }
       }
       //文件读取剩余128*1.1范围之内
       for (int i = list.size() - flag; i < list.size(); i++) {
           bw.write(list.get(i));
           bw.newLine();
       }
       bw.flush();
       bw.close();
   }
}

创建Reduce

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;

public class Reduce {
   public static void main(String[] args) throws Exception {
       long start = System.currentTimeMillis();
       HashMap<String, Integer> map = new HashMap<String, Integer>();
       File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
       File[] files = file.listFiles();
       for (File f : files) {
           BufferedReader br = new BufferedReader(new FileReader(f));
           String line;
           while ((line = br.readLine()) != null) {
               String clazz = line.split(":")[0];
               int sum = Integer.valueOf(line.split(":")[1]);
               if (!map.containsKey(clazz)) {
                   map.put(clazz, sum);
               } else {
                   map.put(clazz, map.get(clazz) + sum);
               }
           }
       }
       long end = System.currentTimeMillis();
       System.out.println(end-start);
       System.out.println(map);
   }
}

java实现MapReduce对文件进行切分的示例代码

java实现MapReduce对文件进行切分的示例代码

最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。

来源:https://blog.csdn.net/hujieliang123/article/details/122546452

标签:java,MapReduce,切分
0
投稿

猜你喜欢

  • Mybatis表的关联查询详情

    2023-11-23 12:15:03
  • Android开发中Launcher3常见默认配置修改方法总结

    2023-08-28 06:43:43
  • Java编程实现非对称加密的方法详解

    2023-08-24 01:21:26
  • Java 反射机制原理与用法详解

    2023-09-18 02:51:48
  • Android RecyclerView显示Item布局不一致解决办法

    2023-04-25 06:50:32
  • 三十分钟快速掌握C# 6.0知识点

    2022-10-15 04:26:00
  • Java中输出字符的ASCII值实例

    2023-02-27 08:59:11
  • java实现递归菜单树

    2023-02-01 00:03:06
  • SpringBoot借助spring.factories文件跨模块实例化Bean

    2021-12-01 18:22:41
  • springboot2.0使用Hikari连接池的方法(替换druid)

    2023-04-12 00:54:33
  • Java实现级联下拉结构的示例代码

    2023-11-03 18:22:06
  • 关于Spring Boot项目的 log4j2 核弹漏洞问题(一行代码配置搞定)

    2022-08-26 03:04:20
  • 一键设置java 环境变量 cmd下查看、修改(覆盖与添加)等说明

    2023-03-29 07:09:25
  • Java和Ceylon对象的构造和验证

    2022-04-05 04:28:37
  • spring boot 日志配置详解

    2023-09-24 00:23:11
  • 深入理解Spring中bean的生命周期介绍

    2023-02-08 17:21:37
  • springBoot service层事务控制的操作

    2022-02-01 12:39:40
  • C#多线程之线程同步WaitHandle

    2022-08-10 10:16:12
  • datatable生成excel和excel插入图片示例详解

    2022-06-18 21:48:38
  • java容器详细解析

    2023-08-23 16:13:38
  • asp之家 软件编程 m.aspxhome.com