2018年(30)
分类: Java
2018-09-19 15:08:01
线程池作用
相对于为每个请求都创建一个线程,线程池通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销,当请求到达时,工作线程通过已经存在,不会由于等待创建线程而延迟任务的执行,从而提高响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败
线程池处理流程
1)判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程
2)判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程
3)判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
示意图:
创建线程池
ThreadPoolExecutor构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... //代码省略 }
一共七个参数:
corePoolSize
线程池中的核心线程数,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲的核心线程能够执行新任务也会创建线程,直到线程数等于corePoolSize就不再创建,继续提交的任务被保存到阻塞队列中。如果调用了线程池的prestartAllCoreThreads()或者prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程
maximumPoolSize
线程池最大线程数,如果当前阻塞队列满了,继续提交任务,若当前线程数小于maximumPoolSize则创建新的线程执行任务。注意如果使用了无界的阻塞队列这个参数就没什么效果
keepAliveTime
线程空闲时保持存活时间,即当线程没有任务执行时,继续存活的时间。若当前线程池的线程数超过corePoolSize,且线程空闲时间超过keepAliveTime,就将这些空闲线程销毁,尽可能降低资源销毁
unit
keepAliveTime的时间单位,可以是天、小时、分、毫秒、微秒和纳秒
workQueue
用于保存等待执行的任务的阻塞队列
threadFactory
创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字
handler
线程池的饱和策略(或者叫拒绝策略),当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。Java线程池提供了以下4种策略:
①.AbortPolicy:直接抛出异常,默认策略
②.CallerRunsPolicy:只用调用者所在线程来运行任务
③.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
④.DiscardPolicy:不处理,直接丢弃
也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略
调用Exectors中的静态工厂方法也可以来创建线程池
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } 复制代码
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量(corePoolSize == maximumPoolSize),这时线程池的规模将不再变化(若某个线程由于发生了未预期的Exception而结束,线程池会补充一个新线程),使用LinkedBlockingQuene作为阻塞队列,适用于负载比较重的服务器
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
创建一个可缓存线程的线程池,默认缓存60s,使用SynchronousQueue作为阻塞队列(没有数据缓存空间的阻塞队列,每一个put操作必须等待一个take操作,若任务提交的速度远远大于CachedThreadPool的处理速度,CachedThreadPool会不断地创建新线程来执行任务,可能会导致系统耗尽CPU和内存资源)。适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
单线程的Executor,线程池中只有一个线程,若线程异常结束,会创建另一个线程替代。newSingleThreadExecutor能确保依照任务在队列中的顺讯来串行执行,内部使用LinkedBlockingQueue作为阻塞队列,适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
可以延迟或定时的方式执行任务,适用于周期任务实现原理
线程池状态
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:
线程池能够接收新任务,且能处理阻塞队列中的任务
SHUTDOWN:
线程池不会接收新任务,但会处理阻塞队列中的任务(shutdown())
STOP:
线程池不会接收新任务,不会处理已添加的任务,并且会中断正在处理的任务(shutdownNow())
TIDYING:
所有的任务已终止,ctl记录的”任务数量”为0
TERMINATED:
线程池彻底终止(terminated())
任务提交
有两种方式向线程池提交任务,分别为execute()和submit()方法。execute()方法提交的任务不能获取返回值,而submit()方法提交的任务会返回一个future类型的对象,可以通过这个future对象判断任务是否执行成功
execute()
execute()方法执行示意图:
execute()源码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 若线程池当前线程数小于核心线程数则创建新线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 若线程数大于等于核心线程数或线程创建失败,则将当前任务放到工作队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 若当前任务无法放进阻塞队列中,则创建新的线程来执行任务 else if (!addWorker(command, false)) // addWoker创建失败,执行reject方法运行相应的拒绝策略 reject(command); } 复制代码
如果当前运行的线程少于corePoolSize,则会调用addWorker()创建新的线程来执行新的任务
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取当前线程池运行状态 int rs = runStateOf(c); // 状态判断,条件不符合添加线程失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程池当前线程数 int wc = workerCountOf(c); // 若线程数超过CAPACITY,返回false // 若是添加核心线程,超过核心线程数返回false;若不是超过最大线程数返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS线程数+1 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 若状态与之前不一样,跳到最外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // 再次校验线程状态是否符合添加线程条件 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功后开启线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 复制代码
addWorker()添加线程时判断了两次线程状态是否符合添加线程的条件
第一次判断返回false:
①.线程池状态为STOP、TIDYING或TERMINATED状态
②.线程池状态为SHUTDOWN,任务不为null即线程处于SHUTDOWN状态,不允许添加任务
③.线程池状态为SHUTDOWN,任务为null,但阻塞队列为空,即添加空任务没有意义
第二次判断返回false:
①.线程池状态为STOP、TIDYING或TERMINATED状态
②.线程池状态为SHUTDOWN且任务不为null
线程添加成功后,调用start()方法启动线程,执行Worker类(继承AQS)的run()方法
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 释放锁,允许中断 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 若当前线程所需执行的任务不为空或阻塞队列中有任务 while (task != null || (task = getTask()) != null) { w.lock(); // 若线程池处于STOP、TIDYING或TERMINATED状态时,且线程没有中断标记,则请求中断线程 // 若线程池处于RUNNING或SHUTDOWN状态,且线程有中断标记,再次判断线程池状态是否>=STOP,若是请求中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 根据业务场景自定义方法 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 根据业务场景自定义方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 退出处理 processWorkerExit(w, completedAbruptly); } }
若当前线程的任务执行完,还会调用getTask()找阻塞队列中是否有任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); // 若线程池状态为SHUTDOWN且阻塞队列为空,workerCount - 1,返回null // 若线程池状态为STOP、TIDYING或TERMINATED状态,workerCount - 1,返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 若需要超时控制,则调用poll(),否则调用take()从阻塞队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
从getTask()源码可以知道线程池中的线程执行完自身任务后会一直执行阻塞队列中的任务。当线程处理完阻塞队列的任务后或者处理任务时出现异常退出循环,会执行processWorkerExit()方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly:true,表明线程运行异常,workerCount-1 // completedAbruptly:false,表明运行正常getTask()方法中已减少线程数量 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 从workers移除,从线程池移除至多一个线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试终止线程池 tryTerminate(); int c = ctl.get(); // 若当前线程池状态为RUNNING或SHUTDOWN, if (runStateLessThan(c, STOP)) { // 线程运行正常 if (!completedAbruptly) { // 若allowCoreThreadTimeOut为true,且等待队列有任务,至少保留一个线程 // 若allowCoreThreadTimeOut为false,线程数不少于corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 线程运行异常,调用addWorker()添加线程 addWorker(null, false); } }
方法先判断线程运行是否顺利,若运行出现异常将线程数减1。然后调用tryTerminate()尝试终止线程池。若当前线程池状态为RUNNING或SHUTDOWN,视情况是否添加线程
tryTerminate()方法
final void tryTerminate() { for (;;) { int c = ctl.get(); // 若线程池当前状态为RUNNING直接返回不终止 // 若状态为TIDYING或TERMINATED,即已经准备终止 // 若状态为SHUTDOWN且阻塞队列非空,需要执行完任务 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若线程数不等于0,适当终止一个线程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // // 尝试终止线程池 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子类实现 terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
submit()
submit()返回future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成。
public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
在submit方法中调用newTaskFor()将Callable任务会被封装成FutureTask对象
protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable); }
FutureTask状态:
/** Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。
COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
INTERRUPTING:任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED,这是一个最终态。
所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)
FutureTask.get实现
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
若状态为NEW或者COMPLETING时调用awaitDone()对主线程进行阻塞
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 若主线程被中断,抛异常 if (Thread.interrupted()) { // 去除链表中超时或被中断节点 removeWaiter(q); throw new InterruptedException(); } int s = state; // 若状态大于COMPLETING,表明任务已完成,直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 若状态等于COMPLETING,让出cpu资源 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) // CAS设置链表(栈的逻辑结构) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); // 若超时,去除链表中超时或被中断节点 if (nanos <= 0L) { removeWaiter(q); return state; } // 限时祖塞 LockSupport.parkNanos(this, nanos); } else // 一直阻塞 LockSupport.park(this); } }
awaitDone()方法目的是主线程阻塞直至futureTask完成。若状态为COMPLETING,表明任务完成(无论成功或失败),但其结果被保存在outcome字段中,让出cpu资源;若状态大于COMPLETING表明任务完成且结果已存,直接返回;否则维护基于链表的等待栈根据是否限时阻塞线程节点
futureTask.run实现
public void run() { // 若任务完成或已有其他执行此任务 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; // 若任务不为空且状态为new if (c != null && state == NEW) { V result; boolean ran; try { // 执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // 防止并发调用run runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run()方法逻辑很简单,执行成功set()方法保存结果;执行异常setException()保存异常,最后runner置空防止并发调用,若任务被中断,handlePossibleCancellationInterrupt处理由于cancel(true)而取消中断的线程
set,setException方法:
/** * 任务执行成功 状态由NEW -> COMPLETING -> NORMAL */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } /** * 任务执行异常 状态NEW -> COMPLETING -> EXCEPTIONAL */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
两个方法都会finishCompletion()通知主线程任务已经执行完成
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;
2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值,并通过LockSupport类unpark方法唤醒主线程;
线程池关闭
线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池
shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,其中先前提交的任务将被执行,但不会接受任何新任务
shutdownNow() :尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表
线程池配置
合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析
任务的性质:CPU密集型任务、IO密集型任务和混合型任务
任务的优先级:高、中和低
任务的执行时间:长、中和短
任务的依赖性:是否依赖其他系统资源,如数据库连接
性质不同的任务可以用不同规模的线程池分开处理。
CPU密集型任务:应配置尽可能小的线程,如配置Ncpu+1个线程的线程池
IO密集型任务:其线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu
混合型的任务:如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解
可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行,但优先级低的任务可能永远不能执行
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU
建议使用有界队列,使用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,系统可能会崩溃