Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1522
  • 博文数量: 8
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 90
  • 用 户 组: 普通用户
  • 注册时间: 2023-07-18 09:31
文章分类

全部博文(8)

文章存档

2023年(8)

我的朋友
最近访客

分类: Java

2023-07-25 14:44:27

池化思想主要就是某些资源创建起来很费事(也很费时)。那么这些资源我先花点时间创建好一批,然后把这些资源都放在一个池子里面,要用的时候去拿,不用的时候就归还给池子。比如数据库链接池,线程池。所以JAVA线程池最原始的需求就是提高性能。
说实话业务场景上我其实用的也不多,唯一一次用就是在一个项目中,需要做数据迁移(重构啊,业务边界重新划分都会碰到这种事情,或者是分库分表等。我碰到的是和其它部门业务边界从新划分,要接手他们的业务,需要把历史存量的数据全部迁移过来,当时为了提升单机性能,就用了多线程来迁移,然后想着既然没用过,就玩玩吧,就用了一次线程池。笔者有幸碰到2次这种事情,有一次是风险极高的迁移,另一次只能算是高风险,嗯,居然没出问题,能活下来真是万幸)。
先看个简单的代码入门下:
要干活的类

点击(此处)折叠或打开

  1. package sty.zchi.threadpool;

  2. public class TempRun implements Runnable{
  3.     private int no;

  4.     public TempRun(int no) {
  5.         this.no = no;
  6.     }
  7.     public void run() {
  8.         for(int i = 0; i < 6; i++) {
  9.             System.out.println(Thread.currentThread().getName() + " no = " + no);
  10.             try {
  11.                 Thread.sleep(3000);
  12.             } catch (Exception e) {
  13.                 e.printStackTrace();
  14.             }
  15.         }
  16.     }
  17. }

主类

点击(此处)折叠或打开

  1. package sty.zchi.threadpool;

  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;

  4. public class MainTest1 {
  5.     public static void main(String[] args) {
  6. // ExecutorService executorService = Executors.newCachedThreadPool();
  7.         ExecutorService executorService = Executors.newFixedThreadPool(3);
  8.         TempRun tempRun1 = new TempRun(1);
  9.         TempRun tempRun2 = new TempRun(2);
  10.         TempRun tempRun3 = new TempRun(3);
  11.         TempRun tempRun4 = new TempRun(4);
  12.         executorService.execute(tempRun1);
  13.         executorService.execute(tempRun2);
  14.         executorService.execute(tempRun3);

  15.         executorService.execute(tempRun4);
  16.     }
  17. }
看看运行结果

看下结果,就不难理解了。其中当池子里面没有资源的话,主线程的语句会被阻塞住,这个应该好理解。

好吧,要说些复杂的内容了。
关于cachedthreadpool和fixedthreadpool
先说结论:cachedthreadpool适合响应时间要求高,且数据量固定的任务。而fixedthreadpool适用于资源有限,数据量不可控的任务。
cachedthreadpool:它不限制创建线程个数,当碰到池子线程资源不够时候,则会动态创建线程供你使用,所以控制不好,会导致OOM(也就是创建的线程太多了)。另外默认的情况下,如果长达60S不用,还会销毁线程资源。
fixedthreadpool:他会限制创建线程的个数,不会动态补充资源。所以当要处理的事情过多超过了其资源,那就挂着等着吧。另外线程就算是限制了,也不会不销毁,除非你手动把他给销毁了。

说到这里有人就会说了,那如果默认60S销毁,我想延长点,比如2分钟后没人用再销毁。也就是我对线程池的控制想要更加灵活点,有什么办法。那么就看看ThreadPoolExecutor
先写一个简单的代码,大家感受下

