Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5277441
  • 博文数量: 1144
  • 博客积分: 11974
  • 博客等级: 上将
  • 技术积分: 12312
  • 用 户 组: 普通用户
  • 注册时间: 2005-04-13 20:06
文章存档

2017年(2)

2016年(14)

2015年(10)

2014年(28)

2013年(23)

2012年(29)

2011年(53)

2010年(86)

2009年(83)

2008年(43)

2007年(153)

2006年(575)

2005年(45)

分类: LINUX

2009-08-30 16:51:38


perl多进程实战之四-BerkeleyDB
    上次的文档讨论的文件存储方式是DB_File,结论是它的并发性太差,tie的过程太慢,所以,在并发量高的情况下,必须采用另外的方案,其中的一种办法就是用BerkeleyDB。
    BDB也算是相当有名的数据库了,但是perl对它的封装感觉并不是很好,废话少说了,举例子:
 
#!/usr/bin/perl
# test_bdb_1.pl
# create by lianming: 2009-08-17
# last modify by lianming: 2009-08-17
 
use BerkeleyDB;
use strict;
use warnings;
 
use vars qw(%h $k $v);
 
my $filename = "test_1.db";
unlink $filename;
my $env = new BerkeleyDB::Env
        -Home   => "/opt/B_db_test/DB",
        -Flags  => DB_CREATE| DB_INIT_MPOOL
|| die "Cannot open environment: $BerkeleyDB::Error\n";
 
my $db = tie (%h, "BerkeleyDB::Btree",
    -Filename => $filename,
    -Flags => DB_CREATE,
    -Env => $env)
|| die " Cannot open file $filename:$! $BerkeleyDB::Error\n";
 

for (my $i = 0; $i < 10; $i++) {
        $h{$i} = $i*100;
}
 
while (($k, $v) = each(%h)) {
    print "$k->$v\n";
}
 
undef $db;
untie %h;
 
    执行结果如下:
 
0->0
1->100
2->200
3->300
4->400
5->500
6->600
7->700
8->800
9->900
 
    操作上和DB_File是差不多的,要先设定运行的环境,BerkeleyDB在运行的时候,会创建临时文件、日志文件等,所以需要先指定一个目录,不指定目录的话,就在当前目录下创建了。
    DB_INIT_MPOOL是初始化内存池,在内存中存放数据的意思。
    数据存储的方法有四种,hash,btree,recno,queue。前两种用来存放哈希值,后两种存放数组。
    也可以存放duplicate key,在创建db的时候,加上 “-Property  => DB_DUP”即可。
    BerkeleyDB还会用到共享内存来做cache,可以再env设置中用CacheSize来设定。
 
    api方法的调用,例如:
   
    $db->db_put($key, $value); 存储数据
    $db->db_del($key); 删除数据
    $db->db_get($key ,$value); 获取数据
 
    $cursor = $db->db_cursor(); 创建一个指向db的指针,同时也是一个读锁
    $cursor->c_get($key ,$value, $flag); 移动指针,并读取指针指向的记录的key和value
        其中的flag有以下几个:
        DB_FIRST:第一条记录
        DB_LAST:最后一条记录
        DB_NEXT:下一条记录
        DB_PREV:前一条记录
        DB_NEXT_DUP:如果有duplicate key存在的话,指向下一个相同key的记录
        DB_CURRENT:当前记录
 
    加锁:
 
    BerkeleyDB的锁叫cds,需要在tie之前在env里设置init_cds,加锁函数为cds_lock,返回一个lock,解锁的时候对这个lock调用方法cds_unlock或者直接undef掉即可。
 
    例子如下:
 
#!/usr/bin/perl
# test_bdb_2.pl
# create by lianming: 2009-08-17
# last modify by lianming: 2009-08-17
 
use BerkeleyDB;
use strict qw(vars);
use warnings;
 
use vars qw(%h $k $v);
 
my $filename = "test_1.db";
unlink $filename;
my $env = new BerkeleyDB::Env
        -Home   => "/opt/B_db_test/DB",
        -Flags  => DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB
|| die "Cannot open environment: $BerkeleyDB::Error\n";
 
my $db = tie (%h, "BerkeleyDB::Btree",
    -Filename => $filename,
    -Flags => DB_CREATE,
    -Env => $env)
|| die " Cannot open file $filename:$! $BerkeleyDB::Error\n";
 
for (my $i = 0; $i < 10; $i++) {
        $h{$i} = $i*100;
}
 
$db->db_del(11);
 
