Java并发性BlockingQueue接口


一个java.util.concurrent.BlockingQueue接口是Queue接口的一个子接口,另外还支持诸如在获取元素之前等待队列变为非空以及在存储元素之前等待队列中的空间变得可用的操作。

BlockingQueue方法

Sr.No. 方法和描述
1 boolean add(E e) 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,成功时返回true,如果当前没有空间,则返回IllegalStateException。
2 boolean contains(Object o) 如果此队列包含指定的元素,则返回true。
3 int drainTo(Collection <?super E> c) 从该队列中移除所有可用的元素,并将它们添加到给定的集合中。
4 int drainTo(Collection <?super E> c,int maxElements) 最多从此队列中移除给定数量的可用元素,并将它们添加到给定集合中。
5 boolean offer(E e) 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,成功时返回true,如果当前没有可用空间,则返回false。
6 boolean offer(E e, long timeout, TimeUnit unit) 将指定的元素插入此队列中,如有必要,等待指定的等待时间以使空间变为可用。
7 E poll(long timeout, TimeUnit unit) 检索并删除此队列的头部,如果元素变为可用,则等待达到指定的等待时间。
8 void put(E e) 将指定的元素插入此队列,等待空间变得可用时。
9 int remainingCapacity() 如果没有内部限制,则返回此队列理想情况下(在没有内存或资源约束的情况下)接受而不阻止的附加元素的数量,或者返回Integer.MAX_VALUE。
10 boolean remove(Object o) 从该队列中移除指定元素的单个实例(如果存在)。
11 E take() 检索并删除此队列的头部,如果需要等待,直到元素变为可用。

以下TestThread程序显示在基于线程的环境中使用BlockingQueue接口。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }    
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {

         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

输出

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27