AQS同步组件Semaphore信号量案例剖析

作者:共饮一杯无 时间:2023-11-27 14:27:04 

基本概念

Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制并发访问资源的线程个数。

AQS同步组件Semaphore信号量案例剖析

例如排队买票的情况,如果只有三个窗口,那么同一时间最多也只能有三个人买票。第四个人来了之后就必须在后面等着,只有其他人买好了,才可以去相应的窗口进行买票 。

作用和使用场景

  • 用于保证同一时间并发访问线程的数目。

  • 信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。

  • 在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。

使用场景:仅能提供有限访问的资源。比如数据库连接。

源码分析

构造函数

/**
*接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表*示允许10个线程获取许可证,
*也就是最大并发数是10。
*
* @param permits 可用许可证的初始数量。
**/
public Semaphore(int permits) {
       sync = new NonfairSync(permits);
}
/**
* 使用给定的许可数量和给定的公平性设置
*
* @param permits 可用许可证的初始数量。
*
* @param fair 指定是公平模式还是非公平模式,默认非公平模式 . 公平模式:先启动的线程优先得到
* 许可。 非公平模式:先启动的线程并不一定先获得许可,谁抢到谁就获得许可。
*/
public Semaphore(int permits, boolean fair) {
   sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

常用方法

acquire() 获取一个许可

acquire(int permits) 获取指定个数的许可

tryAcquire()方法尝试获取1个许可证

tryAcquire(long timeout, TimeUnit unit) 最大等待许可的时间

tryAcquire(int permits) 获取指定个数的许可

tryAcquire(int permits, long timeout, TimeUnit unit) 最大等待许可的时间

availablePermits() : 返回此信号量中当前可用的许可证数

release() 释放许可

release(int permits) 释放指定个数的许可

int getQueueLength() 返回正在等待获取许可证的线程数。

boolean hasQueuedThreads() 是否有线程正在等待获取许可证。

void reducePermits(int reduction) 减少reduction个许可证。是个protected方法。

Collection getQueuedThreads() 返回所有等待获取许可证的线程集合。是个protected方法。

使用案例

acquire()获取单个许可

/**
    * 线程数量
    */
   private final static int threadCount = 15;
   public static void main(String[] args) throws Exception {
       ExecutorService exec = Executors.newCachedThreadPool();
       final Semaphore semaphore = new Semaphore(3);
       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   //获取一个许可
                   semaphore.acquire();
                   test(threadNum);
                   //释放一个许可
                   semaphore.release();
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }
   private static void test(int threadNum) throws Exception {
       // 模拟请求的耗时操作
       Thread.sleep(1000);
       log.info("{}", threadNum);
   }

输出结果:

AQS同步组件Semaphore信号量案例剖析

根据输出结果的时间可以看出来同一时间最多只能3个线程执行,符合预期

acquire(int permits)获取多个许可

/**
    * 线程数量
    */
   private final static int threadCount = 15;
   public static void main(String[] args) throws Exception {
       ExecutorService exec = Executors.newCachedThreadPool();
       //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
       final Semaphore semaphore = new Semaphore(3);
       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   //获取多个许可
                   semaphore.acquire(3);
                   test(threadNum);
                   //释放多个许可
                   semaphore.release(3);
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }
   private static void test(int threadNum) throws Exception {
       // 模拟请求的耗时操作
       Thread.sleep(1000);
       log.info("{}", threadNum);
   }

输出结果:

AQS同步组件Semaphore信号量案例剖析

设置了3个许可,每个线程每次获取3个许可,因此同一时间只能有1个线程执行 。

tryAcquire()获取许可

tryAcquire()尝试获取一个许可,如果未获取到,不等待,将直接丢弃该线程不执行

/**
    * 线程数量
    */
   private final static int threadCount = 15;
   public static void main(String[] args) throws Exception {
       ExecutorService exec = Executors.newCachedThreadPool();
       //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
       final Semaphore semaphore = new Semaphore(3);
       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   //尝试获取一个许可,如果未获取到,不等待,将直接丢弃该线程不执行
                   if(semaphore.tryAcquire()) {
                       test(threadNum);
                       //释放许可
                       semaphore.release();
                   }
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }
   private static void test(int threadNum) throws Exception {
       // 模拟请求的耗时操作
       Thread.sleep(1000);
       log.info("{}", threadNum);
   }

输出结果:

AQS同步组件Semaphore信号量案例剖析

从输出可以看到,在3个线程获取到3个许可后,因为每个线程调用的方法要执行1秒中,最早的一个许可也要在1S后释放,剩下的17个线程未获取到许可,使用了semaphore.tryAcquire()方法,没有设置等待时间,所以便直接被丢弃,不执行了。

tryAcquire(long timeout, TimeUnit unit)

tryAcquire(long timeout, TimeUnit unit)未获取到许可,设置等待时长

/**
    * 线程数量
    */
   private final static int threadCount = 15;
   public static void main(String[] args) throws Exception {
       ExecutorService exec = Executors.newCachedThreadPool();
       //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
       final Semaphore semaphore = new Semaphore(3);
       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   //设置了获取许可等待时间为2秒,如果两秒后还是未获得许可的线程便得不到执行
                   if(semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) {
                       test(threadNum);
                       //释放许可
                       semaphore.release();
                   }
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }
   private static void test(int threadNum) throws Exception {
       // 模拟请求的耗时操作
       Thread.sleep(1000);
       log.info("{}", threadNum);
   }

输出结果:

AQS同步组件Semaphore信号量案例剖析

tryAcquire通过参数指定了2秒的等待时间。 上述代码中同一时间最多执行3个。第4个线程因前3个线程执行需要耗时一秒未释放许可,因此需要等待。

但是由于设置了2秒的等待时间,所以在5秒内等待到了释放的许可,继续执行,循环往复。

但是15个线程 ,每秒并发3个,2S是执行不完的。所以上面执行到第6个(0开始,显示是5)就结束了,【每次执行结果会有差异,取决于CPU】,并没有全部执行完15个线程。

来源:https://juejin.cn/post/7129010239994593317

标签:AQS,同步组件,Semaphore,信号量
0
投稿

猜你喜欢

  • java递归菜单树转换成pojo对象

    2022-08-12 04:04:40
  • Springboot打成war包并在tomcat中运行的部署方法

    2022-06-29 07:53:32
  • Java NIO Path接口和Files类配合操作文件的实例

    2023-10-20 09:29:01
  • C#实现生成mac地址与IP地址注册码的两种方法

    2022-07-14 20:14:58
  • 解决SpringMVC使用@RequestBody注解报400错误的问题

    2022-02-26 16:06:43
  • SpringCloud Edgware.SR3版本中Ribbon的timeout设置方法

    2023-03-07 10:45:46
  • SpringBoot中整合Minio文件存储的安装部署过程

    2022-02-21 01:28:14
  • Java读取Map的两种方法与对比

    2021-08-08 20:56:55
  • Java8新特性:函数式编程

    2021-12-01 03:09:02
  • MAC算法之消息摘要算法HmacMD5的实现

    2023-02-13 08:03:28
  • 教你3分钟了解Android 简易时间轴的实现方法

    2023-04-02 20:56:19
  • Java游戏俄罗斯方块的实现实例

    2022-11-26 12:35:49
  • Spring Boot 整合持久层之Spring Data JPA

    2022-07-29 04:00:38
  • Android中的广播、服务、数据库、通知、包等术语的原理和介绍(图解)

    2023-01-20 13:57:52
  • JAVA如何调用wsdl过程详解

    2021-08-12 22:00:18
  • 轻松学习C#的ArrayList类

    2022-08-07 18:53:34
  • Android结合xml实现帧动画

    2023-10-07 22:57:51
  • C# memcache 使用介绍

    2022-11-25 12:25:57
  • 详解springboot集成mybatis xml方式

    2022-08-05 09:04:54
  • Android列表实现(2)_游标列表案例讲解

    2022-11-15 16:49:55
  • asp之家 软件编程 m.aspxhome.com