Java 多线程并发编程提高数据处理效率的详细过程

作者:ReadThroughLife 时间:2021-06-29 04:19:39 

🎉工作场景中遇到这样一个需求:根据主机的 IP 地址联动更新其他模型的相关信息。需求很简单,只涉及一般的数据库联动查询以及更新操作,然而在编码实现过程中发现,由于主机的数量很多,导致循环遍历查询、更新时花费很长的时间,调用一次接口大概需要 30-40 min 时间才能完成操作。

💡因此,为了有效缩短接口方法的执行时间,便考虑使用多线程并发编程方法,利用多核处理器并行执行的能力,通过异步处理数据的方式,便可以大大缩短执行时间,提高执行效率。

📍这里使用可重用固定线程数的线程池 FixedThreadPool,并使用 CountDownLatch 并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:

  • 首先,通过 Runtime.getRuntime().availableProcessors() 方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。

  • 其次,判断任务的特性,如果为计算密集型任务则设置线程数为 CPU 线程数+1,如果为 IO 密集型任务则设置线程数为 2 * CPU 线程数,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。

  • 之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象 CountDownLatch,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。

  • 然后,调用 executorService.execute() 方法,重写 run 方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。

  • 最后,当所有子线程执行完成后,关闭线程池。

✨在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:

public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
       // split the hostMapList for the following multi-threads task
       // return the number of logical CPUs
       int processorsNum = Runtime.getRuntime().availableProcessors();
       // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
       // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
       int threadNum = processorsNum * 2;  
       // the number of each group data
       int eachGroupNum = hostMapList.size() / threadNum;
       List<List<Map>> groupList = new ArrayList<>();
       for (int i = 0; i < threadNum; i++) {
           int start = i * eachGroupNum;
           if (i == threadNum - 1) {
               int end = mapList.size();
               groupList.add(hostMapList.subList(start, end));
           } else {
               int end = (i+1) * eachGroupNum;
               groupList.add(hostMapList.subList(start, end));
           }
       }
       // update data by using multi-threads asynchronously
       ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2);
       CountDownLatch countDownLatch = new CountDownLatch(threadNum);
       for (List<Map> group : groupList) {
           executorService.execute(()->{
               try {
                   for (Map map : group) {
                   // update the data in mongodb
                   }
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
               // let counter minus one
                   countDownLatch.countDown();  
               }
           });
       }
       try {
       // main thread donnot execute until all child threads finish
           countDownLatch.await();  
       } catch (Exception e) {
           e.printStackTrace();
       }
       // remember to shutdown the threadPool
       executorService.shutdown();  
       return ResponseData.success();
}

🎉那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min 下降到了 8-10 min,大大提高了执行效率。

💡需要注意的是,这里使用的 newFixedThreadPool 创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个 * 队列,默认值为 Integer.MAX_VALUE 极有可能会造成 OOM 问题。因此,一般可以使用 ThreadPoolExecutor 来创建线程池,自己可以指定等待队列中的线程个数,避免产生 OOM 问题。

public ResponseData updateHostDept() {
// ...
List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host");
       // split the hostMapList for the following multi-threads task
       // return the number of logical CPUs
       int processorsNum = Runtime.getRuntime().availableProcessors();
       // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks,
       // if Computing Tasks set the threadNum as (the number of logical  CPUs) + 1
       int threadNum = processorsNum * 2;  
       // the number of each group data
       int eachGroupNum = hostMapList.size() / threadNum;
       List<List<Map>> groupList = new ArrayList<>();
       for (int i = 0; i < threadNum; i++) {
           int start = i * eachGroupNum;
           if (i == threadNum - 1) {
               int end = mapList.size();
               groupList.add(hostMapList.subList(start, end));
           } else {
               int end = (i+1) * eachGroupNum;
               groupList.add(hostMapList.subList(start, end));
           }
       }
       // update data by using multi-threads asynchronously
       ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(100));
       CountDownLatch countDownLatch = new CountDownLatch(threadNum);
       for (List<Map> group : groupList) {
           executor.execute(()->{
               try {
                   for (Map map : group) {
                   // update the data in mongodb
                   }
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
               // let counter minus one
                   countDownLatch.countDown();  
               }
           });
       }
       try {
       // main thread donnot execute until all child threads finish
           countDownLatch.await();  
       } catch (Exception e) {
           e.printStackTrace();
       }
       // remember to shutdown the threadPool
       executor.shutdown();  
       return ResponseData.success();
}

在上述的代码中,核心线程数和最大线程数分别为 5 和 8,并没有设置的很大的值,因为如果如果设置的很大,线程间频繁的上下文切换也会增加时间消耗,反而不能最大程度上发挥多线程的优势。至于如何选择合适的参数,需要根据机器的参数以及任务的类型综合考虑决定。

🎉最后补充一点,如果想要通过非编码的方式获取机器的 CPU 线程个数也很简单,windows 系统通过任务管理器,选择 &ldquo;性能&rdquo;,便可以查看 CPU 线程个数的情况,如下图所示:

Java 多线程并发编程提高数据处理效率的详细过程

🎉从上图可以看到,我的机器中内核是八个 CPU,但是通过超线程技术一个物理的 CPU 核心可以模拟成两个逻辑 CPU 线程,因此我的机器是支持8核16线程的。

来源:https://blog.csdn.net/weixin_43252521/article/details/127143078

标签:Java,多线程,数据处理效率
0
投稿

猜你喜欢

  • 关于java入门与java开发环境配置详细教程

    2023-11-24 14:21:44
  • Java深入学习图形用户界面GUI之事件处理

    2023-11-29 14:14:57
  • 浅谈Spring Security LDAP简介

    2022-09-19 20:30:06
  • Java读取本地json文件及相应处理方法

    2023-10-16 16:37:34
  • 在SpringBoot中通过jasypt进行加密解密的方法

    2023-11-15 21:29:23
  • Java编程BigDecimal用法实例分享

    2022-05-02 05:40:06
  • Struts 2中的constant配置详解

    2023-11-10 08:18:18
  • Java @Deprecated注解的作用及传递性

    2023-08-11 12:55:05
  • SpringBoot项目中遇到的BUG问题及解决方法

    2022-01-19 14:44:38
  • 使用itextpdf操作pdf的实例讲解

    2022-11-16 00:22:43
  • 通过实例解析Socket套接字通信原理

    2023-11-02 20:17:35
  • 一文带你深入了解Java泛型

    2022-02-10 05:38:02
  • 利用Java8 Optional类优雅如何地解决空指针问题

    2023-07-30 04:58:13
  • 带你了解Java的类和对象

    2022-05-08 09:10:21
  • springboot大文件上传、分片上传、断点续传、秒传的实现

    2023-06-16 02:18:30
  • spring boot实战之本地jar包引用示例

    2021-11-01 20:44:45
  • Spring Data Jpa框架最佳实践示例

    2021-11-25 00:43:01
  • Java泛型定义与用法实例详解

    2023-11-25 11:50:28
  • Spring MVC接口防数据篡改和重复提交

    2023-11-29 15:02:11
  • Java执行SQL脚本文件到数据库详解

    2023-08-08 08:30:00
  • asp之家 软件编程 m.aspxhome.com