前言
今天我们来学习下生产者与消费者模式。
生产者和消费者模式可以解决绝大多数并发问题,一般由生产者、数据缓冲区、消费者构成。
如下图,其原理是将原来的直接调用(消费者->生产者)变为了生产者生产数据放入缓存区,消费者从缓存区获取数据并消费这种模式。
可以知道MQ就是生产者与消费者模式的典型代表。
我们可以举例比如一个定时任务,每天要批处理数据,比如上传文件,每天如果要上传1000个文件或者更多,这时候我们使用平常的循环上传方法,明显大部分时间均浪费在了上传的时间上。
如果按照每个文件处理需要3s,1000个文件则至少需要3000s时间。
如果我们引入生产者和消费者模式,生产者部分负责查询组装数据并把它们放入数据缓存区,消费者部分负责处理数据并上传,可以大大提高并发性能。
使用生产者与消费者模式的典型优点如下:
并发支持
可以看到,如果消费者处理比较耗时,我们可以使用多个生产者生产数据或者消费者去处理队列数据,从而提高系统并发性能。即消费者和生产者可以为两个独立的并发主体。
解耦
我们将生产者和消费者分开后,即使生产者部分处理数据的逻辑有变化,也不会影响到消费者部分,而相比之前在一起的逻辑,我们可能需要改动整个业务部分以完成数据处理。即生产者和消费者没有过分的依赖关系,只要保证传输数据格式的正确性即可。
解决忙闲不均问题
可以看到生产者和消费者模式可以完美解决忙闲不均的问题,当生产者数据过多时,进入数据缓存区等待消费者慢慢处理,生产者数据少时,由于缓存区的数据,也不至于消费者无事可做。即无论生产者或者消费者谁快谁慢,我们总可以通过对他们的数量控制来均衡资源的分配。
正文
我们通过上面的例子来实践下消费者和生产者模式。
我们正常逻辑可能如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws Exception{ List<String> list = new ArrayList<>(); for(int i = 0;i<1000;i++){ Thread.sleep(10); list.add(i+""); }
for (int i =0;i<list.size();i++){ Thread.sleep(1000); System.out.println(i); } }
|
可以看到这个过程是非常耗时的,我们使用生产者和消费者模式来设计下这个业务场景。
我们数据缓存区使用队列来暂存数据,生产者组生产数据时会将数据放入队列,消费者消费数据时会从队列中获取数据。
我们用阻塞队列LinkedBlockingQueue
来作为数据缓存区,写一个生产者放入数据和消费者取出数据的方法。
如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| public class Context<E> { private static final Logger log = LoggerFactory.getLogger(Context.class); private final LinkedBlockingQueue<E> consumptionQueue = new LinkedBlockingQueue<E>(2500); private volatile ThreadState producersThreadState; private volatile ThreadState consumersThreadState;
int getConsumptionQueueSize() { return consumptionQueue.size(); }
public boolean offerDataToConsumptionQueue(E e) throws Exception { setProducersThreadState(ThreadState.RUNNING); if (ThreadState.DEAD == this.getConsumersThreadState()){ return false; } while (true) { if (consumptionQueue.offer(e, 2, TimeUnit.SECONDS)){ return true; } if (ThreadState.DEAD == this.getConsumersThreadState()) { return false; } } }
public E pollDataFromConsumptionQueue() throws Exception { setConsumersThreadState(ThreadState.RUNNING); while (true) { E e = consumptionQueue.poll(20, TimeUnit.MILLISECONDS); if (e != null){ return e; } if (ThreadState.DEAD == this.getProducersThreadState()){ return null; } log.debug("demand exceeds supply(供不应求,需生产数据)..."); Thread.sleep(50); } }
ThreadState getProducersThreadState() { return producersThreadState; }
void setProducersThreadState(ThreadState producersThreadState) { this.producersThreadState = producersThreadState; }
ThreadState getConsumersThreadState() { return consumersThreadState; }
void setConsumersThreadState(ThreadState consumersThreadState) { this.consumersThreadState = consumersThreadState; }
}
|
线程状态枚举:新线程(NEW)、可运行的(RUNNABLE)、运行中(RUNNING)、死亡(DEAD)、阻塞(BLOCKED)。
1 2 3
| enum ThreadState { NEW, RUNNABLE, RUNNING, DEAD, BLOCKED; }
|
然后我们构造两个模板接口,一个生产者模板接口一个消费者模板接口,分别提供生产者产生数据的方法和消费者消费数据的方法。具体实现有各自的业务实现类实现即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public interface ProducerTemplate<C_E> {
void production(Context<C_E> context) throws Exception; }
public interface ConsumerTemplate<C_E> {
void consumption(Context<C_E> context) throws Exception; }
|
创建一个生产者与消费者的协调者类,用来启动生产者或者消费者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
public class Coordinator {
private static final Logger log = LoggerFactory.getLogger(Coordinator.class); private final Lock lock = new ReentrantLock(); private final Condition enabledConsumers = lock.newCondition(); private volatile boolean isEnabledForConsumers; private final Context<?> context; private boolean isWaitingToFinish; private int consumersMaxTotal;
public Coordinator(Context<?> context, int consumersMaxTotal) { this(context, consumersMaxTotal, true); }
public Coordinator(Context<?> context, int consumersMaxTotal, boolean isWaitingToFinish) { this.context = context; this.consumersMaxTotal = consumersMaxTotal; this.isWaitingToFinish = isWaitingToFinish; }
public void start(ProducerTemplate<?> producerTemplate,ConsumerTemplate<?> consumerTemplate) throws Exception { if (context.getConsumersThreadState() != null || context.getProducersThreadState() != null){ return; } ProducersThreadUnit producersThreadUnit = new ProducersThreadUnit(producerTemplate, "production", context); ConsumersThreadUnit consumersThreadUnit = new ConsumersThreadUnit(consumerTemplate, "consumption", context); this.start(producersThreadUnit, consumersThreadUnit); }
public void start(ProducersThreadUnit producersThreadUnit, ConsumersThreadUnit consumersThreadUnit) throws Exception { if (context.getConsumersThreadState() != ThreadState.NEW || context.getProducersThreadState() != ThreadState.NEW){ return; } long time = System.currentTimeMillis(); try { Thread startProducersThread = this.startProducers(producersThreadUnit); Thread startConsumersThread = this.startConsumers(consumersThreadUnit); if (!this.isWaitingToFinish){ return; } startProducersThread.join(); if (startConsumersThread != null){ startConsumersThread.join(); } } catch (Exception e) { log.error("start worker error...", e); throw e; } log.info(String.format("processing is completed... man-hour(millisecond)=[%s]", System.currentTimeMillis() - time)); }
private Thread startProducers(ProducersThreadUnit producersThreadUnit) throws Exception { Thread thread = new Thread(producersThreadUnit); thread.start(); return thread; }
private Thread startConsumers(ConsumersThreadUnit consumersThreadUnit) throws Exception { lock.lock(); try { log.info("wating for producers..."); while (!isEnabledForConsumers){ enabledConsumers.await(5, TimeUnit.SECONDS); } if (context.getConsumptionQueueSize() == 0){ return null; } log.info("start consumers before..."); Thread thread = new Thread(consumersThreadUnit); thread.start(); return thread; } catch (Exception e) { log.error("start consumers error...", e); throw e; } finally { lock.unlock(); } } }
|
生产者和消费者的线程单元如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
public class ProducersThreadUnit implements Runnable {
private Object targetObject; private String targetMethodName; private Object[] targetMethodParameters; private ExecutorService executorService = Executors.newFixedThreadPool(1);
public ProducersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) { this.targetObject = targetObject; this.targetMethodName = targetMethodName; this.targetMethodParameters = targetMethodParameters; context.setProducersThreadState(ThreadState.NEW); }
@Override public void run() { try { executorService.execute(new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters)); context.setProducersThreadState(ThreadState.RUNNABLE); executorService.shutdown(); while (!executorService.isTerminated() && context.getConsumptionQueueSize() == 0){ Thread.sleep(20); } log.info("production the end or products have been delivered,ready to inform consumers..."); this.wakeConsumers(); log.info("wait until the production is complete..."); while (!executorService.isTerminated()){ Thread.sleep(200); } } catch (Exception e) { log.error(String.format("production error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e); if (!executorService.isShutdown()){ executorService.shutdown(); } } finally { log.info("production the end..."); context.setProducersThreadState(ThreadState.DEAD); isEnabledForConsumers = true; } }
private void wakeConsumers() { isEnabledForConsumers = true; lock.lock(); try { enabledConsumers.signal(); } catch (Exception e) { log.error("inform to consumers error...", e); } finally { lock.unlock(); } }
}
public class ConsumersThreadUnit implements Runnable {
private Object targetObject; private String targetMethodName; private Object[] targetMethodParameters;
public ConsumersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) { this.targetObject = targetObject; this.targetMethodName = targetMethodName; this.targetMethodParameters = targetMethodParameters; context.setConsumersThreadState(ThreadState.NEW); }
@Override public void run() { ThreadPoolExecutor threadPoolExecutor = null; int concurrencyMaxTotal = Coordinator.this.consumersMaxTotal; try { threadPoolExecutor = new ThreadPoolExecutor(0, concurrencyMaxTotal, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); while (concurrencyMaxTotal > 0) { if (threadPoolExecutor.getPoolSize() > context.getConsumptionQueueSize()) { if (ThreadState.DEAD == context.getProducersThreadState()) { break; }else { Thread.sleep(50); continue; } } RunnableThreadUnit consumers = new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters); threadPoolExecutor.execute(consumers); context.setConsumersThreadState(ThreadState.RUNNABLE); log.info("submit consumption task..."); concurrencyMaxTotal--; } threadPoolExecutor.shutdown(); while (!threadPoolExecutor.isTerminated()) { Thread.sleep(100); } } catch (Exception e) { log.error(String.format("consumption error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e); if (threadPoolExecutor != null && !threadPoolExecutor.isShutdown()) { threadPoolExecutor.shutdown(); } } finally { log.info("consumption the end..."); context.setConsumersThreadState(ThreadState.DEAD); } } }
public class RunnableThreadUnit implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(RunnableThreadUnit.class);
private Object object; private String methodName; private Object[] methodParameters;
public RunnableThreadUnit(Object object, String methodName, Object... methodParameters) { if (object == null || StringUtils.isBlank(methodName) || methodParameters == null) { throw new RuntimeException("init runnable thread unit error..."); } this.object = object; this.methodName = methodName; this.methodParameters = methodParameters; }
@Override public void run() { try { Class<?>[] classes = new Class[methodParameters.length]; for (int i = 0; i < methodParameters.length; i++) { classes[i] = methodParameters[i].getClass(); } Method method = object.getClass().getMethod(methodName, classes); method.invoke(object, methodParameters); } catch (Exception e) { logger.error(String.format("execute runnable thread unit error... service=[%s],invokeMethodName=[%s]", object, methodName), e); } }
}
|
可以看到我们使用反射获取了production和consumption方法,并执行它们。启动了两个线程,生产者线程和消费者线程去处理业务,其中消费者线程利用了线程池,可以放置concurrencyMaxTotal个子线程去消费任务。
我们创建一个测试类进行测试,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class CourrentTest implements ProducerTemplate<String>, ConsumerTemplate<String>{ @Override public void production(Context<String> context) throws Exception { for(int i = 0;i<1000;i++){ Thread.sleep(10); if(!context.offerDataToConsumptionQueue(i+"")){ return; } } } @Override public void consumption(Context<String> context) throws Exception { while (true) { String str = context.pollDataFromConsumptionQueue(); if (str == null) { break; } Thread.sleep(1000); System.out.println(str); } } public static void main(String[] args) throws Exception{ CourrentTest courrentTest = new CourrentTest(); new Coordinator(new Context<String>(),10).start(courrentTest,courrentTest); } }
|
运行后可以看到输出的结果。
这儿我们可以看到对比较耗时的上传方法(消费者端)进行了并发处理以提高效率,生产端如果保证了数据的安全性,我们可以使用并行流等放入数据以提高放入数据的效率。
其实我们看到这儿,可以理解线程池也是一个类似于生产者消费者模式的东西。线程池里面有任务就会去执行,相当于消费者,线程池里的队列相当于缓存区,而生产者就是我们一个个放入线程的Runable方法。
上述代码的运行原理图大致如下:
PS: 上述代码可以在我的GitHub项目里找到。
https://github.com/JavaZWT/framework-base
另外提供了一个简易模板SimpleTemplate可以适用生产者方法和消费者方法在一个类里的情况,只继承这一个方法即可。不用分别继承ConsumerTemplate和ProducerTemplate接口了。
总结
通过对上面一个列子使用生产者和消费者模式,我们了解了这种模式的一些适用情形和优点。
当然也了解了它的一些缺点,对于解决并发问题的方案,最要重视的应该就是数据安全问题了。
我们在平时工作中也可以考虑什么样的场景下可以使用这种模式,其实这种模式的适用场景还是蛮多的,对于一些处理较耗时的操作,文件上传、图片生成转换等都可以考虑这种模式。