print "Print by cursor:\n";
my ($status, $k, $v) = (0,0,0);
my $cursor = $db->db_cursor();
for ($status = $cursor->c_get($k, $v, DB_FIRST);$status == 0;$status = $cursor->c_get($k, $v, DB_NEXT)) {
        print "$k->$v\n";
}
 
undef $cursor;
 
my $lock = $db->cds_lock();
$db->db_put(11, 1100);
$lock->cds_unlock;
 
print "Print by hash:\n";
while (($k, $v) = each(%h)) {
        print "$k->$v\n";
}
 
undef $lock;
undef $db;
untie %h;
 
    执行结果如下:
 
Print by cursor:
0->0
1->100
2->200
3->300
4->400
5->500
6->600
7->700
8->800
9->900
Print by hash:
0->0
1->100
11->1100
2->200
3->300
4->400
5->500
6->600
7->700
8->800
9->900
 
    DB_File的锁效率是很低的,我们可以来测一下它的锁怎么样:
 
#!/usr/bin/perl
# test_bdb_3.pl
# create by lianming: 2009-08-17
# last modify by lianming: 2009-08-17
 
use BerkeleyDB;
use strict qw(vars);
use warnings;
 
use vars qw(%h $k $v);
 
my $filename = "test_1.db";
unlink $filename;
my $env = new BerkeleyDB::Env
        -Home   => "/opt/B_db_test/DB",
        -Flags  => DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB
|| die "Cannot open environment: $BerkeleyDB::Error\n";
 
my $db = tie (%h, "BerkeleyDB::Btree",
    -Filename => $filename,
    -Flags => DB_CREATE,
    -Env => $env)
|| die " Cannot open file $filename:$! $BerkeleyDB::Error\n";
 

my $time = time();
print "Start: $time\n";
for (my $i = 0; $i < 1000000; $i++) {
        my $lock = $db->cds_lock();
        $h{$i} = $i*100;
        $lock -> cds_unlock();
}
$time = time();
print "End: $time\n";
 
undef $db;
untie %h;
 
    执行结果如下:
 
Start: 1250453694
End: 1250453775
    100w条数据插入,插入之前加锁,插入结束后解锁,耗时还不到100s,足可见它强大的并发性。但是,在多进程的时候,发现一个问题。
 
#!/usr/bin/perl
 
use strict qw(vars);
use warnings;
use BerkeleyDB;
#use IPC::SysV qw(IPC_PRIVATE S_IRWXU IPC_CREAT);
#use IPC::Semaphore;
 

use POSIX ":sys_wait_h";
 
our $zombies = 0;
our $procs = 0;
 
$SIG{CHLD} = sub { $zombies++ };
 
sub REAPER {
    my $pid;
    while (($pid = waitpid(-1, WNOHANG)) > 0) {
        $zombies --;
    }
}
 
sub child {
        my $cnt = $_[0];
        my %hash;
        my $file_name = "/opt/B_db_test/DB/Bt001.db";
        unlink $file_name;
        my ($k, $v);
 
        my $env = new BerkeleyDB::Env
                -Home => "/opt/B_db_test/DB",
                -Flags  => DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL || die "Cannot open environment: $BerkeleyDB::Error\n";
        sleep(3);
 
        my $db = tie (%hash, "BerkeleyDB::Btree",
                -Filename => $file_name,
                -Flags => DB_CREATE,
                -Env => $env) || die " Cannot open file $file_name:$! $BerkeleyDB::Error\n";
 
        for (my $i = 1; $i < 100000; $i++) {
                $k = $cnt*100000+$i;
                $v = $k*2;
                my $lock = $db->cds_lock();
                #print "$cnt get lock\n";
                $db->db_put($k, $v);
                undef $lock;
                #print "$cnt drop lock\n";
        }
        undef $db;
        untie %hash;
        my $time_end = time();
        print "$time_end: complete\n";
        exit 0;
}
 
for ($procs; $procs<10; $procs++) {
        my $pid = fork();
 
        if (!defined($pid)) {
                print "Fork Error: $!\n";
                exit 1;
        }
 
        if ($pid == 0) {
                &child($procs);
                exit 0;
        } else {
                my $time = time();
                print "$time:$procs process forked!\n";
                &REAPER if ($zombies > 0);
        }
        sleep(0.2);
}
 
exit 0;
 
    同时开10个进程,对它进行并发的插入,每个进程插入量为10w条,在执行的时候,我们就会发现一些问题了。执行结果如下:
 
1250454137:0 process forked!
1250454137:1 process forked!
1250454137:2 process forked!
1250454137:3 process forked!
1250454137:4 process forked!
1250454137:5 process forked!
1250454137:6 process forked!
1250454137:7 process forked!
1250454137:8 process forked!
1250454137:9 process forked!
 
    本来,进程fork和退出的时候都应该print一行出来,但是却只有fork时候的输出,没有结束时候的输出……用ps auxf看的时候,可以看到10个进程全部挂在那里不动了!!
 
