Chinaunix首页 | 论坛 | 博客
  • 博客访问: 83681
  • 博文数量: 22
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 221
  • 用 户 组: 普通用户
  • 注册时间: 2015-08-28 11:18
文章分类

全部博文(22)

文章存档

2021年(4)

2020年(1)

2016年(10)

2015年(7)

我的朋友

分类: Java

2021-11-10 18:48:18


 xxl-job执行器路由选择策略

 

- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括:


        FIRST(第一个):固定选择第一个机器;

        LAST(最后一个):固定选择最后一个机器;

        ROUND(轮询):;

        RANDOM(随机):随机选择在线的机器;

        CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

        LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

        LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

        FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

        BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

        SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;



路由策略枚举:


public enum ExecutorRouteStrategyEnum {


    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),

    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),

    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),

    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),

    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),

    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),

    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),

    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),

    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),

    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);



具体每一种路由策略实现方式:


具体代码路径:


com/xxl/job/admin/core/route/strategy



1. 获取地址列表中的第一个:

public class ExecutorRouteFirst extends ExecutorRouter {


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList){

        return new ReturnT(addressList.get(0));

    }


}


2. 获取地址列表中的最后一个:

public class ExecutorRouteLast extends ExecutorRouter {


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        return new ReturnT(addressList.get(addressList.size()-1));

    }


}



3. 轮询: 缓存时间是1天, 叠加次数做多为一百万,超过后进行重置,但是重置时采用随机方式,随机到一个小于100的数字,基于计数器,对地址列表求摸


public class ExecutorRouteRound extends ExecutorRouter {


    private static ConcurrentMap routeCountEachJob = new ConcurrentHashMap<>();

    private static long CACHE_VALID_TIME = 0;


    private static int count(int jobId) {

        // cache clear

        if (System.currentTimeMillis() > CACHE_VALID_TIME) {

            routeCountEachJob.clear();

            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;

        }


        AtomicInteger count = routeCountEachJob.get(jobId);

        if (count == null || count.get() > 1000000) {

            // 初始化时主动Random一次,缓解首次压力

            count = new AtomicInteger(new Random().nextInt(100));

        } else {

            // count++

            count.addAndGet(1);

        }

        routeCountEachJob.put(jobId, count);

        return count.get();

    }


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        String address = addressList.get(count(triggerParam.getJobId())%addressList.size());

        return new ReturnT(address);

    }


}



4. 随机:


public class ExecutorRouteRandom extends ExecutorRouter {


    private static Random localRandom = new Random();


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        String address = addressList.get(localRandom.nextInt(addressList.size()));

        return new ReturnT(address);

    }


}



5. 一致性哈希: 暂未查看




6. LFU: 缓存时间还是一天,对地址列表进行筛选,如果新加入的地址列表或者使用次数超过一百万次的话,就会随机重置为小于地址列表地址个数的值。 最后返回的就是value值最小的地址


public class ExecutorRouteLFU extends ExecutorRouter {


    private static ConcurrentMap jobLfuMap = new ConcurrentHashMap();

    private static long CACHE_VALID_TIME = 0;


    public String route(int jobId, List addressList) {


        // cache clear

        if (System.currentTimeMillis() > CACHE_VALID_TIME) {

            jobLfuMap.clear();

            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;

        }


        // lfu item init

        HashMap lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;

        if (lfuItemMap == null) {

            lfuItemMap = new HashMap();

            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖

        }


        // put new

        for (String address: addressList) {

            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {

                lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力

            }

        }

        // remove old

        List delKeys = new ArrayList<>();

        for (String existKey: lfuItemMap.keySet()) {

            if (!addressList.contains(existKey)) {

                delKeys.add(existKey);

            }

        }

        if (delKeys.size() > 0) {

            for (String delKey: delKeys) {

                lfuItemMap.remove(delKey);

            }

        }


        // load least userd count address

        List lfuItemList = new ArrayList(lfuItemMap.entrySet());

        Collections.sort(lfuItemList, new Comparator() {

            @Override

            public int compare(Map.Entry o1, Map.Entry o2) {

                return o1.getValue().compareTo(o2.getValue());

            }

        });


        Map.Entry addressItem = lfuItemList.get(0);

        String minAddress = addressItem.getKey();

        addressItem.setValue(addressItem.getValue() + 1);


        return addressItem.getKey();

    }


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        String address = route(triggerParam.getJobId(), addressList);

