生产者消费者模式-Java

生产者消费者模式

生产者-消费者模式在服务端编程中,是一种很常见的设计模式,比如消息队列的实现,就是这种思想。本文就是用Java语言编写一个简单的生产者消费者例子,从而引出concurrent包下的阻塞队列和ReentrantLock一些玩法。

##基础知识

首先复习下基础知识,在Java中concurrent包下并发队列分为阻塞队列非阻塞队列,ConcurrentLinkedQueue是非阻塞队列,底层实现用了CAS。阻塞队列包括LinkedBlockingQueue,LinkedBlockingDeque,LinkedTransferQueue,ArrayBlockingQueue,阻塞队列底层是靠ReentrantLock实现。Condition包括await,signal,signalAll,Condition作为条件锁

我们知道Lock的本质是AQS,AQS自己维护的队列是当前等待资源的队列,AQS会在被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。

而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列。

但是,两个队列的作用不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

  • 1、线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。

  • 2、线程1调用await方法被调用时,对应操作是被加入到Condition的等待队列中,等待signal信号;同时释放锁。

  • 3、锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
  • 4、线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒,只是被加入AQS等待队列。
  • 5、signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,线程1被唤醒,线程1恢复执行。

所以,发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。 signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程,本质是将节点从Condition队列中取出来一个还是所有节点放到AQS的等待队列。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。

##生产者-消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 用阻塞队列实现生产-消费者模式
*/
public class Productor {
static LinkedBlockingQueue<Integer> blockQueue = new LinkedBlockingQueue<>(10);

void provide() throws InterruptedException {
for (int i = 0; i < 10; i++) {
blockQueue.offer(i);
System.out.println("生产:" + i);
Thread.sleep(100);
}
}
}
1
2
3
4
5
6
7
8
9
10
/**
* 基于阻塞队列消费者
*/
public class Consumer {
void cusume() throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("消费:" + Productor.blockQueue.take());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static void test0() {
//创建生产者
Productor productor = new Productor();
//创建消费者
Consumer consumer = new Consumer();
Thread cusumerThread = new Thread(() -> {
try {
consumer.cusume();
} catch (Exception e) {

}

});
cusumerThread.start();

try {
productor.provide();

cusumerThread.interrupt();
} catch (Exception e) {
e.printStackTrace();
}
}

输出:
生产:0
消费:0
生产:1
消费:1
生产:2
消费:2
生产:3
消费:3
生产:4
消费:4
生产:5
消费:5
生产:6
消费:6
生产:7
消费:7
消费:8
生产:8
生产:9
消费:9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自己实现的简单的阻塞队列
*/
public class MyBlockQueue<E> {
private LinkedList<E> queue = new LinkedList<>();
//第一种实现方式,采用了Object wait和notify的方式
private Object lock = new Object();
//第二种实现方法,采用了ReentrantLock获取Condition,通过condition await和signal方式实现
private ReentrantLock reentrantLock = new ReentrantLock();
Condition con = reentrantLock.newCondition();

void offer(E e) {
queue.offer(e);
synchronized (lock) {
lock.notifyAll();
}
}

E take() throws InterruptedException {
if (queue.size() == 0) {
synchronized (lock) {
lock.wait();
}
}
return queue.poll();
}

void offer1(E e) throws InterruptedException {

try {
reentrantLock.lockInterruptibly();
queue.offer(e);
con.signalAll();

} finally {
reentrantLock.unlock();
}
}

E take1() throws InterruptedException {
if (queue.size() == 0) {
try {
reentrantLock.lockInterruptibly();
con.await();

} finally {
reentrantLock.unlock();
}
}
return queue.poll();
}
}

评论