root     17441  0.0  0.0 11376 4064 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17442  0.1  0.0 11376 3664 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17443  0.0  0.0 11376 3752 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17444  0.0  0.0 11376 3664 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17445  0.0  0.0 11376 3632 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17446  0.0  0.0 11376 3660 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17447  0.0  0.0 11376 3608 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17448  0.0  0.0 11376 3608 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17449  0.0  0.0 11376 3608 pts/0    S    04:22   0:00 perl lock_berk.pl
root     17450  0.0  0.0 11376 3608 pts/0    S    04:22   0:00 perl lock_berk.pl
 
    于是,去掉子进程里两行注释,让它在获得锁的时候打一行出来,放弃锁的时候也打一行出来,这次输出如下:
 
0 get lock
0 drop lock
0 get lock
0 drop lock
0 get lock
0 drop lock
0 get lock
0 drop lock
0 get lock
0 drop lock
0 get lock
0 drop lock
0 get lock
0 drop lock
5 get lock
 
    可以看到,进程5在刚刚进入,第一次获得了锁之后,就不动了。
    多试几次,就会发现,有时候是没问题的,但是大部分的时候总是有某个进程乱入的时候,获得锁之后会一下子卡住,于是别的进程得不到锁,也一起卡住了……
 
    再做一个小改动,就是在子进程tie之后,都sleep一段时间,等所有进程都完成tie之后再进行插入,这种问题就不会发生了……
 
#!/usr/bin/perl
 
use strict qw(vars);
use warnings;
use BerkeleyDB;
#use IPC::SysV qw(IPC_PRIVATE S_IRWXU IPC_CREAT);
#use IPC::Semaphore;
 

use POSIX ":sys_wait_h";
 
our $zombies = 0;
our $procs = 0;
 
$SIG{CHLD} = sub { $zombies++ };
 
sub REAPER {
    my $pid;
    while (($pid = waitpid(-1, WNOHANG)) > 0) {
        $zombies --;
    }
}
 
sub child {
        my $cnt = $_[0];
        my %hash;
        my $file_name = "/opt/B_db_test/DB/Bt001.db";
        unlink $file_name;
        my ($k, $v);
 
        my $env = new BerkeleyDB::Env
                -Home => "/opt/B_db_test/DB",
                -Flags  => DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL || die "Cannot open environment: $BerkeleyDB::Error\n";
 
        my $db = tie (%hash, "BerkeleyDB::Btree",
                -Filename => $file_name,
                -Flags => DB_CREATE,
                -Env => $env) || die " Cannot open file $file_name:$! $BerkeleyDB::Error\n";
        sleep(3);
 
        for (my $i = 1; $i < 100000; $i++) {
                $k = $cnt*100000+$i;
                $v = $k*2;
                my $lock = $db->cds_lock();
                $db->db_put($k, $v);
                undef $lock;
        }
        undef $db;
        untie %hash;
        my $time_end = time();
        print "$time_end: complete\n";
        exit 0;
}
 
for ($procs; $procs<10; $procs++) {
        my $pid = fork();
 
        if (!defined($pid)) {
                print "Fork Error: $!\n";
                exit 1;
        }
 
        if ($pid == 0) {
                &child($procs);
                exit 0;
        } else {
                my $time = time();
                print "$time:$procs process forked!\n";
                &REAPER if ($zombies > 0);
        }
}
 
exit 0;
 
    执行结果如下:
 
1250454587:0 process forked!
1250454587:1 process forked!
1250454587:2 process forked!
1250454587:3 process forked!
1250454587:4 process forked!
1250454587:5 process forked!
1250454587:6 process forked!
1250454588:7 process forked!
1250454588:8 process forked!
1250454588:9 process forked!
 
[root@cacti227.cm2 /home/lianming/file_db_test/bin]
# 1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
1250454676: complete
 
    同样是100w条记录的插入,用时和单进程差不多,也不会发生卡住的情况……看来,问题还是出在tie上,在DB被写入的时候tie,是会出问题的。
    解决的办法是再加一层锁,可以考虑用信号量。
 
    BerkeleyDB博大精深,不是一两篇文章可以介绍清楚的,有兴趣的可以下载下来去看文档,不过perl对它的封装是不咋地,那个锁的地方太恶心了……
 
    进程间的通讯,还有一个办法,就是通过socket,下一篇文档就大致介绍下perl脚本编写的用于socket通讯的办法。
阅读(1511) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~