自定义阻塞队列

今天重温了下 java 多线程中的 notify() 方法以及 wait() 方法,一时兴起,决定通过这俩个方法,实现一个简易的自定义阻塞队列。

阻塞队列是什么,与普通队列的区别是什么?
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。

  1. 新建一个 MyQueue.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    import java.util.LinkedList;
    import java.util.concurrent.atomic.AtomicInteger;
    import com.xiaoleilu.hutool.util.StrUtil;
    /**
    * 使用 notify() 和 wait() 实现自定义阻塞队列
    *
    * @author Yangkai.Shen
    * @version 1.0
    * @date 2017.08.02 at 11:51:14
    */
    public class MyQueue {
    // 1. 承载数据的容器
    private LinkedList<Object> queue = new LinkedList<Object>();
    // 2. 计数器,用于判定边界
    private AtomicInteger count = new AtomicInteger(0);
    private final int minSize = 0;
    // 3. 初始化一个对象,用于加锁
    private final Object lock = new Object();
    private final int maxSize;
    public MyQueue(int maxSize) {
    this.maxSize = maxSize;
    }
    /**
    * 添加一个元素到队列中,如果队列元素已满,则调用此方法的线程被阻塞,直到存在多余空间了,再进行添加
    *
    * @param obj 添加 obj 到队列尾部
    */
    public void put(Object obj) {
    synchronized (lock) {
    // 1.没有多余空间,就阻塞线程
    while (count.get() == this.maxSize) {
    try {
    lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    // 2.添加元素
    queue.add(obj);
    // 3.计数器累加
    count.incrementAndGet();
    System.out.println(StrUtil.format("新加入的元素为:{}", obj));
    // 4.唤醒其他线程(若本来元素为空,有线程调用 get 方法,那么原本被阻塞的,需要在此时被唤醒)
    lock.notify();
    }
    }
    /**
    * 获取一个元素,如果队列元素为空,则调用此方法的线程被阻塞,直到添加新元素了,再进行获取
    *
    * @return 返回队列的第一个元素
    */
    public Object get() {
    Object ret = null;
    synchronized (lock) {
    // 1.没有元素,就阻塞线程
    while (count.get() == this.minSize) {
    try {
    lock.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    // 2.取第一个元素
    ret = queue.removeFirst();
    // 3.计数器递减
    count.decrementAndGet();
    System.out.println(StrUtil.format("移除的元素为:{}", ret));
    // 4.唤醒其他线程(若元素本来已满,有线程调用 put 方法,那么原本被阻塞的,需要在此时被唤醒)
    lock.notify();
    }
    return ret;
    }
    public int getSize() {
    return this.count.get();
    }
    }
  2. 新建一个测试类 MyQueueTest.java,测试类中,我们初始化一个队列,并将元素填满,然后启动一个线程 t1,去插入数据,中间休眠2s,再去启动一个线程 t2 取数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    import com.xiaoleilu.hutool.util.StrUtil;
    import java.util.concurrent.TimeUnit;
    public class MyQueueTest {
    public static void main(String[] args) {
    final MyQueue queue = new MyQueue(5);
    queue.put("a");
    queue.put("b");
    queue.put("c");
    queue.put("d");
    queue.put("e");
    System.out.println(StrUtil.format("当前队列的长度: {}", queue.getSize()));
    Thread t1 = new Thread(() -> {
    queue.put("f");
    queue.put("g");
    queue.put("h");
    }, "t1");
    Thread t2 = new Thread(() -> {
    queue.get();
    queue.get();
    });
    t1.start();
    try {
    TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    t2.start();
    }
    }
  3. 启动测试类,查看运行结果。控制台如果应该出现的效果是,队列先初始化完成,然后休眠2s,接下来先取数据,再插入数据,则证明阻塞队列生效。下面是控制台运行的效果:

    • 初始化队列
      初始化队列
    • 休眠2s后取队首元素,再插入队尾元素
      取队首元素,插入队尾元素
    • 此时我们会发现,程序还未停止,因为此时队列已满,但是线程 t1 还未插入 h 元素,因此线程被阻塞着,直至下次队列有空余空间才会被唤醒。
  4. 至此,一个自定义阻塞队列就已经实现了。
  5. 细心的朋友会发现,我打印的 log 里用到了一个 StrUtil.format() 方法,这个和 slf4j 的 log 用法一致,可以使用占位符。这个是用到了一个国产良心工具类,hutool,国产开源,需要大家的支持,觉得好用的话,期望可以去 码云 或者 github 上给个 Star 吧!
o(╯□╰)o我只要一毛钱的鼓励~~