分类: Java
2021-11-10 18:48:18
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
具体代码路径:
com/xxl/job/admin/core/route/strategy
1. 获取地址列表中的第一个:
public class ExecutorRouteFirst extends ExecutorRouter {
@Override
public ReturnT route(TriggerParam triggerParam, List addressList){
return new ReturnT(addressList.get(0));
}
}
2. 获取地址列表中的最后一个:
public class ExecutorRouteLast extends ExecutorRouter {
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
return new ReturnT(addressList.get(addressList.size()-1));
}
}
3. 轮询: 缓存时间是1天, 叠加次数做多为一百万,超过后进行重置,但是重置时采用随机方式,随机到一个小于100的数字,基于计数器,对地址列表求摸
public class ExecutorRouteRound extends ExecutorRouter {
private static ConcurrentMap routeCountEachJob = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
AtomicInteger count = routeCountEachJob.get(jobId);
if (count == null || count.get() > 1000000) {
// 初始化时主动Random一次,缓解首次压力
count = new AtomicInteger(new Random().nextInt(100));
} else {
// count++
count.addAndGet(1);
}
routeCountEachJob.put(jobId, count);
return count.get();
}
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
return new ReturnT(address);
}
}
4. 随机:
public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT(address);
}
}
5. 一致性哈希: 暂未查看
6. LFU: 缓存时间还是一天,对地址列表进行筛选,如果新加入的地址列表或者使用次数超过一百万次的话,就会随机重置为小于地址列表地址个数的值。 最后返回的就是value值最小的地址
public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentMap
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// lfu item init
HashMap lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
if (lfuItemMap == null) {
lfuItemMap = new HashMap();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
}
// put new
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力
}
}
// remove old
List delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}
// load least userd count address
List Collections.sort(lfuItemList, new Comparator @Override
public int compare(Map.Entry o1, Map.Entry o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
Map.Entry addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey();
}
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT(address);
}
}
7. LRU:缓存时间还是一天,对地址列表进行筛选, 采用LinkedHashMap实现LRU算法
其中LinkedHashMap的构造器中有一个参数:
//accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素
public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentMap private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List addressList) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// init lru
LinkedHashMap lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
*/
//accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素
lruItem = new LinkedHashMap(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}
// put new
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}
// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT(address);
}
}
8. FAILOVER 会返回第一个心跳检测ok的执行器,主要是使用xxl-job的执行器 RESTful API中的 beat
public class ExecutorRouteFailover extends ExecutorRouter {
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT beatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e );
}
beatResultSB.append( (beatResultSB.length()>0)?" .append(I18nUtil.getString("jobconf_beat") + ":")
.append(" .append(" .append(" // beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
beatResult.setMsg(beatResultSB.toString());
beatResult.setContent(address);
return beatResult;
}
}
return new ReturnT(ReturnT.FAIL_CODE, beatResultSB.toString());
}
}
9. BUSYOVER 会返回空闲的第一个执行器的地址,主要是使用xxl-job的执行器 RESTful API中的 idleBeat
ublic class ExecutorRouteBusyover extends ExecutorRouter {
@Override
public ReturnT route(TriggerParam triggerParam, List addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT idleBeatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e );
}
idleBeatResultSB.append( (idleBeatResultSB.length()>0)?" .append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append(" .append(" .append(" // beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}
return new ReturnT(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}
}
":"")
address:").append(address)
code:").append(beatResult.getCode())
msg:").append(beatResult.getMsg());
":"")
address:").append(address)
code:").append(idleBeatResult.getCode())
msg:").append(idleBeatResult.getMsg());
10. 分片分发 暂未查看