Java中的阻塞队列的学习使用。

一、阻塞队列介绍(BlockingQueue)

1.BlockingQueue是一种支持两个附加操作的队列,这两个操作是:

(1)获取元素的操作会在队列为空时进行等待,直至队列为非空时继续;
(2)存储元素的操作会在队列满时进行等待,直至队列出现可用空间时继续。

2.BlockingQueue的方法以四种形式出现:

BlockingQueue不能够存入null元素,会抛出NullPointerException异常,null 被用作指示 poll 操作失败的警戒值。

3.BlockingQueue主要用于实现生产者-消费者模式,生产者线程向队列容器中添加元素,消费者线程从队列中取出元素。BlockingQueue实现是线程安全的,所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。

二、BlockingQueue的常见实现类

1.ArrayBlockingQueue一个由数组结构组成的有界阻塞队列,遵循“先进先出”原则对元素进行排序。队列空间的大小固定,一旦创建以后就不可以再增加容量。试图向 已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

默认情况下不保证对访问者的公平访问队列,即先阻塞的线程先访问。但是可以通过构造方法中设置fairness(公平性)属性为true,创建公平的访问队列。公平性会对性能产生影响,但也减少了可变性和不平衡性。

​ ArrayBlockingQueue mArrayBlockingQueue = new ArrayBlockingQueue(10,true);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static ArrayBlockingQueue<String> mArrayBlockingQueue = new ArrayBlockingQueue(10,true);
public static void main(String[] args){
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
private static class Producer implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
mArrayBlockingQueue.put("数据:"+i);
//若队列空间已满,会进入阻塞状态
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable{
@Override
public void run() {
while (true){
try {
System.out.println(mArrayBlockingQueue.take());
//当队列为空时,会进入阻塞状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

2.LinkedBlockingQueue

一个基于链表的阻塞队列,其实现的队列可以不指定队列大小,默认是Integer.MAX_VALUE。
其实现队列中的锁是分离的,生产者和消费者可以并行的操作队列中的数据,从而使得并发执行数据的效率更高。实现方式和ArrayBlockingQueue 基本一致。

3.PriorityBlockingQueue

一种优先级队列,队列中的元素按照优先级排列。队列中存储的对象必须实现Comparable接口,确定对象的优先级。
其比较规则:
(1)当前对象和其他对象比较,如果如果compare方法返回-1,那么其优先级高于比较的对象。
(2)compare方法返回0,则两个优先级相等。
(3)compare方法返回1,则优先级低于比较的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CommandPriority implements Comparable<CommandPriority> {
private Command command;
private int priority;
public CommandPriority(Command command, int priority) {
this.command = command;
this.priority = priority;
}
public Command getCommand() {
return command;
}
@Override
public String toString() {
return "CommandPriority{" +
"command=" + command +
", priority=" + priority +
'}';
}
@Override
public int compareTo(@NonNull CommandPriority o) {
if (this.priority > o.priority) {
return -1;
} else if (this.priority < o.priority) {
return 1;
} else {
return 0;
}
}
}

4.SynchronousQueue

一个没有数据缓冲的队列,生产者对其的插入操作put后进入阻塞状态直至消费者执行take操作后才执行下一次操作,反之亦然;它的内部由于没有数据缓存空间,所以不能使用peek来查看队列中是否存在元素,其返回值永远为null。

5.DelayQueue

一个无界的阻塞队列,队列中的元素需要实现Delayed接口,Delayed接口继承Comparable接口,其元素存入队列时需要进行比较,比较的基准为延时的时间值,它是一种内部由PriorityQueue实现的变体。队列中的元素在到期时才能从队列中取出,对头的元素是最紧急任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//JAVA编程思想中的例子
class DelayedTask implements Runnable, Delayed {
private static int current = 0;
private final int id = current++;
private final int detail;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();
public DelayedTask(int delayedInMillisecond) {
detail = delayedInMillisecond;
trigger = System.nanoTime() + NANOSECONDS.convert(detail, MILLISECONDS);
sequence.add(this);
}
@Override
public void run() {
System.out.println(this + "aa");
}
@Override
public long getDelay(TimeUnit unit) {
//获取剩余延长时间
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
//进行比较
DelayedTask that = (DelayedTask) o;
if (trigger < that.trigger) return -1;
if (trigger > that.trigger) return 1;
return 0;
}
}