池化思想主要就是某些资源创建起来很费事(也很费时)。那么这些资源我先花点时间创建好一批,然后把这些资源都放在一个池子里面,要用的时候去拿,不用的时候就归还给池子。比如数据库链接池,线程池。所以JAVA线程池最原始的需求就是提高性能。
说实话业务场景上我其实用的也不多,唯一一次用就是在一个项目中,需要做数据迁移(重构啊,业务边界重新划分都会碰到这种事情,或者是分库分表等。我碰到的是和其它部门业务边界从新划分,要接手他们的业务,需要把历史存量的数据全部迁移过来,当时为了提升单机性能,就用了多线程来迁移,然后想着既然没用过,就玩玩吧,就用了一次线程池。笔者有幸碰到2次这种事情,有一次是风险极高的迁移,另一次只能算是高风险,嗯,居然没出问题,能活下来真是万幸)。
先看个简单的代码入门下:
要干活的类
-
package sty.zchi.threadpool;
-
-
public class TempRun implements Runnable{
-
private int no;
-
-
public TempRun(int no) {
-
this.no = no;
-
}
-
public void run() {
-
for(int i = 0; i < 6; i++) {
-
System.out.println(Thread.currentThread().getName() + " no = " + no);
-
try {
-
Thread.sleep(3000);
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
主类
-
package sty.zchi.threadpool;
-
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
-
public class MainTest1 {
-
public static void main(String[] args) {
-
// ExecutorService executorService = Executors.newCachedThreadPool();
-
ExecutorService executorService = Executors.newFixedThreadPool(3);
-
TempRun tempRun1 = new TempRun(1);
-
TempRun tempRun2 = new TempRun(2);
-
TempRun tempRun3 = new TempRun(3);
-
TempRun tempRun4 = new TempRun(4);
-
executorService.execute(tempRun1);
-
executorService.execute(tempRun2);
-
executorService.execute(tempRun3);
-
-
executorService.execute(tempRun4);
-
}
-
}
看看运行结果
看下结果,就不难理解了。其中当池子里面没有资源的话,主线程的语句会被阻塞住,这个应该好理解。
好吧,要说些复杂的内容了。
关于cachedthreadpool和fixedthreadpool
先说结论:
cachedthreadpool适合响应时间要求高,且数据量固定的任务。而fixedthreadpool适用于资源有限,数据量不可控的任务。
cachedthreadpool:它不限制创建线程个数,当碰到池子线程资源不够时候,则会动态创建线程供你使用,所以控制不好,会导致OOM(也就是创建的线程太多了)。另外默认的情况下,如果长达60S不用,还会销毁线程资源。
fixedthreadpool:他会限制创建线程的个数,不会动态补充资源。所以当要处理的事情过多超过了其资源,那就挂着等着吧。另外线程就算是限制了,也不会不销毁,除非你手动把他给销毁了。
说到这里有人就会说了,那如果默认60S销毁,我想延长点,比如2分钟后没人用再销毁。也就是我对线程池的控制想要更加灵活点,有什么办法。那么就看看ThreadPoolExecutor
先写一个简单的代码,大家感受下
-
package sty.zchi.threadpool;
-
-
import java.util.concurrent.BlockingQueue;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.LinkedBlockingQueue;
-
import java.util.concurrent.ThreadFactory;
-
import java.util.concurrent.ThreadPoolExecutor;
-
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicInteger;
-
-
public class MainTest2 {
-
private final static AtomicInteger count = new AtomicInteger(0);
-
public static void main(String args[]) {
-
ThreadFactory threadFactory = new ThreadFactory() {
-
@Override
-
public Thread newThread(Runnable r) {
-
int c = count.incrementAndGet();
-
Thread t = new Thread(r);
-
t.setName("zchi-thread-" + c);
-
return t;
-
}
-
};
-
BlockingQueue<Runnable> tempBlockList = new LinkedBlockingQueue<>(2);
-
ExecutorService executorService = new ThreadPoolExecutor(3, 5,
-
30, TimeUnit.SECONDS, tempBlockList, threadFactory,
-
new ThreadPoolExecutor.AbortPolicy());
-
TempRun tempRun1 = new TempRun(1);
-
TempRun tempRun2 = new TempRun(2);
-
TempRun tempRun3 = new TempRun(3);
-
TempRun tempRun4 = new TempRun(4);
-
TempRun tempRun5 = new TempRun(5);
-
TempRun tempRun6 = new TempRun(6);
-
executorService.execute(tempRun1);
-
executorService.execute(tempRun2);
-
executorService.execute(tempRun3);
-
executorService.execute(tempRun4);
-
executorService.execute(tempRun5);
-
executorService.execute(tempRun6);
-
System.out.println(tempBlockList.size());
-
}
-
}
好吧,运行结果大家自己跑跑看,这里我们重点看下ExecutorService executorService = new ThreadPoolExecutor(3, 5,30, TimeUnit.SECONDS, tempBlockList, threadFactory,new ThreadPoolExecutor.AbortPolicy()); 为了方便阅读,我们贴下代码ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
-
corePoolSize:核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。
-
workQueue:这个放前面,因为这样读文章会更好一点,缓冲队列,当我们提交了一堆RUNABLE时候,发现大于corePoolSize,比如我提交了4个RUNABLE,但corePoolSize是3,这个时候剩下1个Runable就会被暂时搁置,放在这个workQueue里面,等到有线程空闲下来,再被拿出来执行。
-
maximumPoolSize:最大线程数,这就好理解了。这里有个点注意,小于等于corePoolSize的线程,只要有需求(也就是有RUNABLE过来了)就会被创建。大于corePoolSize而小于等于maximumPoolSize的线程就不是这么容易创建了,他必需等RUNABLE堆满了workQueue,才会创建线程去执行。比如corePoolSize=3, maximumPoolSize=6, workQueue.size=2时候,你只有一下扔进来6个,打满workQueue,这个时候就会创建更多的线程。
-
keepAliveTime和TimeUnit这个就好理解了,没事干的时候,线程活多久。一个是数值,一个是单位。
-
ThreadFactory:线程工厂,随便看看就好。
-
handler:饱和拒绝策略,当你线程也到了maximumPoolSize, workQueue也被装满了,你再想塞RUNABLE进来就要拒绝了。这里就是告诉他怎么拒绝,下面有几个默认的决绝策略
-
new ThreadPoolExecutor.AbortPolicy()
|
默认的策略,再来的Runable直接扔掉,还给你抛异常。扔的是想进来的数据
|
new ThreadPoolExecutor.DiscardPolicy();
|
照样扔数据,但不抛异常。扔的是想进来的数据
|
new ThreadPoolExecutor.DiscardOldestPolicy();
|
照样扔数据,但不抛异常。但扔的是被压在workQueue队头的数据
|
new ThreadPoolExecutor.CallerRunsPolicy();
|
不扔数据,也不敢了,直接让主线程上来搞。
|
好吧,回过头来再看看Executor创建线程池,这个就不说了,自己看看代码就好。
阅读(153) | 评论(0) | 转发(0) |