详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

作者:wavebeed 时间:2022-03-22 13:03:02 

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。


public class {
  //Synchronization control For CountDownLatch.
  //Uses AQS state to represent count.
 private static final class Sync extends AbstractQueuedSynchronizer {
   private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
     setState(count);
   }

int getCount() {
     return getState();
   }

protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;
   }

protected boolean tryReleaseShared(int releases) {
     // Decrement count; signal when transition to zero
     for (;;) {
       int c = getState();
       if (c == 0)
         return false;
       int nextc = c-1;
       if (compareAndSetState(c, nextc))
         return nextc == 0;
     }
   }
 }

private final Sync sync;
 ... ...//
}

public class CyclicBarrier {
 /**
  * Each use of the barrier is represented as a generation instance.
  * The generation changes whenever the barrier is tripped, or
  * is reset. There can be many generations associated with threads
  * using the barrier - due to the non-deterministic way the lock
  * may be allocated to waiting threads - but only one of these
  * can be active at a time (the one to which {@code count} applies)
  * and all the rest are either broken or tripped.
  * There need not be an active generation if there has been a break
  * but no subsequent reset.
  */
 private static class Generation {
   boolean broken = false;
 }

/** The lock for guarding barrier entry */
 private final ReentrantLock lock = new ReentrantLock();
 /** Condition to wait on until tripped */
 private final Condition trip = lock.newCondition();
 /** The number of parties */
 private final int parties;
 /* The command to run when tripped */
 private final Runnable barrierCommand;
 /** The current generation */
 private Generation generation = new Generation();

/**
  * Number of parties still waiting. Counts down from parties to 0
  * on each generation. It is reset to parties on each new
  * generation or when broken.
  */
 private int count;

/**
  * Updates state on barrier trip and wakes up everyone.
  * Called only while holding lock.
  */
 private void nextGeneration() {
   // signal completion of last generation
   trip.signalAll();
   // set up next generation
   count = parties;
   generation = new Generation();
 }

/**
  * Sets current barrier generation as broken and wakes up everyone.
  * Called only while holding lock.
  */
 private void breakBarrier() {
   generation.broken = true;
   count = parties;
   trip.signalAll();
 }

/**
  * Main barrier code, covering the various policies.
  */
 private int dowait(boolean timed, long nanos)
   throws InterruptedException, BrokenBarrierException,
       TimeoutException {
   final ReentrantLock lock = this.lock;
   lock.lock();
   try {
     final Generation g = generation;

if (g.broken)
       throw new BrokenBarrierException();

if (Thread.interrupted()) {
       breakBarrier();
       throw new InterruptedException();
     }

int index = --count;
     if (index == 0) { // tripped
       boolean ranAction = false;
       try {
         final Runnable command = barrierCommand;
         if (command != null)
           command.run();
         ranAction = true;
         nextGeneration();
         return 0;
       } finally {
         if (!ranAction)
           breakBarrier();
       }
     }

// loop until tripped, broken, interrupted, or timed out
     for (;;) {
       try {
         if (!timed)
           trip.await();
         else if (nanos > 0L)
           nanos = trip.awaitNanos(nanos);
       } catch (InterruptedException ie) {
         if (g == generation && ! g.broken) {
           breakBarrier();
           throw ie;
         } else {
           // We're about to finish waiting even if we had not
           // been interrupted, so this interrupt is deemed to
           // "belong" to subsequent execution.
           Thread.currentThread().interrupt();
         }
       }

if (g.broken)
         throw new BrokenBarrierException();

if (g != generation)
         return index;

if (timed && nanos <= 0L) {
         breakBarrier();
         throw new TimeoutException();
       }
     }
   } finally {
     lock.unlock();
   }
 }
 ... ... //
}

实战 - 展示各自的使用场景


