java实现MapReduce对文件进行切分的示例代码
作者:liangzai2048 时间:2023-10-07 21:46:59
比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?
1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总
例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中
创建MapTask
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MapTask extends Thread {
//用来接收具体的哪一个文件
private File file;
private int flag;
public MapTask(File file, int flag) {
this.file = file;
this.flag = flag;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
/**
* 统计班级人数HashMap存储
*/
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
br.close();
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key + ":" + value);
bw.newLine();
}
bw.flush();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建Map
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Map {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 多线程连接池(线程池)
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 获取文件列表
File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
File[] files = file.listFiles();
//创建多线程对象
int flag = 0;
for (File f : files) {
//为每一个文件启动一个线程
MapTask mapTask = new MapTask(f, flag);
executorService.submit(mapTask);
flag++;
}
executorService.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建ClazzSum
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
public class ClazzSum {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
System.out.println(map);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
创建split128
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
public class Split128 {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));
//用作标记文件,也作为文件名称
int index = 0;
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
ArrayList<String> list = new ArrayList<String>();
String line;
//用作累计读取了多少行数据
int flag = 0;
int row = 0;
while ((line = br.readLine()) != null) {
list.add(line);
flag++;
// flag = 140
if (flag == 140) {// 一个文件读写完成,生成新的文件
row = 0 + 128 * index;
for (int i = row; i <= row + 127; i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
/**
* 生成新的文件
* 计数清零
*/
index++;
flag = 12;
bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
}
}
//文件读取剩余128*1.1范围之内
for (int i = list.size() - flag; i < list.size(); i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
}
}
创建Reduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
public class Reduce {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
HashMap<String, Integer> map = new HashMap<String, Integer>();
File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
File[] files = file.listFiles();
for (File f : files) {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
String clazz = line.split(":")[0];
int sum = Integer.valueOf(line.split(":")[1]);
if (!map.containsKey(clazz)) {
map.put(clazz, sum);
} else {
map.put(clazz, map.get(clazz) + sum);
}
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(map);
}
}
最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。
来源:https://blog.csdn.net/hujieliang123/article/details/122546452
标签:java,MapReduce,切分
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
Mybatis表的关联查询详情
2023-11-23 12:15:03
![](https://img.aspxhome.com/file/2023/3/85893_0s.png)
Android开发中Launcher3常见默认配置修改方法总结
2023-08-28 06:43:43
Java编程实现非对称加密的方法详解
2023-08-24 01:21:26
Java 反射机制原理与用法详解
2023-09-18 02:51:48
Android RecyclerView显示Item布局不一致解决办法
2023-04-25 06:50:32
三十分钟快速掌握C# 6.0知识点
2022-10-15 04:26:00
Java中输出字符的ASCII值实例
2023-02-27 08:59:11
![](https://img.aspxhome.com/file/2023/4/83444_0s.jpg)
java实现递归菜单树
2023-02-01 00:03:06
SpringBoot借助spring.factories文件跨模块实例化Bean
2021-12-01 18:22:41
![](https://img.aspxhome.com/file/2023/1/89561_0s.webp)
springboot2.0使用Hikari连接池的方法(替换druid)
2023-04-12 00:54:33
![](https://img.aspxhome.com/file/2023/1/70531_0s.png)
Java实现级联下拉结构的示例代码
2023-11-03 18:22:06
关于Spring Boot项目的 log4j2 核弹漏洞问题(一行代码配置搞定)
2022-08-26 03:04:20
![](https://img.aspxhome.com/file/2023/9/85539_0s.png)
一键设置java 环境变量 cmd下查看、修改(覆盖与添加)等说明
2023-03-29 07:09:25
![](https://img.aspxhome.com/file/2023/3/82033_0s.png)
Java和Ceylon对象的构造和验证
2022-04-05 04:28:37
spring boot 日志配置详解
2023-09-24 00:23:11
![](https://img.aspxhome.com/file/2023/9/125229_0s.png)
深入理解Spring中bean的生命周期介绍
2023-02-08 17:21:37
![](https://img.aspxhome.com/file/2023/8/61318_0s.png)
springBoot service层事务控制的操作
2022-02-01 12:39:40
C#多线程之线程同步WaitHandle
2022-08-10 10:16:12
![](https://img.aspxhome.com/file/2023/3/94623_0s.jpg)
datatable生成excel和excel插入图片示例详解
2022-06-18 21:48:38
![](https://img.aspxhome.com/file/2023/2/116592_0s.jpg)
java容器详细解析
2023-08-23 16:13:38
![](https://img.aspxhome.com/file/2023/1/119131_0s.png)