详解Java 信号量Semaphore

作者:java小新人 时间:2021-12-22 11:10:36 

Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;

一.简单使用

同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:


package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
 //创建一个信号量的实例,信号量初始值为0
 static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws Exception {
   ExecutorService pool = Executors.newFixedThreadPool(3);
   pool.submit(()->{
     System.out.println("Thread1---start");
     //信号量加一
     semaphore.release();
   });

pool.submit(()->{
     System.out.println("Thread2---start");
     //信号量加一
     semaphore.release();
   });
   pool.submit(()->{
     System.out.println("Thread3---start");
     //信号量加一
     semaphore.release();
   });
   //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
   semaphore.acquire(2);
   System.out.println("两个子线程执行完毕");

//关闭线程池,正在执行的任务继续执行
   pool.shutdown();

}

}

详解Java 信号量Semaphore

这个信号量也可以复用,类似CyclicBarrier:


package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
 //创建一个信号量的实例,信号量初始值为0
 static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws Exception {
   ExecutorService pool = Executors.newFixedThreadPool(3);
   pool.submit(()->{
     System.out.println("Thread1---start");
     //信号量加一
     semaphore.release();
   });

pool.submit(()->{
     System.out.println("Thread2---start");
     //信号量加一
     semaphore.release();
   });

//等待两个子线程执行完毕就放过,必须要信号量等于2才放过
   semaphore.acquire(2);
   System.out.println("子线程1,2执行完毕");

pool.submit(()->{
     System.out.println("Thread3---start");
     //信号量加一
     semaphore.release();
   });
   pool.submit(()->{
     System.out.println("Thread4---start");
     //信号量加一
     semaphore.release();
   });

semaphore.acquire(2);
   System.out.println("子线程3,4执行完毕");

//关闭线程池,正在执行的任务继续执行
   pool.shutdown();

}

}

详解Java 信号量Semaphore

二.信号量原理

看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;

详解Java 信号量Semaphore

构造器:


//默认是非公平策略
public Semaphore(int permits) {
 sync = new NonfairSync(permits);
}
//可以根据第二个参数选择是公平策略还是非公平策略
public Semaphore(int permits, boolean fair) {
 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire(int permits)方法:


public void acquire(int permits) throws InterruptedException {
 if (permits < 0) throw new IllegalArgumentException();
 sync.acquireSharedInterruptibly(permits);
}

//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
   throws InterruptedException {
 if (Thread.interrupted()) throw new InterruptedException();
 //这里根据子类是公平策略还是非公平策略
 if (tryAcquireShared(arg) < 0)
   //获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程
   doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
 return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
 //一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将
 //最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么
 //这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法,
 //那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
 //
 for (;;) {
   int available = getState();
   int remaining = available - acquires;
   if (remaining < 0 || compareAndSetState(available, remaining))
     return remaining;
 }
}
//公平策略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了
protected int tryAcquireShared(int acquires) {
 for (;;) {
   if (hasQueuedPredecessors())
     return -1;
   int available = getState();
   int remaining = available - acquires;
   if (remaining < 0 ||compareAndSetState(available, remaining))
     return remaining;
 }
}

再看看release(int permits)方法:


//这个方法的作用就是将信号量加一
public void release(int permits) {
 if (permits < 0) throw new IllegalArgumentException();
 sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
 //tryReleaseShared尝试释放资源
 if (tryReleaseShared(arg)) {
   //释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程
   doReleaseShared();
   return true;
 }
 return false;
}

protected final boolean tryReleaseShared(int releases) {
 //一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了
 //没有溢出的话,就用CAS更新state的值
 for (;;) {
   int current = getState();
   int next = current + releases;
   if (next < current) // overflow
     throw new Error("Maximum permit count exceeded");
   if (compareAndSetState(current, next))
     return true;
 }
}

private void doReleaseShared() {

for (;;) {
   Node h = head;
   if (h != null && h != tail) {
     int ws = h.waitStatus;
     //ws==Node.SIGNAL表示节点中线程需要被唤醒
     if (ws == Node.SIGNAL) {
       if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
         continue;      // loop to recheck cases
       //调用阻塞队列中线程的unpark方法唤醒线程
       unparkSuccessor(h);
     }
     //ws == 0表示节点中线程是初始状态
     else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
       continue;        // loop on failed CAS
   }

if (h == head)          // loop if head changed
     break;
 }
}

以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;

这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;

线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;

其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!

来源:https://www.cnblogs.com/wyq1995/p/12319707.html

标签:Java,信号量,Semaphore
0
投稿

猜你喜欢

  • springboot 自定义权限标签(tld),在freemarker引用操作

    2023-11-23 06:20:15
  • Android实现动态定值范围效果的控件

    2023-10-15 02:00:37
  • Hibernate实现悲观锁和乐观锁代码介绍

    2022-04-25 09:43:45
  • Java多线程之synchronized关键字的使用

    2023-12-12 21:46:16
  • Linux中Java开发常用软件安装方法总结

    2022-03-11 16:21:03
  • C# dump系统lsass内存和sam注册表详细

    2021-06-26 12:19:53
  • Android如何获取系统通知的开启状态详解

    2021-12-28 05:45:09
  • 详解java中BigDecimal精度问题

    2021-08-17 10:24:59
  • java通过ip获取客户端Mac地址的小例子

    2021-12-22 06:37:07
  • IDEA2020如何打开Run Dashboard的方法步骤

    2023-02-10 15:29:10
  • Android使用Intent.ACTION_SEND分享图片和文字内容的示例代码

    2023-12-17 02:57:28
  • C#中的应用程序接口介绍及实现,密封类与密封方法

    2023-10-24 07:37:41
  • 详解如何将Spring Boot应用跑在Docker容器中

    2023-04-25 08:08:58
  • Java多线程Future松获取异步任务结果轻松实现

    2022-11-22 15:49:38
  • C++类和对象之类的6个默认成员函数详解

    2022-01-05 13:50:35
  • SpringBoot线程池和Java线程池的使用和实现原理解析

    2022-06-27 07:22:30
  • c#基数排序Radix sort的实现方法

    2021-07-25 02:02:21
  • Java 客户端操作 FastDFS 实现文件上传下载替换删除功能

    2022-06-01 15:01:38
  • Java锁之可重入锁介绍

    2021-06-01 03:05:19
  • Java流程控制break和continue

    2023-06-16 09:49:54
  • asp之家 软件编程 m.aspxhome.com