前言 Java中,创建线程的方式一般有三种方法:
继承Thread类创建线程
实现Runnable接口创建线程
使用Callable和Future创建线程
关于三种创建方法本文不再赘述。
可以看出,以上创建线程的方式,都缺乏对线程的管理,我们设想,如果线程在调用过程中使用了某一资源,当该资源处理缓慢或异常时,可能产生大量线程等待的情况,严重时可能造成OOM异常。
针对以上情况,应该对创建线程进行管理,这样线程池便产生了,好在在jdk1.5时,Doug Lea大神已经帮我们实现了这些功能,它们均在java.util.concurrent包下。建议大家想学习多线程,把该包下的源码理解,一定对多线程会有更深入的理解。
本文重点讲述线程池,会对以下这几个类(接口)进行重点讲解。
Executor,ExecutorService,Executors,AbstractExecutorService,ThreadPoolExecutor
线程池的创建 我们先来简单说下线程池的使用:
缓存型线程池
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 Executors.newCachedThreadPool
简单使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0 ; i < 5 ; i++) { final int index = i; try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run () { System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index); } }); }
定长线程池
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 Executors.newFixedThreadPool
简单使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5 ); for (int i = 0 ; i < 10 ; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run () { try { System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index); Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
定时及周期性任务性线程池 创建一个定长线程池,支持定时及周期性任务执行。 Executors.newScheduledThreadPool 简单使用: 1 2 3 4 5 6 7 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5 ); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run () { System.out.println("Thread id=" +Thread.currentThread().getId()+";5s后,每2s执行一次" ); } }, 5 , 2 , TimeUnit.SECONDS);
单线程型线程池
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 Executors.newSingleThreadExecutor
简单使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0 ; i < 10 ; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run () { try { System.out.println("Thread id=" +Thread.currentThread().getId()+";index=" +index); Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
自定义线程池
创建一个自定义线程池,以优化线程池。
根据Executors源码,可以看出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
线程池的实现使用ThreadPoolExecutor这个类实现的。这个类全参参数有以下几个:
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
① corePoolSize:核心线程池大小 取值范围 0-Integer.MaxValue ② maximumPoolSize:最大线程池大小 取值范围 0-Integer.MaxValue ③ keepAliveTime:线程空闲时存活时间 ④ unit:线程空闲时存活时间单位 ⑤ workQueue:工作队列类型,线程队列类型 队列分类: 直接提交策略:SynchronousQueue,其无法设置队列长度,所有线程均直接提交给线程池。 无界队列:LinkedBlockingQueue,如果默认不设置初始长度,这个队列是无界的,可缓存大量等待线程。 有界队列:ArrayBlockingQueue,必须设置初始长度,线程池满,且达到队列最大长度后执行拒绝策略。 ⑥ threadFactory:线程工厂 ⑦ handler:线程池饱和后的拒绝策略 ThreadPoolExecutor定义了四种,我们也可以自己定义: ThreadPoolExecutor.AbortPolicy:拒绝该任务并抛出异常 ThreadPoolExecutor.CallerRunsPolicy:直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务 ThreadPoolExecutor.DiscardOldestPolicy:直接丢弃正在执行的任务,并执行该任务 ThreadPoolExecutor.DiscardPolicy:丢弃该任务
可以看出,当业务情况复杂时,Executors里提供的几种基本的线程池已经不能满足我们的要求,需要我们根据情况自定义线程池,而且可以举个例子,比如对于newCachedThreadPool创建线程池的方法,它传入的maximumPoolSize为Integer的Max值,如果业务资源异常,创建大量线程而不释放,newCachedThreadPool这种创建线程池的方法也能导致OOM异常。
而我们声明最大线程池大小,并声明拒绝策略。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ExecutorService myExecutor = new ThreadPoolExecutor(5 , 10 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5 ), new AbortPolicy()); for (int i = 0 ; i < 11 ; i++) { final int index = i; try { myExecutor.execute(new Runnable() { @Override public void run () { System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index); try { Thread.sleep(10000000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } }
可以有效防止OOM异常以及及时发现系统运行问题。
自定义线程池也是被推荐的创建线程池的方法。
源码分析 下面我们主要对ThreadPoolExecutor这个类进行分析。
我们先看下它的execute方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
再看下addWorker方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { final ReentrantLock mainLock = this .mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
更多代码不一一赘述。上面代码基本是线程池的核心原理。
通俗点讲,线程池工作分为下面几步:
根据传入参数,设置核心线程池数量,最大线程池数量,拒绝策略,线程工作队列 当添加一个线程时,如果线程池线程数小于核心线程数,直接开启一个新线程执行任务。 如果核心线程池满了,那么把它添加到工作队列中。 如果核心线程池和工作队列都满了,则开启非核心线程执行任务。 如果全部都满了,执行拒绝策略。 以上就是对线程池的全部分析。
关于 我的个人博客:
https://www.sakuratears.top
GitHub地址:
https://github.com/javazwt
欢迎关注。