生产者消费者模式
生产者-消费者模式在服务端编程中,是一种很常见的设计模式,比如消息队列的实现,就是这种思想。本文就是用Java语言编写一个简单的生产者消费者例子,从而引出concurrent包下的阻塞队列和ReentrantLock一些玩法。
##基础知识
首先复习下基础知识,在Java中concurrent包下并发队列分为阻塞队列和非阻塞队列,ConcurrentLinkedQueue是非阻塞队列,底层实现用了CAS。阻塞队列包括LinkedBlockingQueue,LinkedBlockingDeque,LinkedTransferQueue,ArrayBlockingQueue,阻塞队列底层是靠ReentrantLock实现。Condition包括await,signal,signalAll,Condition作为条件锁
我们知道Lock的本质是AQS,AQS自己维护的队列是当前等待资源的队列,AQS会在被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。
而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列。
但是,两个队列的作用不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
所以,发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。 signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程,本质是将节点从Condition队列中取出来一个还是所有节点放到AQS的等待队列。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
##生产者-消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public class Productor { static LinkedBlockingQueue<Integer> blockQueue = new LinkedBlockingQueue<>(10);
void provide() throws InterruptedException { for (int i = 0; i < 10; i++) { blockQueue.offer(i); System.out.println("生产:" + i); Thread.sleep(100); } } }
|
1 2 3 4 5 6 7 8 9 10
|
public class Consumer { void cusume() throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { System.out.println("消费:" + Productor.blockQueue.take()); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| static void test0() { Productor productor = new Productor(); Consumer consumer = new Consumer(); Thread cusumerThread = new Thread(() -> { try { consumer.cusume(); } catch (Exception e) {
}
}); cusumerThread.start();
try { productor.provide();
cusumerThread.interrupt(); } catch (Exception e) { e.printStackTrace(); } }
输出: 生产:0 消费:0 生产:1 消费:1 生产:2 消费:2 生产:3 消费:3 生产:4 消费:4 生产:5 消费:5 生产:6 消费:6 生产:7 消费:7 消费:8 生产:8 生产:9 消费:9
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
public class MyBlockQueue<E> { private LinkedList<E> queue = new LinkedList<>(); private Object lock = new Object(); private ReentrantLock reentrantLock = new ReentrantLock(); Condition con = reentrantLock.newCondition();
void offer(E e) { queue.offer(e); synchronized (lock) { lock.notifyAll(); } }
E take() throws InterruptedException { if (queue.size() == 0) { synchronized (lock) { lock.wait(); } } return queue.poll(); }
void offer1(E e) throws InterruptedException {
try { reentrantLock.lockInterruptibly(); queue.offer(e); con.signalAll();
} finally { reentrantLock.unlock(); } }
E take1() throws InterruptedException { if (queue.size() == 0) { try { reentrantLock.lockInterruptibly(); con.await();
} finally { reentrantLock.unlock(); } } return queue.poll(); } }
|