所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架满的时候,消费者可以从货架上拿走商品,生产者此时等待货架的空位,这样不断的循环。那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者模型。
总结一下:生产者消费者能够解决的问题如下:
在具体实现生产者消费者模型之前需要先描述几个用到的方法:
先看一下wait()是干什么的?
1.wait()是Object里面的方法,而不是Thread里面的,这一点很容易搞错。它的作用是将当前线程置于预执行队列,并在wait()所在的代码处停止,等待唤醒通知。 2.wait()只能在同步代码块或者同步方法中执行,如果调用wait()方法,而没有持有适当的锁,就会抛出异常。 wait()方法调用后悔释放出锁,线程与其他线程竞争重新获取锁。
举个例子:
public class TestWait implements Runnable { private final Object object=new Object(); @Override public void run() { synchronized (object){ System.out.println("线程执行开始。。。"); try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程执行结束。。。"); } } public static void main(String[] args) { TestWait testWait=new TestWait(); Thread thread=new Thread(testWait); thread.start(); } }
结果如下:
从结果中我们可以看出线程调用了wait()方法后一直在等待,不会继续往下执行。这也就能解释上面说的wait()一旦执行,除非接收到唤醒操作或者是异常中断,否则不会继续往下执行。
在上面的代码中我们看到wait()调用以后线程一直在等待,在实际当中我们难免不希望是这样的,那么这个时候就用到了另一个方法notify方法:
1.notify()方法也是要在同步代码块或者同步方法中调用的,它的作用是使停止的线程继续执行,调用notify()方法后,会通知那些等待当前线程对象锁的线程,并使它们重新获取该线程的对象锁,如果等待线程比较多的时候,则有线程规划器随机挑选出一个呈wait状态的线程。 2.notify()调用之后不会立即释放锁,而是当执行notify()的线程执行完成,即退出同步代码块或同步方法时,才会释放对象锁。
还是上面的例子,刚才我们调用了wait()方法后,线程便一直在等待,接下来我们给线程一个唤醒的信号,代码如下:
public class TestWait implements Runnable { private final Object object=new Object(); public void setFlag(boolean flag) { this.flag = flag; } private boolean flag=true; @Override public void run() { if(flag){ this.testwait(); } else { this.testnotify(); } } public void testwait(){ synchronized (object){ try { System.out.println("线程开始执行。。。"); Thread.sleep(1000); object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程执行结束。。。"); } } public void testnotify(){ synchronized (object){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } object.notify(); } } public static void main(String[] args) { TestWait testWait=new TestWait(); Thread thread=new Thread(testWait); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } testWait.setFlag(false); Thread thread1=new Thread(testWait); thread1.start(); } }
结果如下: 我们看到在调用notify()方法之后,线程又继续了。
从字面意思就可以看出notifyAll是唤醒所有等待的线程。
public class TestWait implements Runnable { private final Object object=new Object(); private boolean flag=true; public void setFlag(boolean flag) { this.flag = flag; } @Override public void run() { if(flag){ this.testwait(); } else { this.testnotify(); } } public void testwait(){ synchronized (object){ try { System.out.println(Thread.currentThread().getName()+"线程开始执行。。。"); Thread.sleep(1000); object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"线程执行结束。。。"); } } public void testnotify(){ synchronized (object){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } object.notifyAll(); } } public static void main(String[] args) { TestWait testWait=new TestWait(); Thread thread=new Thread(testWait,"线程1"); thread.start(); Thread thread1=new Thread(testWait,"线程2"); thread1.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } testWait.setFlag(false); Thread thread2=new Thread(testWait); thread2.start(); } }
结果如下: 可见notifyAll()方法确实唤醒了所有等待的线程。
出现阻塞的情况大体分为如下5种:
线程调用 sleep方法,主动放弃占用的处理器资源。 线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞。 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。 线程等待某个通知。 程序调用了 suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。
run()方法运行结束后进入销毁阶段,整个线程执行完毕。
>
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。
商品类
public class Goods { private int id; private String name; public Goods(int id, String name) { this.id = id; this.name = name; } }
生产者类
public class Producer implements Runnable { private Goods goods; @Override public void run() { while (true) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (TestPC.queue) { goods=new Goods(1,"商品"); if (TestPC.queue.size()<MAX_POOL) { TestPC.queue.add(goods); System.out.println(Thread.currentThread().getName()+"生产商品"); } else { try { TestPC.queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
消费者类
public class Consumer implements Runnable { @Override public void run() { while (true){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (TestPC.queue){ if(!TestPC.queue.isEmpty()){ TestPC.queue.poll(); System.out.println(Thread.currentThread().getName()+"消费商品"); } else { TestPC.queue.notify(); } } } } }
测试类
public class TestPC { public static final int MAX_POOL=10; public static final int MAX_PRODUCER=5; public static final int MAX_CONSUMER=4; public static Queue<Goods> queue=new ArrayBlockingQueue<>(MAX_POOL); public static void main(String[] args) { Producer producer=new Producer(); Consumer consumer=new Consumer(); for(int i=0;i<MAX_PRODUCER;i++) { Thread threadA = new Thread(producer, "生产者线程"+i); threadA.start(); } for(int j=0;j<MAX_CONSUMER;j++) { Thread threadB = new Thread(consumer, "消费者线程"+j); threadB.start(); } } }
部分结果展示:
原文链接:https://blog.csdn.net/qq_40550018/article/details/87859399