浅谈Java中的Queue家族

作者:flydean 时间:2021-12-23 08:36:49 

Queue接口

先看下Queue的继承关系和其中定义的方法:

浅谈Java中的Queue家族

Queue继承自Collection,Collection继承自Iterable。

Queue有三类主要的方法,我们用个表格来看一下他们的区别:

方法类型方法名称方法名称区别
Insertaddoffer两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常。offer在添加失败的时候会返回false。所以对那些有固定长度的Queue,优先使用offer方法。
Removeremovepoll如果Queue是空的情况下,remove会抛出异常,而poll会返回null。
Examineelementpeek获取Queue头部的元素,但不从Queue中删除。两者的区别还是在于Queue为空的情况下,element会抛出异常,而peek返回null。

注意,因为对poll和peek来说null是有特殊含义的,所以一般来说Queue中禁止插入null,但是在实现中还是有一些类允许插入null比如LinkedList。

尽管如此,我们在使用中还是要避免插入null元素。

Queue的分类

一般来说Queue可以分为BlockingQueue,Deque和TransferQueue三种。

BlockingQueue

BlockingQueue是Queue的一种实现,它提供了两种额外的功能:

当当前Queue是空的时候,从BlockingQueue中获取元素的操作会被阻塞。当当前Queue达到最大容量的时候,插入BlockingQueue的操作会被阻塞。

BlockingQueue的操作可以分为下面四类:

操作类型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable

第一类是会抛出异常的操作,当遇到插入失败,队列为空的时候抛出异常。

第二类是不会抛出异常的操作。

第三类是会Block的操作。当Queue为空或者达到最大容量的时候。

第四类是time out的操作,在给定的时间里会Block,超时会直接返回。

BlockingQueue是线程安全的Queue,可以在生产者消费者模式的多线程中使用,如下所示:

class Producer implements Runnable {   private final BlockingQueue queue;   Producer(BlockingQueue q) { queue = q; }   public void run() {     try {       while (true) { queue.put(produce()); }     } catch (InterruptedException ex) { ... handle ...}   }   Object produce() { ... } } class Consumer implements Runnable {   private final BlockingQueue queue;   Consumer(BlockingQueue q) { queue = q; }   public void run() {     try {       while (true) { consume(queue.take()); }     } catch (InterruptedException ex) { ... handle ...}   }   void consume(Object x) { ... } } class Setup {   void main() {     BlockingQueue q = new SomeQueueImplementation();     Producer p = new Producer(q);     Consumer c1 = new Consumer(q);     Consumer c2 = new Consumer(q);     new Thread(p).start();     new Thread(c1).start();     new Thread(c2).start();   } }

最后,在一个线程中向BlockQueue中插入元素之前的操作happens-before另外一个线程中从BlockQueue中删除或者获取的操作。

Deque

Deque是Queue的子类,它代表double ended queue,也就是说可以从Queue的头部或者尾部插入和删除元素。

同样的,我们也可以将Deque的方法用下面的表格来表示,Deque的方法可以分为对头部的操作和对尾部的操作:

方法类型Throws exceptionSpecial valueThrows exceptionSpecial value
InsertaddFirst(e)offerFirst(e)addLast(e)offerLast(e)
RemoveremoveFirst()pollFirst()removeLast()pollLast()
ExaminegetFirst()peekFirst()getLast()peekLast()

和Queue的方法描述基本一致,这里就不多讲了。

当Deque以 FIFO (First-In-First-Out)的方法处理元素的时候,Deque就相当于一个Queue。

当Deque以LIFO (Last-In-First-Out)的方式处理元素的时候,Deque就相当于一个Stack。

TransferQueue

TransferQueue继承自BlockingQueue,为什么叫Transfer呢?因为TransferQueue提供了一个transfer的方法,生产者可以调用这个transfer方法,从而等待消费者调用take或者poll方法从Queue中拿取数据。

还提供了非阻塞和timeout版本的tryTransfer方法以供使用。

我们举个TransferQueue实现的生产者消费者的问题。

先定义一个生产者:


@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
   private TransferQueue<String> transferQueue;

private String name;

private Integer messageCount;

