Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1416027
  • 博文数量: 264
  • 博客积分: 5810
  • 博客等级: 大校
  • 技术积分: 3528
  • 用 户 组: 普通用户
  • 注册时间: 2011-03-13 17:15
文章分类

全部博文(264)

文章存档

2011年(264)

分类:

2011-09-06 07:38:16

原文地址:内置定时调度(mongodb) 作者:liukaiyi

使用到工具:
 1. 存储 mongodb
 2. 定时任务 crontab

数据存储空间( database = fs )说明:
  collection =
          functions(方法定义)
          dispatch(调度定义)
          joblist(待运行任务集)
          result(结果)

0. 调度逻辑 :
   × 通过 定时任务 导入日志到 mongodb
   × 根据个性统计需求,写 分析脚本 ( functions ) - 方法定义 ,参数说明
   × 再 定义定时调度(dispatch) - 指定 定时周期 ,运行方法
   × crontab 触发 调度集合 生成job类表,再另一个crontab运行 joblist中的已经实例化好的任务
   × 结果存储到 result 中


*/2 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh joblist_create
1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59 * * * *   sh /data/shell/gmodel/dw_model/cron_mongodb.sh job_run

定时 系统调度 :

#!/bin/sh

act=$1
echo " -$act----------------------- " >> /tmp/mongo.log
if [ $act = "joblist_create" ]; then
 echo "joblist_create run .." >> /tmp/mongo.log
 echo `ps -ef|grep -v grep |grep cron_mongodb |grep joblist_create|wc -l` ;
 if [ `ps -ef|grep -v grep |grep cron_mongodb |grep joblist_create|wc -l` -lt "5" ] ;then
  /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
    load('/data/shell/gmodel/dw_model/utils/utils.js');
    print( 'joblist_create ',new Date,tojson(joblist_create()) ); "
>> /tmp/mongo.log
 fi
fi



if [ $act = "job_run" ]; then
 echo "job_run run .." >> /tmp/mongo.log
 if [ `ps -ef|grep -v grep |grep cron_mongodb |grep job_run|wc -l` -lt "5" ] ;then
 /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval "
    load('/data/shell/gmodel/dw_model/utils/utils.js');
    print( 'job_run ',new Date,tojson( job_run() ) ); "
>> /tmp/mongo.log
 fi
fi




1. 初始化( init_mong.js ) collection  :

var cfs = db.getMongo().getDB('fs').createCollection("functions") ;
var dfs = db.getMongo().getDB('fs').createCollection("dispatch") ;
var jfs = db.getMongo().getDB('fs').createCollection("joblist") ;
var rfs = db.getMongo().getDB('fs').createCollection("result") ;

var fs = db.getMongo().getDB('fs').functions ;
var ds = db.getMongo().getDB('fs').dispatch ;
var js = db.getMongo().getDB('fs').joblist ;
var rs = db.getMongo().getDB('fs').result ;

//方法名词 和版本号 联合 唯一
fs.ensureIndex({ "name":1,"version":1}, {unique: true});
//调度名称 唯一
ds.ensureIndex({ "name":1 }, {unique: true});
//任务list : 结果 = 名称 , 时间戳 , 动作戳( 方法参数 ) 联合唯一
js.ensureIndex({ "name":1,"active_stamp":1,
"timestamp":1 }, {unique: true});
rs.ensureIndex({ "name":1,"active_stamp":1,
"timestamp":1 }, {unique: true});

/** */
fs.drop();
ds.drop();
rs.drop() ;
js.drop();



2. function 定义 :

fs.save({
    // 包名
    "package" : "statistics" ,
    // function 名称 - 通过 utils 的 getf('statistics','test_statistics') 取得
    "name" : "test_statistics" ,
    // 描述
    "desc" : "测试 调度,没有实现用处" ,
    // 方法 版本号 getf('statistics','test_statistics','0.0')
    // 如果 getf 没有第3个参数 ,默认取最后一个版本
    "version" : "0.0" ,
    // 方法运行 参数 描述
    "param":{
       "ab":"产品ID",
       "st":"开始时间",
       "et":"结束时间"
    },
    // 方法体 运行 取传参 为 this.param.ab ....
   "body" : function(){

     sleep(15000);

     return {"ab":this.param.ab,"st":this.param.st,"et":this.param.et};

   }
});



3. 调度  定义 :

ds.save({
    // 调度 名称 描述
    "name" : "测试-1" ,
    "desc" : "" ,
    // 方法版本 和 方法名
    "version" : "0.0" ,
    "fun_name" : "test_statistics" ,

    // 方法 参数 逻辑定义
    "param" : {
          //调度当前时间 可以使用 内置 "dispatch_time" <时间对象> new Date ,
          //并且 可以使用 utils 中的方法
          // 其 当参数 遇到 @ -
          // 就会 load('utils.js') ; param.st = eval( .... ) ;
          "ab": "100008" ,
          "st": "@ time_displacement( dispatch_time ,'yyyy-MM-dd',-1 ) ",
          "et": "@ time_displacement( dispatch_time ,'yyyy-MM-dd' ) "
   },
   //定时调度 仿照( crontab )

   "run_timing" : {
     "minute" : "00" , //0-59
     "hour" : "*" , //0-23
     "day" : "*" , //1-31
     "weekday" : "*" //'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'
   },
   // 调度当前是否可用
   "is_use" : "true",
   // 此调度生成 job 最后次时间
   // 作用 - 异常处理,可以通过 此属性,补没有生成的 joblist
   "last_run" : "2010-04-21 00:00"
});




