使用到工具:
1. 存储 mongodb
2. 定时任务 crontab
数据存储空间( database = fs )说明:
collection =
functions(方法定义)
dispatch(调度定义)
joblist(待运行任务集)
result(结果)
0. 调度逻辑 :
× 通过 定时任务 导入日志到 mongodb
× 根据个性统计需求,写 分析脚本 ( functions ) - 方法定义 ,参数说明
× 再 定义定时调度(dispatch) - 指定 定时周期 ,运行方法
× crontab 触发 调度集合 生成job类表,再另一个crontab运行 joblist中的已经实例化好的任务
× 结果存储到 result 中
*/2 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh
joblist_create 1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh
job_run 定时 系统调度 :
#!/bin/sh
act=$1
echo "
-$act----------------------- " >> /tmp/mongo.log
if [ $act = "joblist_create"
]; then
echo "joblist_create run .."
>> /tmp/mongo.log
echo `ps -ef|grep -v grep |grep
cron_mongodb |grep joblist_create|wc -l` ;
if [ `ps -ef|grep -v grep |grep
cron_mongodb |grep joblist_create|wc -l` -lt "5" ] ;then
/root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
load('/data/shell/gmodel/dw_model/utils/utils.js');
print( 'joblist_create ',new Date,tojson(joblist_create()) ); "
>> /tmp/mongo.log
fi
fi
if [ $act = "job_run"
]; then
echo "job_run run .." >> /tmp/mongo.log
if [ `ps -ef|grep -v grep |grep
cron_mongodb |grep job_run|wc -l`
-lt "5" ] ;then
/root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
load('/data/shell/gmodel/dw_model/utils/utils.js');
print( 'job_run ',new Date,tojson( job_run() ) ); " >> /tmp/mongo.log
fi
fi
|
1. 初始化( init_mong.js ) collection :
var cfs = db.getMongo().getDB('fs').createCollection("functions") ;
var dfs = db.getMongo().getDB('fs').createCollection("dispatch") ;
var jfs = db.getMongo().getDB('fs').createCollection("joblist") ;
var rfs = db.getMongo().getDB('fs').createCollection("result") ;
var fs = db.getMongo().getDB('fs').functions ;
var ds = db.getMongo().getDB('fs').dispatch ;
var js = db.getMongo().getDB('fs').joblist ;
var rs = db.getMongo().getDB('fs').result ;
//方法名词 和版本号 联合 唯一
fs.ensureIndex({ "name":1,"version":1}, {unique: true});
//调度名称 唯一
ds.ensureIndex({ "name":1 }, {unique: true});
//任务list : 结果 = 名称 , 时间戳 , 动作戳( 方法参数 ) 联合唯一
js.ensureIndex({ "name":1,"active_stamp":1, "timestamp":1 }, {unique: true});
rs.ensureIndex({ "name":1,"active_stamp":1, "timestamp":1 }, {unique: true});
/** */
fs.drop();
ds.drop();
rs.drop() ;
js.drop();
|
2. function 定义 :
fs.save({
// 包名
"package" : "statistics" ,
// function 名称 - 通过 utils 的 getf('statistics','test_statistics') 取得
"name" : "test_statistics" ,
// 描述
"desc" : "测试 调度,没有实现用处" ,
// 方法 版本号 getf('statistics','test_statistics','0.0')
// 如果 getf 没有第3个参数 ,默认取最后一个版本
"version" : "0.0" ,
// 方法运行 参数 描述
"param":{
"ab":"产品ID",
"st":"开始时间",
"et":"结束时间"
},
// 方法体 运行 取传参 为 this.param.ab ....
"body" : function(){
sleep(15000);
return {"ab":this.param.ab,"st":this.param.st,"et":this.param.et};
}
});
|
3. 调度 定义 :
ds.save({
// 调度 名称 描述
"name" : "测试-1" ,
"desc" : "" ,
// 方法版本 和 方法名
"version" : "0.0" ,
"fun_name" : "test_statistics" ,
// 方法 参数 逻辑定义
"param" : {
//调度当前时间 可以使用 内置 "dispatch_time" <时间对象> new Date ,
//并且 可以使用 utils 中的方法
// 其 当参数 遇到 @ -
// 就会 load('utils.js') ; param.st = eval( .... ) ;
"ab": "100008" ,
"st": "@ time_displacement( dispatch_time ,'yyyy-MM-dd',-1 ) ",
"et": "@ time_displacement( dispatch_time ,'yyyy-MM-dd' ) "
},
//定时调度 仿照( crontab )
"run_timing" : {
"minute" : "00" , //0-59
"hour" : "*" , //0-23
"day" : "*" , //1-31
"weekday" : "*" //'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'
},
// 调度当前是否可用
"is_use" : "true",
// 此调度生成 job 最后次时间
// 作用 - 异常处理,可以通过 此属性,补没有生成的 joblist
"last_run" : "2010-04-21 00:00"
});
|
4. utils.js 时间操作 工具
在其 load 上 添加一个 方法
/* 时间位移
* time_displacement( new Date,'yyyy-MM-dd',-1 )
*/
function time_displacement(ndt,fmt,day,hour,min){
if(typeof ndate == "string") ndt = parseDate(ndt) ;
var ndate = new Date( ndt );
if(typeof day != "undefined") ndate.setDate( ndate.getDate()+day );
if(typeof hour != "undefined") ndate.setHours( ndate.getHours()+hour );
if(typeof min != "undefined") ndate.setMinutes( ndate.getMinutes()+min );
return formatDate(ndate,fmt);
}
|
5. utils.js 方法 function 取得
/**
* var fun = getf("assert","structure");
* fun.body fun.varsion
*/
function getf(pack_age,fname,vers){
var dbs = 'fs';
var fs_coll = db.getMongo().getDB(dbs).functions ;
if(typeof vers == "undefined")
var fun = eval(' fs_coll.find({ "package":"'+pack_age+'", "name":"'+fname+'" }).sort({"version":-1})[0] ');
else
var fun = eval(' fs_coll.findOne({ "package":"'+pack_age+'", "name":"'+fname+'","version":"'+vers+'" }) ');
fun._id = "~";
return fun ;
}
|
6. 调度核心方法之一 : 调度时间戳生成
/*
生成 时间戳 :
根据 输入 时间差 ,
根据 定时 minute,hour,day,weekday 中查看是否有可能,触发运行
比如 : 2010-04-10 00:01<> 2010-04-11 23:59
在 00 10 * * 中 有 两次次触发
方法运行 :
get_timestamps( '2010-04-10 00:01','2010-04-11 23:59','00','10','*','*' )
就是 2010-04-10 10:00 , 2010-04-11 10:00
参数格式说明 :
stime,etime > new Date \ 'yyyy-MM-dd'
minute,hour,day = \d\d
weekday = 'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'
*/
function get_timestamps( stime,etime,
minute,hour,day,weekday ){
if(typeof stime == "string") stime = parseDate(stime);
if(typeof etime == "string") etime = parseDate(etime);
if(stime.getTime() > etime.getTime() )return [];
var et = formatDate(etime,'yyyy-MM-dd HH:mm');
var min_cycle = null ;
if( minute=="*" ) min_cycle = 1 ;
else if( min_cycle == null && hour=="*" ) min_cycle = 1*60 ;
else if ( min_cycle == null ) min_cycle = 1*60*24 ;
print( "min_cycle = ",min_cycle );
var tts = {};
//stime.setMinutes( stime.getMinutes() + min_cycle ) ;
while( stime.getTime() < etime.getTime() ){
var st = formatDate(stime,'yyyy-MM-dd HH:mm');
if( minute != "*" && minute!="" ) st = st.replace(/(.*)(\d{2})$/,"$1"+minute);
if( hour != "*" && hour!="" ) st = st.replace(/(.*)(\d{2})(:\d{2})$/,"$1"+hour+"$3" ) ;
if( day != "*" && day!="" )st = st.replace(/(\d{4}-\d{2}-)(\d{2})(.*)/,"$1"+day+"$3") ;
if( ( weekday != "*" && weekday != ww ) || ( et < st) ) {
stime.setMinutes( stime.getMinutes() + min_cycle ) ;
continue ;
}
// yyyy-MM-dd HH:mm
tts[ st ] = 0;
stime.setMinutes( stime.getMinutes() + min_cycle ) ;
}
var arr = [];
for(var c in tts ) { arr.push(c); }
return arr ;
}
|
joblist 生成 :
/* 生成调度
ntime 调度运行 环境时间 默认当前
joblist 生成 格式
{
"_id" : ObjectId("4bd55d4a6d623399d78fd793"),
"name" : "测试-1",
"timestamp" : "2010-04-22 00:00",
"active_stamp" : "",
"func_obj" : {
"_id" : "~",
"package" : "statistics",
"name" : "test_statistics",
"desc" : "测试 调度,没有实现用处",
"version" : "0.0",
"param" : {
"ab" : "100008",
"st" : "2010-04-21",
"et" : "2010-04-22"
},
"body" : function cf__24__f_cf__12__f_() {
sleep(15000);
return {ab:this.param.ab, st:this.param.st, et:this.param.et};
}
},
"running_time" : {
"dispatch_start" : "",
"start" : "",
"end" : ""
},
"level" : "3",
"run_status" : "init",
"result" : ""
}
*/
function joblist_create(ntime){
if(typeof ntime == "undefined") var nt = new Date ;
else if ( typeof ntime == "object" ) var nt = ntime ;
else var nt = parseDate(ntime);
var run_stat = {"all":0, "success":0,"error":0};
var dbs = 'fs';
var ds = db.getMongo().getDB(dbs).dispatch ;
var js = db.getMongo().getDB(dbs).joblist ;
var ff = ds.find({"is_use":"true"}) ;
//遍历出所有 可用 调度定义
while(ff.hasNext()){
run_stat.all += 1 ;
var disp = ff.next();
try{
//取得调度方法体
var fun = getf('statistics',disp.fun_name,disp.version);
//调度最晚生成 joblist 时间
if(typeof disp.last_run == "undefined" || disp.last_run=="" ) disp.last_run = formatDate(nt,'yyyy-MM-dd HH:mm:ss');
//根据 调度 最晚生成 joblist 时间 和 当前ntime 时间,取得 job 运行时间
var tks = get_timestamps( disp.last_run , nt ,
disp.run_timing.minute,disp.run_timing.hour,disp.run_timing.day,disp.run_timing.weekday ) ;
for(var i=0;i<tks.length;i++ ){
// 调度参数生成中的 隐含对象 生成
var dispatch_time = parseDate( tks[i] ) ;
fun = getf('statistics',disp.fun_name,disp.version);
var pks = [];
// 参数实例化
for(var pk in fun.param){
pks.push(pk);
var pv = disp.param[pk] ;
if( /@/.test(pv) ) fun.param[pk] = eval( pv.replace(/^@(.*)$/,"$1") );
else fun.param[pk]=pv ;
}
// 生成动作戳
var active_stamp = "" ;
var pkss = pks.sort();
for(var ii;ii<pkss.length;ii++){
active_stamp += pkss[ii]+fun.param[pkss[ii]];
}
if( js.count({ "name": disp.name,"active_stamp":active_stamp ,"timestamp":tks[i]}) != 0 ) continue ;
js.save({
"name": disp.name ,
"timestamp" : tks[i] ,
"active_stamp":active_stamp,
"func_obj" : fun ,
"running_time":{
"dispatch_start" : "" ,
"start" : "" ,
"end" : ""
},
"level" : "3" ,
"run_status" : "init" ,// 初始等待 init ,计算等待 wait , 运行中 running , 异常结束 error ,正常结束 end
"result" : "" // 结果去向 - 扩展
});
run_stat.success += 1 ;
disp.last_run = tks[i] ;
}
ds.save( disp );
}catch(err){ print(err); run_stat.error += 1 ;}
}
return run_stat ;
}
|
job_run :
// 运行
function job_run(ntime){
if(typeof ntime == "undefined") var nt = new Date ;
else if ( typeof ntime == "object" ) var nt = ntime ;
else var nt = parseDate(ntime);
var run_stat = {"all":0,"success":0,"error":0};
var dbs = 'fs';
var ds = db.getMongo().getDB(dbs).dispatch ;
var js = db.getMongo().getDB(dbs).joblist ;
var rs = db.getMongo().getDB(dbs).result ;
var ajob = [];
var ff = js.find({ "run_status":"init" }).sort({ "level":1 }).limit(5) ;
var dispatch_start = formatDate(nt,'yyyy-MM-dd HH:mm');
while(ff.hasNext()){
run_stat.all += 1 ;
try{
var job = ff.next();
job.run_status = "wait" ;
job.running_time.dispatch_start = dispatch_start ;
js.save(job);
ajob.push(job);
}catch(err){print(err);}
}
print("start run = ",run_stat.all );
for(var i=0;i<ajob.length;i++){
try{
var job = ajob[i] ;
if( rs.count({ "name":job.name,"active_stamp":job.active_stamp ,"timestamp":job.timestamp }) != 0 ) continue ;
var fun = job.func_obj;
job.run_status = "running" ;
job.running_time.start = formatDate(nt,'yyyy-MM-dd HH:mm');
js.save(job);
var res = fun.body() ;
rs.save({
"name" : job.name ,
"param" : fun.param ,
"timestamp" : job.timestamp ,
"active_stamp":job.active_stamp ,
"result" : res
});
job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm');
job.run_status = "end" ;
js.save(job);
run_stat.success += 1 ;
}catch(err){
print(err);
job.run_status = "error" ;
job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm');
js.save(job);
run_stat.error += 1 ;
}
}
return run_stat ;
}
|
阅读(5500) | 评论(0) | 转发(2) |