/**
*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
*/
public class UseCountDownLatch {

static CountDownLatch latch = new CountDownLatch(6);

/*初始化线程*/
 private static class InitThread implements Runnable{

public void run() {
     System.out.println("Thread_"+Thread.currentThread().getId()
        +" ready init work......");
     latch.countDown();
     for(int i =0;i<2;i++) {
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ........continue do its work");
     }
   }
 }

/*业务线程等待latch的计数器为0完成*/
 private static class BusiThread implements Runnable{

public void run() {
     try {
       latch.await();
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
     for(int i =0;i<3;i++) {
       System.out.println("BusiThread_"+Thread.currentThread().getId()
          +" do business-----");
     }
   }
 }

public static void main(String[] args) throws InterruptedException {
   new Thread(new Runnable() {
     public void run() {
       SleepTools.ms(1);
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ready init work step 1st......");
       latch.countDown();
       System.out.println("begin step 2nd.......");
       SleepTools.ms(1);
       System.out.println("Thread_"+Thread.currentThread().getId()
          +" ready init work step 2nd......");
       latch.countDown();
     }
   }).start();
   new Thread(new BusiThread()).start();
   for(int i=0;i<=3;i++){
     Thread thread = new Thread(new InitThread());
     thread.start();
   }
   latch.await();
   System.out.println("Main do ites work........");
 }
}

/**
*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
*/
class UseCyclicBarrier {
 private static CyclicBarrier barrier
     = new CyclicBarrier(4,new CollectThread());

//存放子线程工作结果的容器
 private static ConcurrentHashMap<String,Long> resultMap
     = new ConcurrentHashMap<String,Long>();

public static void main(String[] args) {
   for(int i=0;i<4;i++){
     Thread thread = new Thread(new SubThread());
     thread.start();
   }

}

/*汇总的任务*/
 private static class CollectThread implements Runnable{

@Override
   public void run() {
     StringBuilder result = new StringBuilder();
     for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
       result.append("["+workResult.getValue()+"]");
     }
     System.out.println(" the result = "+ result);
     System.out.println("do other business........");
   }
 }

/*相互等待的子线程*/
 private static class SubThread implements Runnable{
   @Override
   public void run() {
     long id = Thread.currentThread().getId();
     resultMap.put(Thread.currentThread().getId()+"",id);
     try {
         Thread.sleep(1000+id);
         System.out.println("Thread_"+id+" ....do something ");
       barrier.await();
       Thread.sleep(1000+id);
       System.out.println("Thread_"+id+" ....do its business ");
       barrier.await();
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
 }
}

 两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

来源:https://blog.51cto.com/14815984/2496009

标签:java,CountDownLatch,CyclicBarrier
0
投稿

猜你喜欢

  • Android RecyclerView多类型布局卡片解决方案

    2022-06-18 17:39:09
  • Java类加载基本过程详细介绍

    2023-04-15 10:54:14
  • 详解Android中Notification的使用方法

    2023-02-07 13:17:39
  • 在Maven下代理服务器设定的方式

    2023-10-15 02:17:13
  • Android自定义View图片按Path运动和旋转

    2022-09-15 22:53:11
  • Spring依赖注入的三种方式小结

    2022-08-09 15:56:41
  • C#中异步回调函数用法实例

    2023-01-05 13:10:53
  • Java常用集合与原理解析

    2023-04-01 14:26:42
  • 详解Java volatile 内存屏障底层原理语义

    2023-05-08 19:25:47
  • java中的SpringBoot框架

    2022-02-10 16:50:05
  • Android自定义View实现水平带数字百分比进度条

    2022-03-17 02:47:06
  • Android CheckBox中设置padding无效解决办法

    2022-09-07 01:35:48
  • Java Controller实现参数验证与统一异常处理流程详细讲解

    2022-01-25 18:49:47
  • Java实现简单的万年历

    2023-11-29 08:31:24
  • 一篇文章带你入门Java接口

    2023-11-06 02:07:55
  • Android仿一点资讯收藏Toast动画效果

    2022-01-15 18:42:33
  • Android编程简单实现ImageView点击时背景图修改的方法

    2023-05-23 11:27:40
  • Java8之lambda最佳实践_动力节点Java学院整理

    2023-11-28 00:07:28
  • c语言版本二叉树基本操作示例(先序 递归 非递归)

    2023-03-17 23:40:25
  • Mybatis-Plus支持GBase8s分页查询的实现示例

    2021-11-21 14:33:30
  • asp之家 软件编程 m.aspxhome.com