生产者消费者模型

生产者消费者问题是研究多线程程序时绕不开的经典问题之一,比较典型的描述就是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。

解决生产者/消费者问题的方法可分为两类:

  1. 采用某种机制保护生产者和消费者之间的同步;
  2. 在生产者和消费者之间建立一个管道。

第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。因此本文只介绍同步机制实现的生产者/消费者问题。

同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。在Java中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。

  1. wait() / notify()方法
  2. await() / signal()方法
  3. BlockingQueue阻塞队列方法

一、wait、notify方法

1、方法介绍

前面介绍Object类的方法时提到过里面有几个自带的关于线程的方法,

  • wait
  • notify
  • notifyAll

这三个方法一般配合在一起使用,用于线程间的通信。

关于wait方法的标准写法如下:

1
2
3
4
5
6
7
8
// The standard idiom for calling the wait method in Java 
synchronized (sharedObject) {
while (condition) {
sharedObject.wait();
// (Releases lock, and reacquires on wakeup)
}
// do action based upon condition e.g. take or put into queue
}

我们可以利用wait()来让一个线程在某些条件下暂停运行。例如,在生产者消费者模型中,生产者线程在缓冲区为满的时候,消费者在缓冲区为空的时 候,都应该暂停运行。如果某些线程在等待某些条件触发,那当那些条件为真时,你可以用 notify 和 notifyAll 来通知那些等待中的线程重新开始运行。不同之处在于,notify 仅仅通知一个线程,并且我们不知道哪个线程会收到通知,然而 notifyAll 会通知所有等待中的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import java.util.LinkedList;

