Chinaunix首页 | 论坛 | 博客
  • 博客访问: 7529
  • 博文数量: 7
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 45
  • 用 户 组: 普通用户
  • 注册时间: 2014-09-26 17:51
文章分类
文章存档

2015年(1)

2014年(6)

我的朋友
最近访客

分类: Java

2014-12-09 09:44:57

1.为什么要用线程池   
线程也是一种对象,万事万物皆对象,线程池也就是对象池,通过将大对象池化,来达到对象复用,从而节省对象的内存开销以及对象的创建开销,设计模式中的享元模式就是将对象池化的一种模式,比较常听说的还有数据库连接池。对象池经常池化大对象和创建时间较长的对象,由于Java线程依赖于操作系统内核线程,所以线程创建需要切换到内核态,创建和销毁的消耗比较大。 

另外线程属于稀缺资源,不能无限制的创建,通过线程池可以进行统一的创建和监控。 遇到了性能瓶颈,吞吐量上不去的时候,并不是一味的加大线程数就能提高性能,线程达到一定的程度,上下文切换会消耗很大的CPU,并且可能是在做无用功,所以线程数量要有一个度,凡事都应该有个度,常言道"过犹不及"。要找到最佳的线程数,需要根据应用的类型,比如CPU密集型还是IO密集型,CPU的个数,IO等待和执行的比率,以及程序串行化的比率等因素综合考虑,并通过性能测试来具体验证才能获得最优配置,毕竟实践是检验真理的唯一标准嘛,废话少说,抓紧时间看看线程池吧。 

2.类图


这里最关键的是ThreadPoolExecutor类,隐藏了内部类和方法,不然太多太乱了,这样比较简洁,简洁是王道,简洁而非简陋。Executors提供了几个推荐使用的工厂方法。BlockingQueue用于存放暂时不能执行的线程队列。下面分别来看看这个关键类。 

3. ThreadPoolExecutor的构造方法 

[java] view plaincopy
  1. /**  
  2. * @param corePoolSize the number of threads to keep in the  
  3. * pool, even if they are idle.  
  4. * @param maximumPoolSize the maximum number of threads to allow in the  
  5. * pool.  
  6. * @param keepAliveTime when the number of threads is greater than  
  7. * the core, this is the maximum time that excess idle threads  
  8. * will wait for new tasks before terminating.  
  9. * @param unit the time unit for the keepAliveTime  
  10. * argument.  
  11. * @param workQueue the queue to use for holding tasks before they  
  12. * are executed. This queue will hold only the Runnable  
  13. * tasks submitted by the execute method.  
  14. * @param threadFactory the factory to use when the executor  
  15. * creates a new thread.  
  16. * @param handler the handler to use when execution is blocked  
  17. * because the thread bounds and queue capacities are reached.  
  18. *  */   
  19. public ThreadPoolExecutor(int corePoolSize,   
  20.                           int maximumPoolSize,   
  21.                           long keepAliveTime,   
  22.                           TimeUnit unit,   
  23.                           BlockingQueue workQueue,   
  24.                           ThreadFactory threadFactory,   
  25.                           RejectedExecutionHandler handler)   

corePoolSize:即使他们(线程)是空闲的,仍然保存在池中的线程数。 
maximumPoolSize:线程池允许的最大线程数。 
keepAliveTime:如果线程数大于corePoolSize,如果线程超过这个时间,将会被干掉。 
unit:跟keepAliveTime对应的时间单位。 
workQueue:该队列用户存放未被执行的任务。仅仅存放通过execute方法提交的任务。 
threadFactory:用于创建新线程的工厂。 
handler:如果线程不能被执行的拒绝策略。 

其他的都容易理解,corePoolSize 比较容易引起误解,下面解释一下corePoolSize和线程池的处理流程。 
corePoolSize:同其他的线程池不同,线程池并不是一上来就创建corePoolSize的线程数等待执行,一开始是空的线程池,提交一个任务到线程池中才会创建一个线程来执行。在线程数未达到corePoolSize之前,即使池中有空闲的线程,也仍然会创建一个新的线程来执行。 
corePoolSize的线程不会被回收。 


4.Executors的几个工厂方法 
作者建议使用Executors的方法来创建ThreadPoolExecutor对象。 

[java] view plaincopy
  1. public static ExecutorService newFixedThreadPool(int nThreads) {   
  2.     return new ThreadPoolExecutor(nThreads, nThreads,   
  3.                                   0L, TimeUnit.MILLISECONDS,   
  4.                                   new LinkedBlockingQueue());   
  5. }   
