详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

作者:wavebeed 时间:2022-03-22 13:03:02 

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。


public class {
  //Synchronization control For CountDownLatch.
  //Uses AQS state to represent count.
 private static final class Sync extends AbstractQueuedSynchronizer {
   private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
     setState(count);
   }

int getCount() {
     return getState();
   }

protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;
   }

protected boolean tryReleaseShared(int releases) {
     // Decrement count; signal when transition to zero
     for (;;) {
       int c = getState();
       if (c == 0)
         return false;
       int nextc = c-1;
       if (compareAndSetState(c, nextc))
         return nextc == 0;
     }
   }
 }

private final Sync sync;
 ... ...//
}

public class CyclicBarrier {
 /**
  * Each use of the barrier is represented as a generation instance.
  * The generation changes whenever the barrier is tripped, or
  * is reset. There can be many generations associated with threads
  * using the barrier - due to the non-deterministic way the lock
  * may be allocated to waiting threads - but only one of these
  * can be active at a time (the one to which {@code count} applies)
  * and all the rest are either broken or tripped.
  * There need not be an active generation if there has been a break
  * but no subsequent reset.
  */
 private static class Generation {
   boolean broken = false;
 }

/** The lock for guarding barrier entry */
 private final ReentrantLock lock = new ReentrantLock();
 /** Condition to wait on until tripped */
 private final Condition trip = lock.newCondition();
 /** The number of parties */
 private final int parties;
 /* The command to run when tripped */
 private final Runnable barrierCommand;
 /** The current generation */
 private Generation generation = new Generation();

/**
  * Number of parties still waiting. Counts down from parties to 0
  * on each generation. It is reset to parties on each new
  * generation or when broken.
  */
 private int count;

/**
  * Updates state on barrier trip and wakes up everyone.
  * Called only while holding lock.
  */
 private void nextGeneration() {
   // signal completion of last generation
   trip.signalAll();
   // set up next generation
   count = parties;
   generation = new Generation();
 }

/**
  * Sets current barrier generation as broken and wakes up everyone.
  * Called only while holding lock.
  */
 private void breakBarrier() {
   generation.broken = true;
   count = parties;
   trip.signalAll();
 }

/**
  * Main barrier code, covering the various policies.
  */
 private int dowait(boolean timed, long nanos)
   throws InterruptedException, BrokenBarrierException,
       TimeoutException {
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
     final Generation g = generation;

if (g.broken)
       throw new BrokenBarrierException();

if (Thread.interrupted()) {
       breakBarrier();
       throw new InterruptedException();
     }

int index = --count;
     if (index == 0) { // tripped
       boolean ranAction = false;
       try {
         final Runnable command = barrierCommand;
         if (command != null)
           command.run();
         ranAction = true;
         nextGeneration();
         return 0;
       } finally {
         if (!ranAction)
           breakBarrier();
       }
     }

// loop until tripped, broken, interrupted, or timed out
     for (;;) {
       try {
         if (!timed)
           trip.await();
         else if (nanos > 0L)
           nanos = trip.awaitNanos(nanos);
       } catch (InterruptedException ie) {
         if (g == generation && ! g.broken) {
           breakBarrier();
           throw ie;
         } else {
           // We're about to finish waiting even if we had not
           // been interrupted, so this interrupt is deemed to
           // "belong" to subsequent execution.
           Thread.currentThread().interrupt();
         }
       }

if (g.broken)
         throw new BrokenBarrierException();

if (g != generation)
         return index;

if (timed && nanos <= 0L) {
         breakBarrier();
         throw new TimeoutException();
       }
     }
   } finally {
     lock.unlock();
   }
 }
 ... ... //
}

实战 - 展示各自的使用场景