public static final AtomicInteger messageProduced = new AtomicInteger();

@Override
   public void run() {
       for (int i = 0; i < messageCount; i++) {
           try {
               boolean added = transferQueue.tryTransfer( "第"+i+"个", 2000, TimeUnit.MILLISECONDS);
               log.info("transfered {} 是否成功: {}","第"+i+"个",added);
               if(added){
                   messageProduced.incrementAndGet();
               }
           } catch (InterruptedException e) {
               log.error(e.getMessage(),e);
           }
       }
       log.info("total transfered {}",messageProduced.get());
   }
}

在生产者的run方法中,我们调用了tryTransfer方法,等待2秒钟,如果没成功则直接返回。

再定义一个消费者:


@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

private TransferQueue<String> transferQueue;

private String name;

private int messageCount;

public static final AtomicInteger messageConsumed = new AtomicInteger();

@Override
   public void run() {
       for (int i = 0; i < messageCount; i++) {
           try {
               String element = transferQueue.take();
               log.info("take {}",element );
               messageConsumed.incrementAndGet();
               Thread.sleep(500);
           } catch (InterruptedException e) {
               log.error(e.getMessage(),e);
           }
       }
       log.info("total consumed {}",messageConsumed.get());
   }

}

在run方法中,调用了transferQueue.take方法来取消息。

下面先看一下一个生产者,零个消费者的情况:


@Test
public void testOneProduceZeroConsumer() throws InterruptedException {

TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
   ExecutorService exService = Executors.newFixedThreadPool(10);
   Producer producer = new Producer(transferQueue, "ProducerOne", 5);

exService.execute(producer);

exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
   exService.shutdown();
}

输出结果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0

可以看到,因为没有消费者,所以消息并没有发送成功。

再看下一个有消费者的情况:


@Test
public void testOneProduceOneConsumer() throws InterruptedException {

TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
   ExecutorService exService = Executors.newFixedThreadPool(10);
   Producer producer = new Producer(transferQueue, "ProducerOne", 2);
   Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

exService.execute(producer);
   exService.execute(consumer);

exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
   exService.shutdown();
}

输出结果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true

[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2

[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2

可以看到Producer和Consumer是一个一个来生产和消费的。

来源:https://www.cnblogs.com/flydean/p/java-queue-overview.html

标签:Java,Queue
0
投稿

猜你喜欢

  • Android Java调用自己C++类库的实例讲解

    2023-06-16 19:09:17
  • Java 阻塞队列和线程池原理分析

    2022-04-19 11:57:45
  • C#利用Task实现任务超时多任务一起执行的方法

    2023-07-04 20:03:38
  • QT实现QML侧边导航栏的最简方法

    2021-09-17 02:54:35
  • Java中避免NullPointerException的方法总结

    2021-08-29 08:09:24
  • Android Flutter实现五种酷炫文字动画效果详解

    2023-06-27 02:57:16
  • 了解Java虚拟机JVM的基本结构及JVM的内存溢出方式

    2023-02-20 03:08:51
  • android通过usb读取U盘的方法

    2023-03-14 07:43:25
  • c#访问this关键字和base关键字示例

    2021-09-28 23:36:14
  • Java实现插入公式到PPT的示例代码

    2023-11-12 03:04:41
  • opencv3/C++图像滤波实现方式

    2023-06-23 15:37:08
  • Android studio 快捷键大全

    2022-03-27 14:58:56
  • java.lang.Runtime.exec的左膀右臂:流输入和流读取详解

    2023-08-06 04:59:03
  • 采用C#实现软件自动更新的方法

    2021-12-30 19:13:38
  • Flutter 剪裁组件的使用

    2023-06-18 13:15:04
  • pom文件中${project.basedir}的使用

    2021-12-24 04:24:11
  • Java线程池源码的深度解析

    2023-10-02 19:33:40
  • Java简单工厂模式详细解释

    2021-09-20 16:12:07
  • java 类加载机制和反射详解及实例代码

    2023-11-30 06:42:20
  • Android 内存泄漏的几种可能总结

    2022-02-27 21:43:26
  • asp之家 软件编程 m.aspxhome.com