        return new ReturnT(address);

    }


}




7. LRU:缓存时间还是一天,对地址列表进行筛选, 采用LinkedHashMap实现LRU算法


其中LinkedHashMap的构造器中有一个参数:


//accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素



public class ExecutorRouteLRU extends ExecutorRouter {


    private static ConcurrentMap jobLRUMap = new ConcurrentHashMap();

    private static long CACHE_VALID_TIME = 0;


    public String route(int jobId, List addressList) {


        // cache clear

        if (System.currentTimeMillis() > CACHE_VALID_TIME) {

            jobLRUMap.clear();

            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;

        }


        // init lru

        LinkedHashMap lruItem = jobLRUMap.get(jobId);

        if (lruItem == null) {

            /**

             * LinkedHashMap

             *      aaccessOrdertrue=访问顺序排序(get/put时排序);false=插入顺序排期;

             *      bremoveEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;

             */

            //accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素

            lruItem = new LinkedHashMap(16, 0.75f, true);

            jobLRUMap.putIfAbsent(jobId, lruItem);

        }


        // put new

        for (String address: addressList) {

            if (!lruItem.containsKey(address)) {

                lruItem.put(address, address);

            }

        }

        // remove old

        List delKeys = new ArrayList<>();

        for (String existKey: lruItem.keySet()) {

            if (!addressList.contains(existKey)) {

                delKeys.add(existKey);

            }

        }

        if (delKeys.size() > 0) {

            for (String delKey: delKeys) {

                lruItem.remove(delKey);

            }

        }


        // load

        String eldestKey = lruItem.entrySet().iterator().next().getKey();

        String eldestValue = lruItem.get(eldestKey);

        return eldestValue;

    }


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        String address = route(triggerParam.getJobId(), addressList);

        return new ReturnT(address);

    }


}



8. FAILOVER  会返回第一个心跳检测ok的执行器,主要是使用xxl-job的执行器 RESTful API中的 beat


public class ExecutorRouteFailover extends ExecutorRouter {


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {


        StringBuffer beatResultSB = new StringBuffer();

        for (String address : addressList) {

            // beat

            ReturnT beatResult = null;

            try {

                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);

                beatResult = executorBiz.beat();

            } catch (Exception e) {

                logger.error(e.getMessage(), e);

                beatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e );

            }

            beatResultSB.append( (beatResultSB.length()>0)?"

":"")

                    .append(I18nUtil.getString("jobconf_beat") + ":")

                    .append("
address:").append(address)

                    .append("
code:").append(beatResult.getCode())

                    .append("
msg:").append(beatResult.getMsg());


            // beat success

            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {


                beatResult.setMsg(beatResultSB.toString());

                beatResult.setContent(address);

                return beatResult;

            }

        }

        return new ReturnT(ReturnT.FAIL_CODE, beatResultSB.toString());


    }

}



9. BUSYOVER  会返回空闲的第一个执行器的地址,主要是使用xxl-job的执行器 RESTful API中的 idleBeat

ublic class ExecutorRouteBusyover extends ExecutorRouter {


    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {

        StringBuffer idleBeatResultSB = new StringBuffer();

        for (String address : addressList) {

            // beat

            ReturnT idleBeatResult = null;

            try {

                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);

                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));

            } catch (Exception e) {

                logger.error(e.getMessage(), e);

                idleBeatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e );

            }

            idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"

":"")

                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")

                    .append("
address:").append(address)

                    .append("
code:").append(idleBeatResult.getCode())

                    .append("
msg:").append(idleBeatResult.getMsg());


            // beat success

            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {

                idleBeatResult.setMsg(idleBeatResultSB.toString());

                idleBeatResult.setContent(address);

                return idleBeatResult;

            }

        }


        return new ReturnT(ReturnT.FAIL_CODE, idleBeatResultSB.toString());

    }


}


10. 分片分发   暂未查看

阅读(2654) | 评论(0) | 转发(0) |
0

上一篇:Spring源码学习--2.Spring循环依赖问题

下一篇:没有了

给主人留下些什么吧!~~