生产者消费者问题是一个经典的多线程问题,今天花了点时间写了一个,有助于巩固多线程知识
基础版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| public class ProducerConsumer { public static void main(String[] args) { Container container = new Container(10); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start();
} }
class Food { private int i;
public Food(int i) { this.i = i; }
@Override public String toString() { return "产品" + i; } }
class Container { private Food[] foods; private int capacity; private int index;
public Container(int capacity) { this.capacity = capacity; foods = new Food[this.capacity]; index = 0; }
public synchronized void add (Food food) { foods[index] = food; index++; }
public synchronized Food takeAway() { Food f = foods[index]; foods[index] = null; index--; return f; }
}
class Producer implements Runnable{ private Container container;
public Producer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { Food f = new Food(i); container.add(f); } } }
class Consumer implements Runnable{
private Container container;
public Consumer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { container.takeAway(); } } }
|
  运行结果会报ArrayIndexOutOfBoundsException
, 显然是因为当容器容量满了后生产者任然在向其中放入产品或者是容器内的产品被取完后消费者任试图从其中取产品。因此,需要在放入及拿出之前进行判断,如果容器已满或者容器内已无产品时让消费者和生产者进行等待。这就需要用到wait
方法。
  该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()
之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法。进入 wait()
方法后,当前线程释放锁。在从 wait()返回前,线程与其他线程竞争重新获得锁。如果调用 wait()
时,没有持有适当的锁,则抛出 IllegalMonitorStateException
改进版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| public class ProducerConsumer { public static void main(String[] args) { Container container = new Container(10); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start();
} }
class Food { private int i;
public Food(int i) { this.i = i; }
@Override public String toString() { return "产品" + i; } }
@Slf4j class Container { private Food[] foods; private int capacity; private int index;
public Container(int capacity) { this.capacity = capacity; foods = new Food[this.capacity]; index = -1; }
public synchronized void add (Food food) { if (index >= capacity - 1){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } index++; foods[index] = food; log.info("添加产品{}", food); log.info("现有产品{}个", index+1); }
public synchronized Food takeAway() { if (index < 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
Food f = foods[index]; foods[index] = null; index--; log.info("拿出产品{}", f); log.info("现有产品{}个", index + 1); return f; }
}
class Producer implements Runnable{ private Container container;
public Producer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { Food f = new Food(i); container.add(f); } } }
class Consumer implements Runnable{
private Container container;
public Consumer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { container.takeAway(); } } }
|
  改进版在添加或者那区时进行判断,不符合条件则让线程wait
。运行结果如下图:
运行不报错,但程序发生阻塞。仔细一想原因也很简单:生产者线程运行较快,容器很快生产者线程随即阻塞。之后消费者线程开始运行,当容器变空时消费者线程也发生阻塞。因此需要用到notify
方法。
  该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,的如果调用 notify()
时没有持有适当的锁,也会抛出 IllegalMonitorStateException
。
再改进版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| public class ProducerConsumer { public static void main(String[] args) { Container container = new Container(10); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); } }
class Food { private int i;
public Food(int i) { this.i = i; }
@Override public String toString() { return "产品" + i; } }
@Slf4j class Container { private Food[] foods; private int capacity; private int index;
public Container(int capacity) { this.capacity = capacity; foods = new Food[this.capacity]; index = -1; }
public synchronized void add (Food food) { if (index >= capacity - 1){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); index++; foods[index] = food; log.info("添加产品{}", food); log.info("现有产品{}个", index+1); }
public synchronized Food takeAway() { if (index < 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); Food f = foods[index]; foods[index] = null; index--; log.info("拿出产品{}", f); log.info("现有产品{}个", index + 1); return f; }
}
class Producer implements Runnable{ private Container container;
public Producer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { Food f = new Food(i); container.add(f); } } }
class Consumer implements Runnable{
private Container container;
public Consumer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { container.takeAway(); } } }
|
运行结果正常。 然而这是单消费者,单生产者的情况。当生产者消费者都有多个时情况又发生了改变:
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) { Container container = new Container(10); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); }
|
运行结果:
  再次出现了数组下标越界错误。经过分析也找出了问题所在:当容器满了之后,有多个生产者线程去添加产品,但都发生了阻塞。这时消费者进程运行,而当消费者线程运行后有阻塞的生产者线程1被唤醒重新运行:调用nodify()
方法正好唤醒的是生产者进程2,生产者线程1添加产品。之后生产者线程2运行直接添加产品,数组下标越界错误。思考了一番,将if
判断改为while
判断。
再再改进版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| public class ProducerConsumer { public static void main(String[] args) { Container container = new Container(10); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); new Thread(new Producer(container)).start(); new Thread(new Consumer(container)).start(); } }
class Food { private int i;
public Food(int i) { this.i = i; }
@Override public String toString() { return "产品" + i; } }
@Slf4j class Container { private Food[] foods; private int capacity; private int index;
public Container(int capacity) { this.capacity = capacity; foods = new Food[this.capacity]; index = -1; }
public synchronized void add (Food food) { while (index >= capacity - 1){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); index++; foods[index] = food; log.info("添加产品{}", food); log.info("现有产品{}个", index+1); }
public synchronized Food takeAway() { while (index < 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); Food f = foods[index]; foods[index] = null; index--; log.info("拿出产品{}", f); log.info("现有产品{}个", index + 1); return f; }
}
class Producer implements Runnable{ private Container container;
public Producer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { Food f = new Food(i); container.add(f); } } }
class Consumer implements Runnable{
private Container container;
public Consumer(Container container) { this.container = container; }
@Override public void run() { for (int i = 0; i < 20; i++) { container.takeAway(); } } }
|
运行成功。
以前看书时看到run
方法内的循环判断都是用while
而不是for
或if
。经过这个bug修复后,有了更好的理解。