点击(此处)折叠或打开

  1. package sty.zchi.threadpool;

  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.LinkedBlockingQueue;
  5. import java.util.concurrent.ThreadFactory;
  6. import java.util.concurrent.ThreadPoolExecutor;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.atomic.AtomicInteger;

  9. public class MainTest2 {
  10.     private final static AtomicInteger count = new AtomicInteger(0);
  11.     public static void main(String args[]) {
  12.         ThreadFactory threadFactory = new ThreadFactory() {
  13.             @Override
  14.             public Thread newThread(Runnable r) {
  15.                 int c = count.incrementAndGet();
  16.                 Thread t = new Thread(r);
  17.                 t.setName("zchi-thread-" + c);
  18.                 return t;
  19.             }
  20.         };
  21.         BlockingQueue<Runnable> tempBlockList = new LinkedBlockingQueue<>(2);
  22.         ExecutorService executorService = new ThreadPoolExecutor(3, 5,
  23.                 30, TimeUnit.SECONDS, tempBlockList, threadFactory,
  24.                 new ThreadPoolExecutor.AbortPolicy());
  25.         TempRun tempRun1 = new TempRun(1);
  26.         TempRun tempRun2 = new TempRun(2);
  27.         TempRun tempRun3 = new TempRun(3);
  28.         TempRun tempRun4 = new TempRun(4);
  29.         TempRun tempRun5 = new TempRun(5);
  30.         TempRun tempRun6 = new TempRun(6);
  31.         executorService.execute(tempRun1);
  32.         executorService.execute(tempRun2);
  33.         executorService.execute(tempRun3);
  34.         executorService.execute(tempRun4);
  35.         executorService.execute(tempRun5);
  36.         executorService.execute(tempRun6);
  37.         System.out.println(tempBlockList.size());
  38.     }
  39. }

好吧,运行结果大家自己跑跑看,这里我们重点看下ExecutorService executorService = new ThreadPoolExecutor(3, 5,30, TimeUnit.SECONDS, tempBlockList, threadFactory,new ThreadPoolExecutor.AbortPolicy()); 为了方便阅读,我们贴下代码ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

  • corePoolSize:核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。
  • workQueue:这个放前面,因为这样读文章会更好一点,缓冲队列,当我们提交了一堆RUNABLE时候,发现大于corePoolSize,比如我提交了4个RUNABLE,但corePoolSize是3,这个时候剩下1个Runable就会被暂时搁置,放在这个workQueue里面,等到有线程空闲下来,再被拿出来执行。
  • maximumPoolSize:最大线程数,这就好理解了。这里有个点注意,小于等于corePoolSize的线程,只要有需求(也就是有RUNABLE过来了)就会被创建。大于corePoolSize而小于等于maximumPoolSize的线程就不是这么容易创建了,他必需等RUNABLE堆满了workQueue,才会创建线程去执行。比如corePoolSize=3, maximumPoolSize=6, workQueue.size=2时候,你只有一下扔进来6个,打满workQueue,这个时候就会创建更多的线程。
  • keepAliveTime和TimeUnit这个就好理解了,没事干的时候,线程活多久。一个是数值,一个是单位。
  • ThreadFactory:线程工厂,随便看看就好。
  • handler:饱和拒绝策略,当你线程也到了maximumPoolSize, workQueue也被装满了,你再想塞RUNABLE进来就要拒绝了。这里就是告诉他怎么拒绝,下面有几个默认的决绝策略
  • new ThreadPoolExecutor.AbortPolicy()
    默认的策略,再来的Runable直接扔掉,还给你抛异常。扔的是想进来的数据
    new ThreadPoolExecutor.DiscardPolicy();
    照样扔数据,但不抛异常。扔的是想进来的数据
    new ThreadPoolExecutor.DiscardOldestPolicy();
    照样扔数据,但不抛异常。但扔的是被压在workQueue队头的数据
    new ThreadPoolExecutor.CallerRunsPolicy();
    不扔数据,也不敢了,直接让主线程上来搞。

    好吧,回过头来再看看Executor创建线程池,这个就不说了,自己看看代码就好。

阅读(54) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~