今天重温了下 java 多线程中的 notify()
方法以及 wait()
方法,一时兴起,决定通过这俩个方法,实现一个简易的自定义阻塞队列。
阻塞队列是什么,与普通队列的区别是什么?
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。
新建一个
MyQueue.java
类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import com.xiaoleilu.hutool.util.StrUtil;
/**
* 使用 notify() 和 wait() 实现自定义阻塞队列
*
* @author Yangkai.Shen
* @version 1.0
* @date 2017.08.02 at 11:51:14
*/
public class MyQueue {
// 1. 承载数据的容器
private LinkedList<Object> queue = new LinkedList<Object>();
// 2. 计数器,用于判定边界
private AtomicInteger count = new AtomicInteger(0);
private final int minSize = 0;
// 3. 初始化一个对象,用于加锁
private final Object lock = new Object();
private final int maxSize;
public MyQueue(int maxSize) {
this.maxSize = maxSize;
}
/**
* 添加一个元素到队列中,如果队列元素已满,则调用此方法的线程被阻塞,直到存在多余空间了,再进行添加
*
* @param obj 添加 obj 到队列尾部
*/
public void put(Object obj) {
synchronized (lock) {
// 1.没有多余空间,就阻塞线程
while (count.get() == this.maxSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 2.添加元素
queue.add(obj);
// 3.计数器累加
count.incrementAndGet();
System.out.println(StrUtil.format("新加入的元素为:{}", obj));
// 4.唤醒其他线程(若本来元素为空,有线程调用 get 方法,那么原本被阻塞的,需要在此时被唤醒)
lock.notify();
}
}
/**
* 获取一个元素,如果队列元素为空,则调用此方法的线程被阻塞,直到添加新元素了,再进行获取
*
* @return 返回队列的第一个元素
*/
public Object get() {
Object ret = null;
synchronized (lock) {
// 1.没有元素,就阻塞线程
while (count.get() == this.minSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 2.取第一个元素
ret = queue.removeFirst();
// 3.计数器递减
count.decrementAndGet();
System.out.println(StrUtil.format("移除的元素为:{}", ret));
// 4.唤醒其他线程(若元素本来已满,有线程调用 put 方法,那么原本被阻塞的,需要在此时被唤醒)
lock.notify();
}
return ret;
}
public int getSize() {
return this.count.get();
}
}新建一个测试类
MyQueueTest.java
,测试类中,我们初始化一个队列,并将元素填满,然后启动一个线程t1
,去插入数据,中间休眠2s,再去启动一个线程t2
取数据。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import com.xiaoleilu.hutool.util.StrUtil;
import java.util.concurrent.TimeUnit;
public class MyQueueTest {
public static void main(String[] args) {
final MyQueue queue = new MyQueue(5);
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
queue.put("e");
System.out.println(StrUtil.format("当前队列的长度: {}", queue.getSize()));
Thread t1 = new Thread(() -> {
queue.put("f");
queue.put("g");
queue.put("h");
}, "t1");Thread t2 = new Thread(() -> { queue.get(); queue.get(); }); t1.start(); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } t2.start(); }
}
启动测试类,查看运行结果。控制台如果应该出现的效果是,队列先初始化完成,然后休眠2s,接下来先取数据,再插入数据,则证明阻塞队列生效。下面是控制台运行的效果:
- 初始化队列
- 休眠2s后取队首元素,再插入队尾元素
- 此时我们会发现,程序还未停止,因为此时队列已满,但是线程
t1
还未插入h
元素,因此线程被阻塞着,直至下次队列有空余空间才会被唤醒。
- 初始化队列
至此,一个自定义阻塞队列就已经实现了。
细心的朋友会发现,我打印的 log 里用到了一个
StrUtil.format()
方法,这个和slf4j
的 log 用法一致,可以使用占位符。这个是用到了一个国产良心工具类,hutool,国产开源,需要大家的支持,觉得好用的话,期望可以去 码云 或者 github 上给个 Star 吧!