生产者消费者问题是一个经典的多线程问题,今天花了点时间写了一个,有助于巩固多线程知识

基础版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class ProducerConsumer {
public static void main(String[] args) {
Container container = new Container(10);
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();

}
}

/**
* 具体生产类
*/
class Food {
private int i;

public Food(int i) {
this.i = i;
}

@Override
public String toString() {
return "产品" + i;
}
}


/**
* 具体容器
*/
class Container {
private Food[] foods;
private int capacity;
private int index;

public Container(int capacity) {
this.capacity = capacity;
foods = new Food[this.capacity];
index = 0;
}

//一个容器只能允许同时只有一个生产者或消费者对其操作,以防同步问题,因此添加synchronized关键字
public synchronized void add (Food food) {
foods[index] = food;
index++;
}

//一个容器只能允许同时只有一个生产者或消费者对其操作,以防同步问题,因此添加synchronized关键字
public synchronized Food takeAway() {
Food f = foods[index];
foods[index] = null;
index--;
return f;
}

}


/**
* 生产者类
*/
class Producer implements Runnable{
private Container container;

public Producer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
Food f = new Food(i);
container.add(f);
}
}
}


class Consumer implements Runnable{

private Container container;

public Consumer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
container.takeAway();
}
}
}

&ensp;&ensp;运行结果会报ArrayIndexOutOfBoundsException, 显然是因为当容器容量满了后生产者任然在向其中放入产品或者是容器内的产品被取完后消费者任试图从其中取产品。因此,需要在放入及拿出之前进行判断,如果容器已满或者容器内已无产品时让消费者生产者进行等待。这就需要用到wait方法。
&ensp;&ensp;该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法。进入 wait()方法后,当前线程释放锁。在从 wait()返回前,线程与其他线程竞争重新获得锁。如果调用 wait()时,没有持有适当的锁,则抛出 IllegalMonitorStateException

改进版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class ProducerConsumer {
public static void main(String[] args) {
Container container = new Container(10);
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();

}
}

/**
* 具体生产类
*/
class Food {
private int i;

public Food(int i) {
this.i = i;
}

@Override
public String toString() {
return "产品" + i;
}
}


/**
* 具体容器
*/
@Slf4j
class Container {
private Food[] foods;
private int capacity;
private int index;

public Container(int capacity) {
this.capacity = capacity;
foods = new Food[this.capacity];
index = -1;
}

public synchronized void add (Food food) {
if (index >= capacity - 1){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
index++;
foods[index] = food;
log.info("添加产品{}", food);
log.info("现有产品{}个", index+1);
}

public synchronized Food takeAway() {
if (index < 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Food f = foods[index];
foods[index] = null;
index--;
log.info("拿出产品{}", f);
log.info("现有产品{}个", index + 1);
return f;
}

}


/**
* 生产者类
*/
class Producer implements Runnable{
private Container container;

public Producer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
Food f = new Food(i);
container.add(f);
}
}
}


class Consumer implements Runnable{

private Container container;

public Consumer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
container.takeAway();
}
}
}

&ensp;&ensp;改进版在添加或者那区时进行判断,不符合条件则让线程wait。运行结果如下图:

TIM截图20180504100400.png-128.1kB

运行不报错,但程序发生阻塞。仔细一想原因也很简单:生产者线程运行较快,容器很快生产者线程随即阻塞。之后消费者线程开始运行,当容器变空时消费者线程也发生阻塞。因此需要用到notify方法。
&ensp;&ensp;该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,的如果调用 notify()时没有持有适当的锁,也会抛出 IllegalMonitorStateException

再改进版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class ProducerConsumer {
public static void main(String[] args) {
Container container = new Container(10);
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
}
}

/**
* 具体生产类
*/
class Food {
private int i;

public Food(int i) {
this.i = i;
}

@Override
public String toString() {
return "产品" + i;
}
}


/**
* 具体容器
*/
@Slf4j
class Container {
private Food[] foods;
private int capacity;
private int index;

public Container(int capacity) {
this.capacity = capacity;
foods = new Food[this.capacity];
index = -1;
}

public synchronized void add (Food food) {
if (index >= capacity - 1){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notify();
index++;
foods[index] = food;
log.info("添加产品{}", food);
log.info("现有产品{}个", index+1);
}

public synchronized Food takeAway() {
if (index < 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notify();
Food f = foods[index];
foods[index] = null;
index--;
log.info("拿出产品{}", f);
log.info("现有产品{}个", index + 1);
return f;
}

}


/**
* 生产者类
*/
class Producer implements Runnable{
private Container container;

public Producer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
Food f = new Food(i);
container.add(f);
}
}
}


class Consumer implements Runnable{

private Container container;

public Consumer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
container.takeAway();
}
}
}

运行结果正常。 然而这是单消费者,单生产者的情况。当生产者消费者都有多个时情况又发生了改变:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Container container = new Container(10);
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
}

运行结果:

TIM截图20180504104842.png-43.8kB
&ensp;&ensp;再次出现了数组下标越界错误。经过分析也找出了问题所在:当容器满了之后,有多个生产者线程去添加产品,但都发生了阻塞。这时消费者进程运行,而当消费者线程运行后有阻塞的生产者线程1被唤醒重新运行:调用nodify()方法正好唤醒的是生产者进程2,生产者线程1添加产品。之后生产者线程2运行直接添加产品,数组下标越界错误。思考了一番,将if判断改为while判断。

再再改进版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public class ProducerConsumer {
public static void main(String[] args) {
Container container = new Container(10);
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
}
}

/**
* 具体生产类
*/
class Food {
private int i;

public Food(int i) {
this.i = i;
}

@Override
public String toString() {
return "产品" + i;
}
}


/**
* 具体容器
*/
@Slf4j
class Container {
private Food[] foods;
private int capacity;
private int index;

public Container(int capacity) {
this.capacity = capacity;
foods = new Food[this.capacity];
index = -1;
}

public synchronized void add (Food food) {
while (index >= capacity - 1){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notify();
index++;
foods[index] = food;
log.info("添加产品{}", food);
log.info("现有产品{}个", index+1);
}

public synchronized Food takeAway() {
while (index < 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notify();
Food f = foods[index];
foods[index] = null;
index--;
log.info("拿出产品{}", f);
log.info("现有产品{}个", index + 1);
return f;
}

}


/**
* 生产者类
*/
class Producer implements Runnable{
private Container container;

public Producer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
Food f = new Food(i);
container.add(f);
}
}
}


class Consumer implements Runnable{

private Container container;

public Consumer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
container.takeAway();
}
}
}

运行成功。
以前看书时看到run方法内的循环判断都是用while而不是forif。经过这个bug修复后,有了更好的理解。