![upload successful]()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws Exception{ List<String> list = new ArrayList<>(); for(int i = 0;i<1000;i++){ Thread.sleep(10); list.add(i+""); }
for (int i =0;i<list.size();i++){ Thread.sleep(1000); System.out.println(i); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| public class Context<E> { private static final Logger log = LoggerFactory.getLogger(Context.class); private final LinkedBlockingQueue<E> consumptionQueue = new LinkedBlockingQueue<E>(2500); private volatile ThreadState producersThreadState; private volatile ThreadState consumersThreadState;
int getConsumptionQueueSize() { return consumptionQueue.size(); }
public boolean offerDataToConsumptionQueue(E e) throws Exception { setProducersThreadState(ThreadState.RUNNING); if (ThreadState.DEAD == this.getConsumersThreadState()){ return false; } while (true) { if (consumptionQueue.offer(e, 2, TimeUnit.SECONDS)){ return true; } if (ThreadState.DEAD == this.getConsumersThreadState()) { return false; } } }
public E pollDataFromConsumptionQueue() throws Exception { setConsumersThreadState(ThreadState.RUNNING); while (true) { E e = consumptionQueue.poll(20, TimeUnit.MILLISECONDS); if (e != null){ return e; } if (ThreadState.DEAD == this.getProducersThreadState()){ return null; } log.debug("demand exceeds supply(供不应求,需生产数据)..."); Thread.sleep(50); } }
ThreadState getProducersThreadState() { return producersThreadState; }
void setProducersThreadState(ThreadState producersThreadState) { this.producersThreadState = producersThreadState; }
ThreadState getConsumersThreadState() { return consumersThreadState; }
void setConsumersThreadState(ThreadState consumersThreadState) { this.consumersThreadState = consumersThreadState; }
1 2 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
public interface ProducerTemplate<C_E> {
void production(Context<C_E> context) throws Exception; }
public interface ConsumerTemplate<C_E> {
void consumption(Context<C_E> context) throws Exception; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
public class Coordinator {
private static final Logger log = LoggerFactory.getLogger(Coordinator.class); private final Lock lock = new ReentrantLock(); private final Condition enabledConsumers = lock.newCondition(); private volatile boolean isEnabledForConsumers; private final Context<?> context; private boolean isWaitingToFinish; private int consumersMaxTotal;
public Coordinator(Context<?> context, int consumersMaxTotal) { this(context, consumersMaxTotal, true); }
public Coordinator(Context<?> context, int consumersMaxTotal, boolean isWaitingToFinish) { this.context = context; this.consumersMaxTotal = consumersMaxTotal; this.isWaitingToFinish = isWaitingToFinish; }
public void start(ProducerTemplate<?> producerTemplate,ConsumerTemplate<?> consumerTemplate) throws Exception { if (context.getConsumersThreadState() != null || context.getProducersThreadState() != null){ return; } ProducersThreadUnit producersThreadUnit = new ProducersThreadUnit(producerTemplate, "production", context); ConsumersThreadUnit consumersThreadUnit = new ConsumersThreadUnit(consumerTemplate, "consumption", context); this.start(producersThreadUnit, consumersThreadUnit); }
public void start(ProducersThreadUnit producersThreadUnit, ConsumersThreadUnit consumersThreadUnit) throws Exception { if (context.getConsumersThreadState() != ThreadState.NEW || context.getProducersThreadState() != ThreadState.NEW){ return; } long time = System.currentTimeMillis(); try { Thread startProducersThread = this.startProducers(producersThreadUnit); Thread startConsumersThread = this.startConsumers(consumersThreadUnit); if (!this.isWaitingToFinish){ return; } startProducersThread.join(); if (startConsumersThread != null){ startConsumersThread.join(); } } catch (Exception e) { log.error("start worker error...", e); throw e; } log.info(String.format("processing is completed... man-hour(millisecond)=[%s]", System.currentTimeMillis() - time)); }
private Thread startProducers(ProducersThreadUnit producersThreadUnit) throws Exception { Thread thread = new Thread(producersThreadUnit); thread.start(); return thread; }
private Thread startConsumers(ConsumersThreadUnit consumersThreadUnit) throws Exception { lock.lock(); try { log.info("wating for producers..."); while (!isEnabledForConsumers){ enabledConsumers.await(5, TimeUnit.SECONDS); } if (context.getConsumptionQueueSize() == 0){ return null; } log.info("start consumers before..."); Thread thread = new Thread(consumersThreadUnit); thread.start(); return thread; } catch (Exception e) { log.error("start consumers error...", e); throw e; } finally { lock.unlock(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
public class ProducersThreadUnit implements Runnable {
private Object targetObject; private String targetMethodName; private Object[] targetMethodParameters; private ExecutorService executorService = Executors.newFixedThreadPool(1);
public ProducersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) { this.targetObject = targetObject; this.targetMethodName = targetMethodName; this.targetMethodParameters = targetMethodParameters; context.setProducersThreadState(ThreadState.NEW); }
@Override public void run() { try { executorService.execute(new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters)); context.setProducersThreadState(ThreadState.RUNNABLE); executorService.shutdown(); while (!executorService.isTerminated() && context.getConsumptionQueueSize() == 0){ Thread.sleep(20); } log.info("production the end or products have been delivered,ready to inform consumers..."); this.wakeConsumers(); log.info("wait until the production is complete..."); while (!executorService.isTerminated()){ Thread.sleep(200); } } catch (Exception e) { log.error(String.format("production error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e); if (!executorService.isShutdown()){ executorService.shutdown(); } } finally { log.info("production the end..."); context.setProducersThreadState(ThreadState.DEAD); isEnabledForConsumers = true; } }
private void wakeConsumers() { isEnabledForConsumers = true; lock.lock(); try { enabledConsumers.signal(); } catch (Exception e) { log.error("inform to consumers error...", e); } finally { lock.unlock(); } }
public class ConsumersThreadUnit implements Runnable {
private Object targetObject; private String targetMethodName; private Object[] targetMethodParameters;
public ConsumersThreadUnit(Object targetObject, String targetMethodName, Object... targetMethodParameters) { this.targetObject = targetObject; this.targetMethodName = targetMethodName; this.targetMethodParameters = targetMethodParameters; context.setConsumersThreadState(ThreadState.NEW); }
@Override public void run() { ThreadPoolExecutor threadPoolExecutor = null; int concurrencyMaxTotal = Coordinator.this.consumersMaxTotal; try { threadPoolExecutor = new ThreadPoolExecutor(0, concurrencyMaxTotal, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); while (concurrencyMaxTotal > 0) { if (threadPoolExecutor.getPoolSize() > context.getConsumptionQueueSize()) { if (ThreadState.DEAD == context.getProducersThreadState()) { break; }else { Thread.sleep(50); continue; } } RunnableThreadUnit consumers = new RunnableThreadUnit(targetObject, targetMethodName, targetMethodParameters); threadPoolExecutor.execute(consumers); context.setConsumersThreadState(ThreadState.RUNNABLE); log.info("submit consumption task..."); concurrencyMaxTotal--; } threadPoolExecutor.shutdown(); while (!threadPoolExecutor.isTerminated()) { Thread.sleep(100); } } catch (Exception e) { log.error(String.format("consumption error... targetObject=[%s],targetMethodName=[%s],targetMethodParameters=[%s]", targetObject, targetMethodName, targetMethodParameters), e); if (threadPoolExecutor != null && !threadPoolExecutor.isShutdown()) { threadPoolExecutor.shutdown(); } } finally { log.info("consumption the end..."); context.setConsumersThreadState(ThreadState.DEAD); } } }
public class RunnableThreadUnit implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(RunnableThreadUnit.class);
private Object object; private String methodName; private Object[] methodParameters;
public RunnableThreadUnit(Object object, String methodName, Object... methodParameters) { if (object == null || StringUtils.isBlank(methodName) || methodParameters == null) { throw new RuntimeException("init runnable thread unit error..."); } this.object = object; this.methodName = methodName; this.methodParameters = methodParameters; }
@Override public void run() { try { Class<?>[] classes = new Class[methodParameters.length]; for (int i = 0; i < methodParameters.length; i++) { classes[i] = methodParameters[i].getClass(); } Method method = object.getClass().getMethod(methodName, classes); method.invoke(object, methodParameters); } catch (Exception e) { logger.error(String.format("execute runnable thread unit error... service=[%s],invokeMethodName=[%s]", object, methodName), e); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class CourrentTest implements ProducerTemplate<String>, ConsumerTemplate<String>{ @Override public void production(Context<String> context) throws Exception { for(int i = 0;i<1000;i++){ Thread.sleep(10); if(!context.offerDataToConsumptionQueue(i+"")){ return; } } } @Override public void consumption(Context<String> context) throws Exception { while (true) { String str = context.pollDataFromConsumptionQueue(); if (str == null) { break; } Thread.sleep(1000); System.out.println(str); } } public static void main(String[] args) throws Exception{ CourrentTest courrentTest = new CourrentTest(); new Coordinator(new Context<String>(),10).start(courrentTest,courrentTest); } }
![upload successful]()
PS: 上述代码可以在我的GitHub项目里找到。