详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别
作者:wavebeed 时间:2022-03-22 13:03:02
前言
CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。
内部实现差异
前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。
public class {
//Synchronization control For CountDownLatch.
//Uses AQS state to represent count.
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
... ...//
}
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
... ... //
}
实战 - 展示各自的使用场景
/**
*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
*/
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
/*初始化线程*/
private static class InitThread implements Runnable{
public void run() {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work......");
latch.countDown();
for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ........continue do its work");
}
}
}
/*业务线程等待latch的计数器为0完成*/
private static class BusiThread implements Runnable{
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId()
+" do business-----");
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
/**
*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
*/
class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(4,new CollectThread());
//存放子线程工作结果的容器
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<String,Long>();
public static void main(String[] args) {
for(int i=0;i<4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/*汇总的任务*/
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
/*相互等待的子线程*/
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId()+"",id);
try {
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do something ");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
两者总结
1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;
2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;
3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;
4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;
5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;
6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;
来源:https://blog.51cto.com/14815984/2496009
标签:java,CountDownLatch,CyclicBarrier
0
投稿
猜你喜欢
Android RecyclerView多类型布局卡片解决方案
2022-06-18 17:39:09
Java类加载基本过程详细介绍
2023-04-15 10:54:14
详解Android中Notification的使用方法
2023-02-07 13:17:39
在Maven下代理服务器设定的方式
2023-10-15 02:17:13
Android自定义View图片按Path运动和旋转
2022-09-15 22:53:11
Spring依赖注入的三种方式小结
2022-08-09 15:56:41
C#中异步回调函数用法实例
2023-01-05 13:10:53
Java常用集合与原理解析
2023-04-01 14:26:42
详解Java volatile 内存屏障底层原理语义
2023-05-08 19:25:47
java中的SpringBoot框架
2022-02-10 16:50:05
Android自定义View实现水平带数字百分比进度条
2022-03-17 02:47:06
Android CheckBox中设置padding无效解决办法
2022-09-07 01:35:48
Java Controller实现参数验证与统一异常处理流程详细讲解
2022-01-25 18:49:47
Java实现简单的万年历
2023-11-29 08:31:24
一篇文章带你入门Java接口
2023-11-06 02:07:55
Android仿一点资讯收藏Toast动画效果
2022-01-15 18:42:33
Android编程简单实现ImageView点击时背景图修改的方法
2023-05-23 11:27:40
Java8之lambda最佳实践_动力节点Java学院整理
2023-11-28 00:07:28
c语言版本二叉树基本操作示例(先序 递归 非递归)
2023-03-17 23:40:25
Mybatis-Plus支持GBase8s分页查询的实现示例
2021-11-21 14:33:30