public class Storage {

// 仓库最大存储量
private final int MAX_SIZE = 100;

// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();

// 生产num个产品
public void produce(int num, Thread thread){
synchronized(list){
// 如果仓库剩余容量不足
while(list.size()+num>MAX_SIZE){
System.out.println(thread.getName()+"【要生产的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try{
list.wait();
}catch(InterruptedException e){

}
}
// 生产条件满足情况下,生产num个产品
for(int i=0; i<num; i++){
list.add(new Object());
}
System.out.println("【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}

// 消费num个产品
public void consume(int num, Thread thread){
synchronized(list){
// 如果仓库存储量不足
while(list.size()<num){
System.out.println(thread.getName()+"【要消费的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "/t暂时不能执行消费任务!");
try{
list.wait();
}catch(InterruptedException e){

}
}
// 消费条件满足情况下,消费num个产品
for(int i=0; i<num; i++){
list.remove();
}
System.out.println("【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}

static class Producer extends Thread{
Storage storage;// 所在放置的仓库
int num;// 每次生产的产品数量

public Producer(String name, Storage storage, int num){
super(name);
this.storage=storage;
this.num=num;
}

public void run(){
storage.produce(num, this);
}
}

static class Consumer extends Thread{
Storage storage;// 所在放置的仓库
int num;// 每次生产的产品数量

public Consumer(String name, Storage storage, int num){
super(name);
this.storage=storage;
this.num=num;
}

public void run(){
storage.consume(num, this);
}

}

public static void main(String[] args) {
Storage storage = new Storage(); // 仓库对象

// 生产者对象
Producer p1=new Producer("producer1", storage, 10);
Producer p2=new Producer("producer2", storage, 10);
Producer p3=new Producer("producer3", storage, 10);
Producer p4=new Producer("producer4", storage, 10);
Producer p5=new Producer("producer5", storage, 10);
Producer p6=new Producer("producer6", storage, 10);
Producer p7=new Producer("producer7", storage, 10);

// 消费者对象
Consumer c1=new Consumer("consumer1", storage, 50);
Consumer c2=new Consumer("consumer2", storage, 50);
Consumer c3=new Consumer("consumer3", storage, 50);

// 线程开始执行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
}

输出结果(每次的输出结果都是不一样的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
consumer1【要消费的产品数量】:50    【库存量】:0/t暂时不能执行消费任务!
【已经生产产品数】:10 【现仓储量为】:10
consumer1【要消费的产品数量】:50 【库存量】:10/t暂时不能执行消费任务!
【已经生产产品数】:10 【现仓储量为】:20
【已经生产产品数】:10 【现仓储量为】:30
【已经生产产品数】:10 【现仓储量为】:40
consumer3【要消费的产品数量】:50 【库存量】:40/t暂时不能执行消费任务!
【已经生产产品数】:10 【现仓储量为】:50
【已经消费产品数】:50 【现仓储量为】:0
consumer3【要消费的产品数量】:50 【库存量】:0/t暂时不能执行消费任务!
consumer1【要消费的产品数量】:50 【库存量】:0/t暂时不能执行消费任务!
【已经生产产品数】:10 【现仓储量为】:10
【已经生产产品数】:10 【现仓储量为】:20
consumer1【要消费的产品数量】:50 【库存量】:20/t暂时不能执行消费任务!
consumer3【要消费的产品数量】:50 【库存量】:20/t暂时不能执行消费任务!

实现类Storage中定义了produce和consume方法,供生产者和消费者线程调用,修改逻辑后生产者和消费者的代码完全不用改变

2、wait与sleep的区别

sleep方法

sleep()方法是Thread类的静态方法,使目前正在执行的线程休眠millis毫秒。

  • 当线程睡眠时,它睡在某个地方,在苏醒之前不会返回到可运行状态。
  • 当睡眠时间到期,则返回到可运行状态。
1
2
3
4
5
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

区别

这两个方法都会使线程进入阻塞状态,也都会抛出中断异常,但是它们有一下几点区别:

  1. 关于方法来源与作用:

    • wait是Object类的方法,用来线程间的通信
    • sleep是Thread类的方法,是线程用来控制自身流程的
  2. 关于锁的释放:

    • sleep方法不会释放锁
    • wait方法会释放锁
  3. 关于使用区域:

    • wait,notify和notifyAll只能在同步控制方法或者同步控制块里面使用
    • sleep可以在任何地方使用。

二、await、signal方法

在JDK5.0之后,Java提供了更加健壮的线程处理机制,包括同步、锁定、线程池等,它们可以实现更细粒度的线程控制。await()和signal()就是其中用来做同步的两种方法,它们的功能基本上和wait() / nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {


private final int MAX_SIZE = 100;// 仓库最大存储量

private LinkedList<Object> list = new LinkedList<Object>();// 仓库存储的载体

private final Lock lock=new ReentrantLock();// 锁
private Condition full=lock.newCondition();// 仓库满的条件变量
private Condition empty=lock.newCondition();// 仓库空的条件变量


// 生产num个产品
public void produce(int num, Thread thread){
lock.lock();
try{//显示锁最后必须显示释放
// 如果仓库剩余容量不足
System.out.println(thread.getName());
while(list.size()+num>MAX_SIZE){
System.out.println("【要生产的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try{
full.await();
}catch(InterruptedException e){

}
}
// 生产条件满足情况下,生产num个产品
for(int i=0; i<num; i++){
list.add(new Object());
}
System.out.println("【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
empty.signal();
}finally{
lock.unlock();
}
}

// 消费num个产品
public void consume(int num, Thread thread){
lock.lock();
try{
// 如果仓库存储量不足
System.out.println(thread.getName());
while(list.size()<num){
System.out.println("【要消费的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "/t暂时不能执行消费任务!");
try{
empty.await();
}catch(InterruptedException e){

}
}
// 消费条件满足情况下,消费num个产品
for(int i=0; i<num; i++){
list.remove();
}
System.out.println("【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
full.signal();
}finally{
lock.unlock();
}
}
/*
* 其余代码与上面完全一样
*/
}

这里体现出了设计上的好处,只要改变Storage类的代码即可。

从修改的地方可以看出await、signal方法与wait、notify非常相似,而且是他们的升级。

通过将两个条件分开到两个等待线程集中,Condition更容易满足单词通知的需求。signal比signalAll更高效,它极大地减少每次在缓存操作中发生的上下文切换与锁请求的次数,个人理解就是当生产者生产了以后,只通知消费者,而且只通知一个消费者。

三、BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class BlockingQueueDemo {

static class Producer implements Runnable {
private BlockingQueue<String> queue;
private int timeout; // 生产一个产品后暂停的时间
private String category; // 仅仅起标记产品作用

public Producer(BlockingQueue<String> queue, int timeout, String category) {
super();
this.queue = queue;
this.timeout = timeout;
this.category = category;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// put()方法也是一个会阻塞的方法,如果队列已满的时候这个方法会一起阻塞直到
// 队列中重新出现空间为止
queue.put("product " + category);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
TimeUnit.MILLISECONDS.sleep(timeout); // 每生产一个产品就暂停timeout毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Customer implements Runnable {
private BlockingQueue<String> queue;

public Customer(BlockingQueue<String> queue) {
super();
this.queue = queue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("product got:" + queue.take());
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
// 暂停10毫秒,这里主要是为了证明take()是一个阻塞方法,如果 BlockingQueue中
// 没有元素,它会一起阻塞直到队列中有元素为止
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
// 这里简单的说一下BlockingQueue的实现,它基于生产者-消费者模式,其中有两个重要的阻塞方法
// put()和take(),而这两个方法的实现用到了Lock和Condition,具体实现请参考API
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
Thread t1 = new Thread(new Producer(queue, 500, "peak")); // 生产者线程,并且要比nike生产的快
Thread t2 = new Thread(new Producer(queue, 1000, "nike")); // 第二个生产者线程
Thread t3 = new Thread(new Customer(queue)); // 消费者线程
t1.start();
t2.start();
t3.start();
}
}

四、参考地址

http://developer.51cto.com/art/201508/487488.htm

http://blog.csdn.net/monkey_d_meng/article/details/6251879

http://blog.csdn.net/moreevan/article/details/6763508