Chinaunix首页 | 论坛 | 博客
  • 博客访问: 117187
  • 博文数量: 165
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1655
  • 用 户 组: 普通用户
  • 注册时间: 2022-09-26 14:37
文章分类

全部博文(165)

文章存档

2024年(2)

2023年(95)

2022年(68)

我的朋友

分类: 架构设计与优化

2023-02-16 11:07:07

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、{BANNED}最佳大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和{BANNED}最佳大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

1.png

setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

2.png

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
 * 动态线程池
 *
 */ public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
} 

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean { /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */ private static final ConcurrentMap DTP_REGISTRY = new ConcurrentHashMap<>(); /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */ public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    } @Override public void afterPropertiesSet() throws Exception {
        this.refresh(); //创建定时任务线程池 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build()); //延迟1秒执行,每个1分钟check一次 executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) { doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    } /**
     * @param properties
     */ private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1 || properties.getMaxPoolSize() < 1 || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
       
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }

    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        } @Override public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }

} 

线程池配置类

@Data public class ThreadPoolProperties { /**
     * 线程池名称
     */ private String threadPoolBeanName; /**
     * 线程池核心线程数量
     */ private int corePoolSize; /**
     * 线程池{BANNED}最佳大线程池数量
     */ private int maxPoolSize;
} 

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

ducc配置平台使用见:?

动态线程池配置key:dynamic.thread.pool

配置value:

[  { "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor", "corePoolSize": 32, "maxPoolSize": 128  }] 

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j public class DynamicThreadPoolPostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DynamicThreadPoolTaskExecutor) { DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        } return bean;
    }
} 

3.动态线程池应用

动态线程池Bean声明

  <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">  <property name="corePoolSize" value="128"/>  <property name="maxPoolSize" value="512"/>  <property name="queueCapacity" value="500"/>  <property name="keepAliveSeconds" value="60"/>  <property name="rejectedExecutionHandler">     <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> property> bean>  <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">  <property name="corePoolSize" value="32"/>  <property name="maxPoolSize" value="128"/>  <property name="queueCapacity" value="500"/>  <property name="keepAliveSeconds" value="60"/>  <property name="rejectedExecutionHandler">     <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> property> bean>  <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/> <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/> 

业务类注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor; Runnable asyncTask = ()->{...}; CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor); 

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

阅读(312) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~