4. utils.js 时间操作 工具

在其 load 上 添加一个 方法

/* 时间位移
* time_displacement( new Date,'yyyy-MM-dd',-1 )
*/

function time_displacement(ndt,fmt,day,hour,min){
  if(typeof ndate == "string") ndt = parseDate(ndt) ;
  var ndate = new Date( ndt );

  if(typeof day != "undefined") ndate.setDate( ndate.getDate()+day );
  if(typeof hour != "undefined") ndate.setHours( ndate.getHours()+hour );
  if(typeof min != "undefined") ndate.setMinutes( ndate.getMinutes()+min );

  return formatDate(ndate,fmt);
}



5. utils.js  方法 function 取得

/**
* var fun = getf("assert","structure");
* fun.body fun.varsion
*/

function getf(pack_age,fname,vers){
  var dbs = 'fs';
  var fs_coll = db.getMongo().getDB(dbs).functions ;
  if(typeof vers == "undefined")
     var fun = eval(' fs_coll.find({ "package":"'+pack_age+'", "name":"'+fname+'" }).sort({"version":-1})[0] ');
  else
    var fun = eval(' fs_coll.findOne({ "package":"'+pack_age+'", "name":"'+fname+'","version":"'+vers+'" }) ');

  fun._id = "~";
  return fun ;
}



6. 调度核心方法之一 : 调度时间戳生成

/*
    生成 时间戳 :
        根据 输入 时间差 ,
        根据 定时 minute,hour,day,weekday 中查看是否有可能,触发运行
        比如 : 2010-04-10 00:01<> 2010-04-11 23:59
                在 00 10 * * 中 有 两次次触发
    方法运行 :
        get_timestamps( '2010-04-10 00:01','2010-04-11 23:59','00','10','*','*' )
        就是 2010-04-10 10:00 , 2010-04-11 10:00
    参数格式说明 :
        stime,etime > new Date \ 'yyyy-MM-dd'
        minute,hour,day = \d\d
        weekday = 'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'
*/

function get_timestamps( stime,etime,
        minute,hour,day,weekday ){
    if(typeof stime == "string") stime = parseDate(stime);
    if(typeof etime == "string") etime = parseDate(etime);

    if(stime.getTime() > etime.getTime() )return [];
    var et = formatDate(etime,'yyyy-MM-dd HH:mm');
    var min_cycle = null ;
    if( minute=="*" ) min_cycle = 1 ;
    else if( min_cycle == null && hour=="*" ) min_cycle = 1*60 ;
    else if ( min_cycle == null ) min_cycle = 1*60*24 ;

    print( "min_cycle = ",min_cycle );
    var tts = {};
    //stime.setMinutes( stime.getMinutes() + min_cycle ) ;
    while( stime.getTime() < etime.getTime() ){
        var st = formatDate(stime,'yyyy-MM-dd HH:mm');
        if( minute != "*" && minute!="" ) st = st.replace(/(.*)(\d{2})$/,"$1"+minute);
        if( hour != "*" && hour!="" ) st = st.replace(/(.*)(\d{2})(:\d{2})$/,"$1"+hour+"$3" ) ;
        if( day != "*" && day!="" )st = st.replace(/(\d{4}-\d{2}-)(\d{2})(.*)/,"$1"+day+"$3") ;

        if( ( weekday != "*" && weekday != ww ) || ( et < st) ) {
            stime.setMinutes( stime.getMinutes() + min_cycle ) ;
            continue ;
        }

        // yyyy-MM-dd HH:mm
        tts[ st ] = 0;
        stime.setMinutes( stime.getMinutes() + min_cycle ) ;
    }
    var arr = [];
    for(var c in tts ) { arr.push(c); }
    return arr ;
}





joblist 生成 :

/* 生成调度
     ntime 调度运行 环境时间 默认当前
joblist 生成 格式
{
    "_id" : ObjectId("4bd55d4a6d623399d78fd793"),
    "name" : "测试-1",
    "timestamp" : "2010-04-22 00:00",
    "active_stamp" : "",
    "func_obj" : {
        "_id" : "~",
        "package" : "statistics",
        "name" : "test_statistics",
        "desc" : "测试 调度,没有实现用处",
        "version" : "0.0",
        "param" : {
            "ab" : "100008",
            "st" : "2010-04-21",
            "et" : "2010-04-22"
        },
        "body" : function cf__24__f_cf__12__f_() {
                    sleep(15000);
                    return {ab:this.param.ab, st:this.param.st, et:this.param.et};
                }
    },
    "running_time" : {
        "dispatch_start" : "",
        "start" : "",
        "end" : ""
    },
    "level" : "3",
    "run_status" : "init",
    "result" : ""
}
*/