/**
*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
*/
public class UseCountDownLatch {

static CountDownLatch latch = new CountDownLatch(6);

/*初始化线程*/
 private static class InitThread implements Runnable{

public void run() {
     System.out.println("Thread_"+Thread.currentThread().getId()
        +" ready init work......");
     latch.countDown();
     for(int i =0;i<2;i++) {
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ........continue do its work");
     }
   }
 }

/*业务线程等待latch的计数器为0完成*/
 private static class BusiThread implements Runnable{

public void run() {
     try {
       latch.await();
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
     for(int i =0;i<3;i++) {
       System.out.println("BusiThread_"+Thread.currentThread().getId()
          +" do business-----");
     }
   }
 }

public static void main(String[] args) throws InterruptedException {
   new Thread(new Runnable() {
     public void run() {
       SleepTools.ms(1);
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ready init work step 1st......");
       latch.countDown();
       System.out.println("begin step 2nd.......");
       SleepTools.ms(1);
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ready init work step 2nd......");
       latch.countDown();
     }
   }).start();
   new Thread(new BusiThread()).start();
   for(int i=0;i<=3;i++){
     Thread thread = new Thread(new InitThread());
     thread.start();
   }
   latch.await();
   System.out.println("Main do ites work........");
 }
}

/**
*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
*/
class UseCyclicBarrier {
 private static CyclicBarrier barrier
     = new CyclicBarrier(4,new CollectThread());

//存放子线程工作结果的容器
 private static ConcurrentHashMap<String,Long> resultMap
     = new ConcurrentHashMap<String,Long>();

public static void main(String[] args) {
   for(int i=0;i<4;i++){
     Thread thread = new Thread(new SubThread());
     thread.start();
   }

}

/*汇总的任务*/
 private static class CollectThread implements Runnable{

@Override
   public void run() {
     StringBuilder result = new StringBuilder();
     for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
       result.append("["+workResult.getValue()+"]");
     }
     System.out.println(" the result = "+ result);
     System.out.println("do other business........");
   }
 }

/*相互等待的子线程*/
 private static class SubThread implements Runnable{
   @Override
   public void run() {
     long id = Thread.currentThread().getId();
     resultMap.put(Thread.currentThread().getId()+"",id);
     try {
         Thread.sleep(1000+id);
         System.out.println("Thread_"+id+" ....do something ");
       barrier.await();
       Thread.sleep(1000+id);
       System.out.println("Thread_"+id+" ....do its business ");
       barrier.await();
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
 }
}

 两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

来源:https://blog.51cto.com/14815984/2496009

标签:java,CountDownLatch,CyclicBarrier
0
投稿

猜你喜欢

  • Java 集合框架之List 的使用(附小游戏练习)

    2023-11-24 10:33:40
  • springboot 按月分表的实现方式

    2023-11-25 00:03:47
  • Mybatis Limit实现分页功能

    2022-03-14 13:57:57
  • springboot整合quartz项目使用案例

    2023-02-13 19:57:12
  • Spring中事务几个常见的问题解决

    2022-04-07 00:52:01
  • 带你了解Java常用类小结

    2023-04-15 14:38:26
  • spring boot集成smart-doc自动生成接口文档详解

    2023-11-28 23:08:02
  • Flutter使用sqflite处理数据表变更的方法详解

    2023-10-21 11:05:49
  • 关于MyBatis模糊查询的几种实现方式

    2023-05-09 04:23:12
  • C#串口连接的读取和发送详解

    2022-05-11 10:04:47
  • Java 中 hashCode() 与 equals() 的关系(面试)

    2023-08-29 18:03:57
  • SpringBoot依赖管理的源码解析

    2022-10-29 23:37:14
  • Java异常处理try catch的基本使用

    2023-11-24 05:04:38
  • 自己动手写的mybatis分页插件(极其简单好用)

    2023-11-01 18:12:09
  • Java实现简单台球游戏

    2022-06-28 23:55:59
  • 使用注解@Validated和BindingResult对入参进行非空校验方式

    2022-09-16 11:30:44
  • SpringMVC接收复杂集合对象(参数)代码示例

    2023-01-29 18:33:51
  • Java Condition条件变量提高线程通信效率

    2022-11-26 13:32:46
  • Java web访问localhost报404错误问题的解决方法

    2023-07-27 05:28:55
  • activemq整合springboot使用方法(个人微信小程序用)

    2023-07-08 22:29:55
  • asp之家 软件编程 m.aspxhome.com