
2013-11-26 09:46:02

When you need simple mutual exclusion for some service across a cluster, I strongly recommend the series of modules from Veripool: IPC::Locker and Schedule::Load.

Today let's look at IPC::Locker. Deployment is trivial: just run cpanm IPC::Locker on every node of the cluster. Its few dependencies, such as IO::Socket::INET, IO::Poll, and POSIX, are all Perl core modules, so in theory you could also package up the code and distribute it yourself.
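A quick way to verify that every node actually has the modules (just a trivial check of my own, nothing shipped with the distribution):

perl -MIPC::Locker -MIPC::PidStat -e 'print "modules ok\n"'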

The distribution also ships several ready-to-use scripts: lockerd, lockersh, pidstat, pidstatd, and pidwatch.

The last three deal with things like whether a given PID still exists on a remote machine. In most cases we won't manage a cluster by PID ourselves, so for practical use it is enough to know that lockerd and lockersh also rely on pidstatd to solve the PID problem.
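For the curious, the PID check itself is a one-liner; IPC::PidStat exposes local_pid_exists(), the same call the script at the end of this post uses to poll its foreground job:

use IPC::PidStat;
# Returns true while the given PID exists on the local machine;
# remote checks go through pidstatd instead.
print "still running\n" if IPC::PidStat::local_pid_exists($$);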

The code itself is simple enough to read in one sitting: lockerd's IPC::Locker::Server starts a TCP server on IO::Socket::INET and mainly maintains three things: a @{$self->{lock}} list, a @{$self->{host}} list, and a boolean $self->{locked}.
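As a rough mental model only (not the module's actual code), the server's bookkeeping boils down to a per-lock FIFO queue of clients, which also explains the unlock ordering seen in the experiment below:

use strict;
use warnings;

my %queue;    # lock name => FIFO list of waiting clients

# Conceptual sketch: the head of each queue is the current lock owner.
sub request_lock {
    my ( $name, $client ) = @_;
    push @{ $queue{$name} }, $client;
    return $queue{$name}[0] eq $client;    # granted only to the head of line
}

sub release_lock {
    my ($name) = @_;
    shift @{ $queue{$name} };    # drop the owner; next waiter takes over
    return $queue{$name}[0];     # new owner, or undef if nobody is waiting
}

request_lock( 'test_task', $_ ) for qw(serverB serverC);
print release_lock('test_task'), " now holds test_task\n";    # serverC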

lockersh's IPC::Locker connects to lockerd's port and checks the $self->{locked} state; if the lock is free it sends a LOCK request, forks a process to exec the shell command you specified, and when the command finishes it sends an UNLOCK request back to lockerd.
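Stripped of option parsing, the lockersh flow is just a few lines of client API. The hostname and lock name below are placeholders, and the new()/lock()/unlock() calls mirror the ones used in the script at the end of this post:

use strict;
use warnings;
use IPC::Locker;

# Block until lockerd on serverA grants us "test_task", run a command,
# then release the lock.  autounlock lets the server reclaim the lock
# via pidstatd if we die without unlocking.
my $lock = IPC::Locker->new(
    host       => 'serverA',      # placeholder lockerd host
    lock       => 'test_task',    # placeholder lock name
    autounlock => 1,
);
$lock->lock;
system('echo "OK"');              # the command lockersh would exec
$lock->unlock;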

A simple experiment:

  1. On serverA, run: lockerd &
  2. On serverB, run: lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done'
  3. On serverC, run: lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done'
  4. On serverD, run: lockersh --dhost serverA --lock other_task 'while true;do echo "OK";done'

Observe the result: serverB and serverD are both printing echo "OK" at the same time, while serverC is blocked waiting on the lock. Continue:

  1. In serverB's session, press Ctrl+C to stop the program, then run the same command again
  2. In serverC's session, press Ctrl+C to stop the program

Observe the result: the moment B is stopped, C starts; when C is stopped, the re-run B carries on. None of this affects serverD.

  1. Stop serverD's program, and instead run: lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done'

Observe again: B, C, and D are granted the lock in the order their lockersh commands were started, because the host list is a plain list that the server walks with a for loop.

Note: lockerd must be started first and must not exit midway. In my testing, if lockerd exits and is restarted, the lock list is lost, since it lives only in memory. You then end up with the earlier lockersh still running (it already holds the lock and will not talk to the server again until it unlocks) while a newly started lockersh acquires the same lock again from the new lockerd...

The other module, Schedule::Load, can decide which machine in the cluster runs a job based on load average, top output, and so on. I haven't tested it yet; notes to follow in a later post.

Addendum: here is a script, adapted from lockersh, that serializes squid reloads across a cluster and raises alerts:

#!/usr/bin/perl -w
use FindBin;
use lib "$FindBin::Bin/../lib";
use strict;
use warnings;
use autodie;
use vars qw ($Debug);
use Furl;
use IO::File;
use Getopt::Long;
use IPC::Locker;
use IPC::PidStat;
use POSIX qw(setsid);    # POSIX::setsid() is called below when daemonizing

#======================================================================
# Crude single-instance guard: count ps entries matching this script's
# name ($0 interpolates inside the backticks; the grep itself is excluded).
my $pscount = `ps aux|grep -v grep|grep $0|wc -l`;
chomp $pscount;
print "Already running, waiting for lock now\n" and exit unless $pscount == 1;

#======================================================================
my %server_params = ( lock => [] );
my $cluserv;
$Debug = 0;
Getopt::Long::config("require_order");
if (!GetOptions(
        "dhost=s" => sub { shift; $server_params{host} = shift; },
        "cluster=s" => sub {
            shift;
            push @{ $server_params{lock} }, split( ':', shift );
        },
        "port=i"    => sub { shift; $server_params{port}    = shift; },
        "timeout=i" => sub { shift; $server_params{timeout} = shift; },
        "verbose!"  => sub { shift; $server_params{verbose} = shift; },
        "debug"     => \&debug,
        "service=s" => \$cluserv,
    )
    )
{
    die "%Error: Bad usage, see lockersh --help\n";
}
$#{ $server_params{lock} } >= 0
    or die "%Error: --cluster not specified; see lockersh --help\n";
defined $cluserv
    or die "%Error: --service not specified; see lockersh --help\n";

# Fork once to start parent process
my $foreground_pid = $$;    # Unlike most forks, the job goes in the parent

# Do this while we still have STDERR.
my $lock = IPC::Locker->new(
    verbose        => 0,
    timeout        => 0,
    autounlock     => 1,
    destroy_unlock => 0,
    %server_params,
);
$lock or die "%Error: Did not connect to lockerd,";
$lock->lock;
if ( my $pid = fork() ) {    # Parent process, foreground job
    print "\tForeground: $cluserv\n" if $Debug;

    # The child forks again quickly.  Sometimes SIGCHLD leaks to us and
    # wrecks the exec'd command, so wait for it now.
    my $rv = waitpid( $pid, 0 );
    if ( $rv != $pid ) {
        die "%Error: waitpid() returned $rv: $!";
    }
    elsif ($?) {
        die "%Error: Child process died with status $?,";
    }
    print "Exec in $$\n" if $Debug;
    &service($cluserv);
}

#else, rest is for child process.
# Disassociate from controlling terminal
POSIX::setsid() or die "%Error: Can't start a new session: $!";

# Change working directory
chdir "/";
open( STDIN, "+>/dev/null" ) or die "%Error: Can't re-open STDIN: $!";
if ( !$Debug ) {
    open( STDOUT, "+>&STDIN" );
    open( STDERR, "+>&STDIN" );
}

# Prevent possibility of acquiring a controlling terminal
exit(0) if fork();

# Wait for child to complete.  We can't waitpid, as we're not the parent
while ( IPC::PidStat::local_pid_exists($foreground_pid) ) { sleep 1; }
print "Parent $foreground_pid completed\n" if $Debug;

# Unlock
$lock->unlock;
$lock = undef;
print "Child exiting\n" if $Debug;

sub debug {
    $Debug              = 1;
    $IPC::Locker::Debug = 1;
}

sub service {
    my $cluserv = shift;
    die "Only squid is supported now!" unless $cluserv eq "squid";

    # Dispatch by name via string eval: calls squid_reload()/squid_check().
    die "Reload failed. Check squid.conf!" if eval "${cluserv}_reload";
    while (1) {
        my $hit_rate = eval "${cluserv}_check";
        defined $hit_rate or die "%Error: could not read hit ratio,";
        notify("HIT Ratio: ${hit_rate}% now.\n");
        exit if $hit_rate > 50;    # cache warmed up again; stop alerting
        sleep 300;
    }
}

sub squid_check {
    my $hit_rate;
    print "Run squid_check\n" if $Debug;

    # \$2 must be escaped, or Perl interpolates its own $2 into the backticks
    # and awk ends up printing the whole line instead of the port field.
    my $squid_port = `awk '/^http_port/{print \$2}' /etc/squid/squid.conf`;
    chomp $squid_port;
    open my $fh, "squidclient -p ${squid_port} mgr:info |";
    while (<$fh>) {
        next unless /^\s+Request Hit Ratios:\s+5min:\s*(-?\d+\.\d)%,/;
        print "regex $1\n" if $Debug;
        $hit_rate = $1;
        last;
    }
    close $fh;
    return $hit_rate;
}

sub squid_reload {
    print "Reload squid daemon. Do not reload within 10 mins of squid start"
        if $Debug;
    system( "squid", "-k", "reconfigure" );
    return $?;
}

sub notify {
    my $msg  = shift;
    my $furl = Furl->new( agent => "Clustrol/0.1" );

    # Furl's post() takes (url, headers, content); the alert endpoint URL
    # was blanked out in the original post.
    $furl->post( "", [], [ data => $msg ] );
}
__END__
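For reference, an invocation consistent with the option handling above would look like this (script name, lockerd host, and node names are placeholders):

perl squid_ctl.pl --dhost serverA --cluster sq1:sq2:sq3 --service squid

Here --cluster is split on ':' and each name is pushed onto the lock list passed to IPC::Locker; my reading of the module is that with several lock names, the client is granted whichever one frees up first.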
 