文章

生产者消费者基本模型

2020.12.8 ・ 共 1323 字,您可能需要 3 分钟阅读

Tags: 多线程, Java, 学习笔记

在多线程开发之中,最为著名的案例就是生产者与消费者操作。

  1. 生产者负责信息内容的生产。
  2. 每当生产者完成一项完整的信息之后,消费者则取走信息。
  3. 如果生产者没有结束,那么消费者应该等待它生产完成;如果消费者还没有对信息进行消费,那么生产者则应该等待消费者消费完成后再生产。

将生产者与消费者定义为两个独立的线程类对象,对于现在生产的数据,举例为如下组成。

  1. title = t1 , content = c1;
  2. title = t2, content = c2;

生产者与消费者是两个独立的线程,那么这两个独立的线程之间就需要有一个数据保存的集中点。

image-20201208090422979

class Producer implements Runnable {
  private Message msg;

  public Producer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      if (x % 2 == 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        this.msg.setTitle("t1");
        this.msg.setContent("c1");
      } else {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        this.msg.setTitle("t2");
        this.msg.setContent("c2");
      }
    }
  }
}

class Consumer implements Runnable {
  private Message msg;

  public Consumer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.out.println(this.msg.getTitle() + this.msg.getContent());
    }
  }
}

class Message {
  private String title;
  private String content;

  public void setTitle(String title) {
    this.title = title;
  }

  public void setContent(String content) {
    this.content = content;
  }

  public String getTitle() {
    return title;
  }

  public String getContent() {
    return content;
  }
}

public class Test {
  public static void main(String[] args) {
    Message msg = new Message();
    new Thread(new Producer(msg)).start(); // 生产者线程
    new Thread(new Consumer(msg)).start(); // 消费者线程
  }
}

根据以上代码执行,有两个重要问题。

  1. 数据不同步。
  2. 期望结果是生产一个取走一个,但是发现有重复生产,重复取出的现象存在。

要想解决数据同步,最简单的做法是使用Synchronized关键字,于是同步处理就可以在Message类中完成。

class Producer implements Runnable {
  private Message msg;

  public Producer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      if (x % 2 == 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        try {
          this.msg.set("t1", "c1");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      } else {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        try {
          this.msg.set("t2", "c2");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

class Consumer implements Runnable {
  private Message msg;

  public Consumer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      try {
        System.out.println(this.msg.get());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

class Message {
  private String title;
  private String content;

  public synchronized void set(String title, String content) throws InterruptedException {
    this.title = title;
    Thread.sleep(100);
    this.content = content;
  }

  public synchronized String get() throws InterruptedException {
    Thread.sleep(100);
    return this.title + "--" + this.content;
  }
}

public class Test {
  public static void main(String[] args) {
    Message msg = new Message();
    new Thread(new Producer(msg)).start(); // 生产者线程
    new Thread(new Consumer(msg)).start(); // 消费者线程
  }
}

此时已经解决同步,但是数据重复仍然存在。

要想解决生产者与消费者的数据重复问题,最好的解决方案就是使用等待与唤醒机制,主要依靠Object中的方法进行处理。

  • 等待
    • 一直等到死public final void wait() throws InterruptedException;
    • 设置等待时间
      • public final void wait(long timeout) throws InterruptedException;
      • public final void wait(long timeout, int nanos) throws InterruptedException;
  • 唤醒
    • 唤醒第一个等待线程public final void notify()
    • 唤醒全部等待线程public final void notifyAll()

此时,如果有若干个等待线程的话,那么notify()表示唤醒的是第一个等待线程,其他的继续等待;notifyAll()表示的是唤醒所有等待线程,哪个现成的优先级高就有可能先执行。

class Producer implements Runnable {
  private Message msg;

  public Producer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      if (x % 2 == 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        try {
          this.msg.set("t1", "c1");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      } else {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        try {
          this.msg.set("t2", "c2");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

class Consumer implements Runnable {
  private Message msg;

  public Consumer(Message msg) {
    this.msg = msg;
  }

  public void run() {
    for (int x = 0; x < 100; x++) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      try {
        System.out.println(this.msg.get());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

class Message {
  private String title;
  private String content;
  private boolean flag = true;
  // flag = true 允许生产不允许消费
  // flag = false 允许消费不允许生产
  public synchronized void set(String title, String content) throws InterruptedException {
    if (!flag) {
      super.wait();
    }
    this.title = title;
    Thread.sleep(100);
    this.content = content;
    this.flag = false; // 已经生产过了
    super.notify();
  }

  public synchronized String get() throws InterruptedException {
    if (this.flag) {
      super.wait();
    }
    Thread.sleep(100);
    try {
      return this.title + "--" + this.content;
    } finally {
      this.flag = true;
      super.notify();
    }
  }
}

public class Test {
  public static void main(String[] args) {
    Message msg = new Message();
    new Thread(new Producer(msg)).start(); // 生产者线程
    new Thread(new Consumer(msg)).start(); // 消费者线程
  }
}

此为多线程开发之中最原始的处理方案,全部手动实现。