使用它则始终只有corePoolSize个线程在争抢CPU资源执行,其他的都得老老实实的在队列中候着。 
其中corePoolSize和maximumPoolSize相等,LinkedBlockingQueue是一个无界的workQueue。 
由于是个无界的queue,所以如果线程执行的时间很长,并且不停的向其提交新的任务的话,会一直放入queue,会导致系统垮掉。 
[java] view plaincopy
  1. public static ExecutorService newSingleThreadExecutor() {   
  2.     return new FinalizableDelegatedExecutorService   
  3.         (new ThreadPoolExecutor(11,   
  4.                                 0L, TimeUnit.MILLISECONDS,   
  5.                                 new LinkedBlockingQueue()));   
  6. }   
上面的一个特例,nThreads = 1 

[java] view plaincopy
  1. public static ExecutorService newCachedThreadPool() {   
  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
  3.                                   60L, TimeUnit.SECONDS,   
  4.                                   new SynchronousQueue());   
  5. }   
由于maximumPoolSize = Integer.MAX_VALUE,所以如果新提交任务时,所有的线程都在工作的话,会一直创建新的线程,直到内存溢出或者是系统垮掉。所以请务必记住,线程执行完毕之后,空闲60s的话,会被回收掉,不能占着茅坑不拉屎。就好像是你下载了很多,但是却一个也不跟着学习一样,是不会有进步的。如果空闲期间来了新的任务,则空闲线程正好可以被复用。很多人容易在这块忽略,并且这块一旦是出了问题,非常不容易定位。 
通常为了保障系统的高可用,要限制资源的使用,其中就包括线程资源,所以建议使用有界队列,这样虽然某些请求会失败,但是最起码可以保障整个系统的可用性。 


5.拒绝策略 
ThreadPoolExecutor中有几个实现RejectedExecutionHandler的内部类: 

AbortPolicy: 这个比较暴力,直接抛异常,拒绝执行。 
DiscardPolicy:这个是个空实现,直接丢弃。 
DiscardOldestPolicy:丢弃队列中的,执行新来的。喜新厌旧啊这个。 
CallerRunsPolicy: 使用调用者线程执行任务。 

6.BlockingQueue 
ArrayBlockingQueue:基于数组结构的阻塞队列。 
LinkedBlockingQueue:基于链表结构的阻塞队列。 Executors.newFiexedThreadPool()使用的就是这个队列。 
SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程的移除操作。Executors.newCachedThreadPool()就是使用这个队列。也就是说如果此时有空闲线程则转交给空闲线程处理,如果没有空闲线程则会增加新的线程来处理。 

7.线程池的关闭 
线程池的状态: 
RUNNING: 接受新任务,执行队列中的任务。 
SHUTDOWN: 不接受新任务,但是执行队列中的任务。RUNNING->SHUTDOWN,调用shutdown()方法。 
STOP:不接受新任务,不执行队列中的任务,并终止正在执行的任务。RUNNING或者SHUTDOWN->STOP,调用shutdownNow()方法。 
TERMIANTED:同STOP,但是所有线程已经terminated。 
线程池一定要记得shutdown(),这就跟创建了数据库资源finally中记得关闭一样,如Connection ResultSet PreparedStatement Statement,还有文件,网络socket对象,创建后都要记得关闭。 

代码示例:

[java] view plaincopy
  1. public class TestThreadPool {   
  2.   
  3.     private static int produceTaskSleepTime = 2;   
  4.   
  5.     private static int produceTaskMaxNumber = 10;   
  6.   
  7.     public static void main(String[] args) {   
  8.   
  9.         // 构造一个线程池   
  10.         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(243,   
  11.                 TimeUnit.SECONDS, new ArrayBlockingQueue(3),   
  12.                 new ThreadPoolExecutor.DiscardPolicy());   
  13.   
  14.   
  15.        // ExecutorService threadPool = Executors.newCachedThreadPool();   
  16.   
  17.         for (int i = 1; i <= produceTaskMaxNumber; i++) {   
  18.             try {   
  19.                 String task = "task@ " + i;   
  20.                 System.out.println("创建任务并提交到线程池中:" + task);   
  21.                 threadPool.execute(new ThreadPoolTask(task));   
  22.   
  23.                 Thread.sleep(produceTaskSleepTime);   
  24.             } catch (Exception e) {   
  25.                 e.printStackTrace();   
  26.             }   
  27.         }   
  28.         threadPool.shutdown();   
  29.     }   
  30. }   
  31.   
  32. public class ThreadPoolTask implements Runnable, Serializable {   
  33.   
  34.     private Object attachData;   
  35.   
  36.     ThreadPoolTask(Object tasks) {   
  37.         this.attachData = tasks;   
  38.     }   
  39.   
  40.     public void run() {   
  41.   
  42.         System.out.println("开始执行任务:" + attachData);   
  43.         try{   
  44.             Thread.sleep(10000);   
  45.         }catch (InterruptedException e){   
  46.             e.printStackTrace();   
  47.         }   
  48.         attachData = null;   
  49.     }   
  50.   
  51.     public Object getTask() {   
  52.         return this.attachData;   
  53.     }   
  54. }   

阅读(783) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~