blocking
blockingqueue 使用方法笔记
本例介绍一个特殊的队列:BlockingQueue,它是阻塞式队列,如果从BlockingQueue中读数据,此时BlockingQueue为空这个操作会被阻塞进入等待状态,直到BlockingQueue写入元素会被唤醒,同理如果BlockingQueue是满的,任何写入操作 会被阻塞进入等待状态,直到BlockingQueue里面有空间才被唤醒。
一、BlockingQueue常用方法:
1)add(E e):
添加元素,如果BlockingQueue可以容纳,则返回true,否则报异常
2)offer(E e):
添加元素,如果BlockingQueue可以容纳,则返回true,否则返回false.
3)put(E e):
添加元素,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
4)poll(long timeout, TimeUnit timeUnit):
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等timeout参数规定的时间,取不到时返回null
5)take():
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
二、BlockingQueue常用实现类
1)ArrayBlockingQueue:
有界的先入先出顺序队列,构造方法确定队列的大小.
2)LinkedBlockingQueue:
无界的先入先出顺序队列,构造方法提供两种,一种初始化队列大小,队列即有界;第二种默认构造方法,队列无界(有界即 integer.MAX_VALUE)
4)SynchronousQueue:
特殊的BlockingQueue,没有空间的队列,即必须有取的方法阻塞在这里的时候才能放入元素。
3)priorityBlockingQueue:
支持优先级的阻塞队列 ,存入对象必须实现Comparator接口 (需要注意的是 队列不是在加入元素的时候进行排序,而是取出的时候,根据Comparator来决定优先级最高的)。
如下是阻塞队列的简单实现:
package com.lyq.jsoup.MyQueue;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MyQueue {
private List<Object> mq = new LinkedList<>();
private AtomicInteger couter = new AtomicInteger();
private int maxSize = 20;
private int minSize = 0;
private Object lock = new Object();
public MyQueue(int minSize,int maxSize){
this.minSize = minSize;
this.maxSize = maxSize;
}
public Object take(){
Object ret = null;
synchronized (lock){
if (this.couter.get() == minSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printstacktrace();
}
}
ret = mq.remove(0);
System.out.println("take:"+ret);
couter.decrementAndGet();
lock.notify();
}
return ret;
}
public void put(Object obj){
synchronized (lock){
if (this.couter.get() == maxSize){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
mq.add(obj);
System.out.println("put:"+obj);
couter.incrementAndGet();
lock.notify();
}
}
@Override
public String toString() {
return "MyQueue{" +
"mq=" + mq +
", couter=" + couter +
", max=" + maxSize +
", min=" + minSize +
", lock=" + lock +
'}';
}
public static void main(String[] args){
MyQueue mq = new MyQueue(3,5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
mq.put("h");
}
},"t1").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
mq.take();
mq.take();
mq.take();
}
},"t2").start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(mq.toString());
}
}
参考文章1
相关阅读
1:BlockingQueue继承关系java.util.concurrent 包里的 BlockingQueue是一个接口, 继承Queue接口,Queue接口继承 Collection Blockin