function joblist_create(ntime){

  if(typeof ntime == "undefined") var nt = new Date ;
  else if ( typeof ntime == "object" ) var nt = ntime ;
  else var nt = parseDate(ntime);

  var run_stat = {"all":0, "success":0,"error":0};
  var dbs = 'fs';
  var ds = db.getMongo().getDB(dbs).dispatch ;
  var js = db.getMongo().getDB(dbs).joblist ;

  var ff = ds.find({"is_use":"true"}) ;
  //遍历出所有 可用 调度定义

  while(ff.hasNext()){
    run_stat.all += 1 ;
    var disp = ff.next();
    try{
        //取得调度方法体

        var fun = getf('statistics',disp.fun_name,disp.version);

        //调度最晚生成 joblist 时间

        if(typeof disp.last_run == "undefined" || disp.last_run=="" ) disp.last_run = formatDate(nt,'yyyy-MM-dd HH:mm:ss');

        //根据 调度 最晚生成 joblist 时间 和 当前ntime 时间,取得 job 运行时间

        var tks = get_timestamps( disp.last_run , nt ,
                    disp.run_timing.minute,disp.run_timing.hour,disp.run_timing.day,disp.run_timing.weekday ) ;

        for(var i=0;i<tks.length;i++ ){

            // 调度参数生成中的 隐含对象 生成

            var dispatch_time = parseDate( tks[i] ) ;
            fun = getf('statistics',disp.fun_name,disp.version);

            var pks = [];
            // 参数实例化

            for(var pk in fun.param){
                pks.push(pk);
                var pv = disp.param[pk] ;
                if( /@/.test(pv) ) fun.param[pk] = eval( pv.replace(/^@(.*)$/,"$1") );
                else fun.param[pk]=pv ;
            }

            // 生成动作戳

            var active_stamp = "" ;
            var pkss = pks.sort();
            for(var ii;ii<pkss.length;ii++){
                active_stamp += pkss[ii]+fun.param[pkss[ii]];
            }

            if( js.count({ "name": disp.name,"active_stamp":active_stamp ,"timestamp":tks[i]}) != 0 ) continue ;
            js.save({
                "name": disp.name ,
                "timestamp" : tks[i] ,
                "active_stamp":active_stamp,

                "func_obj" : fun ,

                "running_time":{
                   "dispatch_start" : "" ,
                   "start" : "" ,
                   "end" : ""
                },

                "level" : "3" ,
                "run_status" : "init" ,// 初始等待 init ,计算等待 wait , 运行中 running , 异常结束 error ,正常结束 end

                "result" : "" // 结果去向 - 扩展

            });
            run_stat.success += 1 ;
            disp.last_run = tks[i] ;
        }
        ds.save( disp );
    }catch(err){ print(err); run_stat.error += 1 ;}
  }
  return run_stat ;
}




job_run :

// 运行

function job_run(ntime){


  if(typeof ntime == "undefined") var nt = new Date ;
  else if ( typeof ntime == "object" ) var nt = ntime ;
  else var nt = parseDate(ntime);

  var run_stat = {"all":0,"success":0,"error":0};
  var dbs = 'fs';

  var ds = db.getMongo().getDB(dbs).dispatch ;
  var js = db.getMongo().getDB(dbs).joblist ;
  var rs = db.getMongo().getDB(dbs).result ;

  var ajob = [];
  var ff = js.find({ "run_status":"init" }).sort({ "level":1 }).limit(5) ;
  var dispatch_start = formatDate(nt,'yyyy-MM-dd HH:mm');
  while(ff.hasNext()){
    run_stat.all += 1 ;
    try{
        var job = ff.next();
        job.run_status = "wait" ;
        job.running_time.dispatch_start = dispatch_start ;
        js.save(job);

        ajob.push(job);
    }catch(err){print(err);}
 }
 print("start run = ",run_stat.all );
 for(var i=0;i<ajob.length;i++){
    try{
        var job = ajob[i] ;
        if( rs.count({ "name":job.name,"active_stamp":job.active_stamp ,"timestamp":job.timestamp }) != 0 ) continue ;

        var fun = job.func_obj;
        job.run_status = "running" ;
        job.running_time.start = formatDate(nt,'yyyy-MM-dd HH:mm');
        js.save(job);

        var res = fun.body() ;

        rs.save({
            "name" : job.name ,
            "param" : fun.param ,
            "timestamp" : job.timestamp ,
            "active_stamp":job.active_stamp ,
            "result" : res
        });

        job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm');
        job.run_status = "end" ;
        js.save(job);
        run_stat.success += 1 ;
    }catch(err){
        print(err);
        job.run_status = "error" ;
        job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm');
        js.save(job);
        run_stat.error += 1 ;
    }
 }
 return run_stat ;
}


阅读(5788) | 评论(0) | 转发(0) |
0

上一篇:NOSQL精华必读

下一篇:sudo ssh

给主人留下些什么吧!~~