Chinaunix首页 | 论坛 | 博客
  • 博客访问: 308632
  • 博文数量: 47
  • 博客积分: 2455
  • 博客等级: 大尉
  • 技术积分: 558
  • 用 户 组: 普通用户
  • 注册时间: 2008-12-25 15:43
文章分类

全部博文(47)

文章存档

2016年(2)

2012年(10)

2011年(13)

2010年(1)

2009年(19)

2008年(2)

分类: 数据库开发技术

2012-03-09 22:35:11

在BerkeleyDB中, 有以下基本概念

1 Environment: 环境, 并不直接作为应用数据的存放容器, 但是为数据访问提供服务. 提供的主要服务有:
  - logging: 日志服务, 记录所有数据修改的操作, 用于事务取消和故障恢复.
  - locking: 锁服务, 为并发访问数据提供控制与支持.
  - memory pool: 缓存服务, 为快速数据访问提供支持.
  - transaction: 事务服务, 使得可以将修改分组为事务, 让修改具有事务特性(原子性, 一致性, 隔离性, 持久性), 同时提供有效的取消操作.

其中logging, locking和memory pool, 是作为单独的子系统存在的, 所以应用程序需要锁,日志或者缓存服务时, 可以直接使用这些功能, 而无需了解庞大的各种数据访问操作.

环境类似于关系数据库系统中的 数据库, 即为快速,可靠的数据访问提供所有的底层支持

在C API中, 环境是用结构DB_ENV来表示的, 其提供的服务和功能是通过该结构的各种函数指针来完成的.

2 Database: 数据库, 用来存放数据的容器, 包含以键/值对形式存在的数据. 在语义上, 相当关系数据库系统中的一个表. 根据数据在磁盘和内存组织方式的不同以及键/值对形式的不同, 数据库提供了5中不同的存取方法:
 
 - DB_BTREE: 数据在内存和磁盘上以变形B+树的形式存放, 键/值都可以是任意的字节串. 特点是, 数据之间表现出空间关联性. 所以用游标顺序遍历时, 数据是按逻辑上的从小到大来返回的.

- DB_HASH: 数据放在不同的hash桶里, hash函数将键值映射为地址. hash函数本身可能不具有逻辑上的含义(比如并不保证会将逻辑上更大的键值映射到地址更高的桶上去). 键/值也都可以是任意的字节串. 但是, 由于数据之间不表现出空间关联性, 用游标遍历时, 是逐个返回各个桶的内容, 而桶内部和桶之间的内容并没有逻辑上大小关系.

- DB_RECNO: 数据存放在Btree中, 键只能为整型的序列号(db_recno_t), 值可以为任意的字节串. 支持定长和不定长的值. 在创建高序列号值时, 低序列对会被隐式创建, 但值实际上为空字节串且不可读取/删除, 除非显式创建.

- DB_HEAP: db-5.2引入, 键为固定的定长结构体(DB_HEAP_RID), 值可以为任意字节串. 与其他不同的在于, 该方式将尽量利用已有空间来存放新数据, 而不像其他方式一样, 会导致数据占用的磁盘空间越来越大, 尽管数据容量在修改过程中无明显变化.

以上4种类型中, 锁的级别都是页级的, 即一旦加锁, 一个数据页上的所有记录都被锁住.

- DB_QUEUE: 类似DB_RECNO, 键只可为整型序列号(db_recno_t). 但不同的是,值为定长字节串. 当高序列号值被创建时, 低序列号值会被自动创建, 而且也是定长字节串, 虽然无法读取与删除. 由于所有数据占用相同长度, 且中间空白自己隐式创建内容, 给出一个序列号, 可以直接算出其值所在的地址, 所以就可以直接将锁加在记录上, 而不是页上. 这样减小了锁的粒度, 提高了并发程度, 当场景适当时, DB_QUEUE可以提供最快的速度和并发能力. 缺点在于自动填充定长记录的方式会导致极大的空间浪费, 如果需要创建的序列号中有很多空隙的话. 所以该方式适用于内容接近定长, 且数据序列号键值有较好连续性的情况.

在C API中, 数据库是用结构DB来表示的, 其提供的服务和功能是通过该结构的各种函数指针来完成的.

3 Cursor: 游标. 就是在数据库上打开的一个记录数据位置且可以用来对数据进行操作的句柄. 由于游标记录数据位置, 所以可以有如下操作:
  获得下一个/上一个, 取最后一个/第一个, 取不同键值的下一个/上一个, 取相同键值的上一个/下一个 等等. 当然, 上一个下一个的含义随数据库本身的数据组织方式不同而不同.

在C API中, 游标是用结构DBC来表示的, 其提供的服务和功能是通过该结构的各种函数指针来完成的.

4 Key/Data Item: 数据项, 用来表示BerkeleyDB中的键或者值, 数据项有两个基本的要素, 数据的起始地址和数据的长度. 在存数据时, 用户需要设置此两要素, BerkeleyDB将根据此要素获得数据存入数据库中. 在读数据时, 默认情况下, 用户提供一个初始化(0化)的数据项即可, BerkeleyDB将分配空间存放返回的数据, 并且设置该两要素, 使得应用可以从数据库中获得数据.

在C API中, 数据项是用结构DBT来表示的, 有两个主要的成员data(void *)和size(u_int32_t).

5 Master: 主控站点, 在BerkeleyDB集群中, 处于主控地位的站点. 只有该站点可以进行更新操作, 且其将更新操作发往其他的受控站点.

在BerkeleyDB中, 主控站点实际上就是一个环境(DB_ENV),只是指定其具有了主控属性(DB_REP_MASTER),并且调用了启动集群的操作(DB_ENV->repmgr_start 或 DB_ENV->rep_start).

6 Replica: 受控站点, 在BerkeleyDB集群中, 处于受控地位的站点. 该站点只能接受读数据的操作, 其更新只有通过主控站点分发才能进行. 当主控站点崩溃时, 受控站点将进行选举, 胜利者将成为新的主控站点, 余下的角色不变.

在BerkeleyDB中, 受控站点也是一个环境(DB_ENV),只是指定其具有了受控属性(DB_REP_CLIENT),并且调用了启动集群的操作(DB_ENV->repmgr_start 或 DB_ENV->rep_start).

7 Sequence: 序列. 提供了一个获得严格递增或递减序列的方式. 该序列在多线程环境下和多线程环境下仍然可以保证值的唯一性.  用户可以指定值的范围, 可以指定是递增还是递减, 还可以指定是否回环(到达最大值后是否继续从头开始)

在C API中, 序列是用结构DB_SEQUENCE来表示的, 其提供功能是通过该结构的各种函数指针来完成的.

8 Transaction: 事务. 是一种组织数据操作的方式, 使得数据修改具有ACID特性. 当用一个事务句柄传给若干个操作时, 这些操作就属于同一个事务. 当其中一个出问题, 事务可以回倒保证这些操作就像完全没发生一样, 以此保证原子性. 事务本身就是一个加锁者, 保证属于本事务的所有操作涉及的对象不会被其他的数据修改甚至读取, 以此保证一致性和独立性. 当事务提交时, 操作修改将被flush到磁盘, 以此保证持久性.

在C API中, 事务是用结构DB_TXN来表示的, 其提供功能是通过该结构的各种函数指针来完成的.

下面的示例程序设计到本文中提到的所有概念.

基本逻辑是:

创建3个环境(environment), 形成一个集群(cluster/replication group). 一个作为主控站点(master), 两个作为受控站点(replica).

在 主控站点创建一个序列(sequence), 且创建一个应用数据库(database), 然后以序列为键, 以一个随机长度的字节串作为值, 将此键值对存入应用数据库, 这些存数据的操作将被封装到事务(transaction)中. 完成所有操作后, 等待一段时间, 使得修改得以到达受控站点.

而后分别打开所有站点的应用数据库, 使用游标(cursor)遍历数据库, 并且比较获得的数据.


点击(此处)折叠或打开

  1. #include "unistd.h"
  2. #include "fcntl.h"
  3. #include <limits.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <dirent.h>
  8. #include <sys/stat.h>
  9. #include "db.h"

  10. #define CHK_RET(ret, str) do { \
  11.     int _ret = (ret); \
  12.     if (_ret) { \
  13.         fprintf(stderr, "Error(%d, %s): %s\n", \
  14.          _ret, (str), db_strerror(_ret)); \
  15.         exit(1); \
  16.     } \
  17. } while(0)

  18. #define TEST_ASSERT(expr) do { \
  19.     if(!(expr)) { \
  20.         printf("Assert Failure(%s:%d): "#expr, __FILE__, __LINE__); \
  21.         exit(1); \
  22.     } \
  23. }while(0)

  24. int test1(int test_cnt);

  25. int main(int argc, char *argv[]) {
  26.     int ret = 0, cnt;

  27.     cnt = 1000;
  28.     if (argc >= 2) {
  29.         cnt = atol(argv[1]);
  30.     }

  31.     ret = test1(cnt);
  32. }

  33. static int open_env(const char *envdir, DB_ENV **envp);
  34. static int setup_site(DB_ENV *dbenv, u_int lport, u_int rport1,
  35.     u_int rport2, int is_master);
  36. static int open_db(DB_ENV *dbenv, const char *fname, const char *dname,
  37.     u_int32_t oflag, DB **dbp);
  38. static int open_sequence(DB *db, DBT *keyp, DB_SEQUENCE **seqp);
  39. static int verify_datadb(DB_ENV *menv, DB_ENV *cenv1, DB_ENV *cenv2,
  40.     const char *fname, const char *dname);

  41. int test1(int test_cnt) {
  42.     DB_ENV *menv, *cenv1, *cenv2, *dbenv;
  43.     DB *seqdb, *datadb;
  44.     const char *mdir, *cdir1, *cdir2;
  45.     const char *seqdbname, *datadbname, *seqkey;
  46.     DB_SITE *site;
  47.     DB_SEQUENCE *seq;
  48.     db_seq_t seqv;
  49.     char buf[100];
  50.     u_int32_t envo_flag, dbo_flag;
  51.     DBT seqdbt, kdbt, ddbt;
  52.     DB_TXN *txn;


  53.     int i, ret;


  54.     mdir = "TESTDIR/masterdir";
  55.     cdir1 = "TESTDIR/clientdir1";
  56.     cdir2 = "TESTDIR/clientdir2";
  57.     seqdbname = "seq.db";
  58.     datadbname = "data.db";
  59.     seqkey = "sequence1";

  60.     for (i = 0; i < sizeof(buf); i++) {
  61.         buf[i] = (char)(i+1);
  62.     }
  63.     srand(time(NULL));


  64.     dbo_flag = DB_THREAD | DB_AUTO_COMMIT;

  65.     /* Open the Environment(s) */
  66.     CHK_RET((ret = open_env(mdir, &menv)), "open_env(mdir)");
  67.     CHK_RET((ret = open_env(cdir1, &cenv1)), "open_env(cdir1)");
  68.     CHK_RET((ret = open_env(cdir2, &cenv2)), "open_env(cdir2)");

  69.     /*
  70.      * Set up the cluster
  71.      * menv is the master
  72.      * cenv1 and cenv2 are replicas
  73.      */
  74.     CHK_RET((ret = setup_site(menv, 30101, 30102, 30103, 1)), "setup_site(menv)");
  75.     CHK_RET((ret = setup_site(cenv1, 30102, 30101, 30103, 0)), "setup_site(cenv1)");
  76.     CHK_RET((ret = setup_site(cenv2, 30103, 30101, 30102, 0)), "setup_site(cenv2)");

  77.     dbenv = menv;

  78.     /* Sequence needs a underlying database, so create and open it */
  79.     CHK_RET((ret = open_db(dbenv, seqdbname, NULL, DB_CREATE, &seqdb)), "open_db(seqdb)");

  80.     /* Create the application database */
  81.     CHK_RET((ret = open_db(dbenv, datadbname, NULL, DB_CREATE, &datadb)), "open_db(datadb)");

  82.     /* Create and opent he sequence */
  83.     memset(&seqdbt, 0, sizeof(DBT));
  84.     seqdbt.data = seqkey;
  85.     seqdbt.size = strlen(seqkey);
  86.     CHK_RET((ret = open_sequence(seqdb, &seqdbt, &seq)), "open_sequence(seqdb)");

  87.     memset(&kdbt, 0, sizeof(DBT));
  88.     kdbt.data = &seqv;
  89.     kdbt.size = sizeof(db_seq_t);
  90.     memset(&ddbt, 0, sizeof(DBT));
  91.     ddbt.data = buf;

  92.     /*
  93.      * Put the data into the application database,
  94.      * The actions are protected by transactions
  95.      */
  96.     txn = NULL;
  97.     CHK_RET((ret = dbenv->txn_begin(dbenv, NULL, &txn, 0)), "dbenv->txn_begin");
  98.     for (i = 0; i < test_cnt; i++) {
  99.         CHK_RET((ret = seq->get(seq, NULL, rand()%5+1, &seqv, 0)), "seq->get");
  100.         if ((i + 1) % 37 == 0) {
  101.             CHK_RET((txn->commit(txn, 0)), "txn->commit");
  102.             CHK_RET((ret = dbenv->txn_begin(dbenv, NULL, &txn, 0)),
  103.              "dbenv->txn_begin");
  104.         }
  105.         ddbt.size = rand() % 99 + 1;
  106.         ret = datadb->put(datadb, txn, &kdbt, &ddbt, 0);
  107.         if (ret != 0) {
  108.             CHK_RET((ret = txn->abort(txn)), "txn->abort");
  109.             CHK_RET((ret = dbenv->txn_begin(dbenv, NULL, &txn, 0)),
  110.              "dbenv->txn_begin");
  111.         }
  112.     }
  113.     CHK_RET((txn->commit(txn, 0)), "txn->commit");

  114.     sleep(test_cnt/50);

  115.     /*
  116.      * Verify the content in the cluster
  117.      * We make sure the items are identical in master and replicas
  118.      */
  119.     CHK_RET((ret = verify_datadb(menv, cenv1, cenv2, datadbname, NULL)),
  120.      "verify_datadb");

  121.     CHK_RET((ret = datadb->close(datadb, 0)), "datadb->close");
  122.     CHK_RET((ret = seq->close(seq, 0)), "seq->close");
  123.     CHK_RET((ret = seqdb->close(seqdb, 0)), "seqdb->close");
  124.     CHK_RET((ret = cenv2->close(cenv2, 0)), "cenv2->close");
  125.     CHK_RET((ret = cenv1->close(cenv1, 0)), "cenv1->close");
  126.     CHK_RET((ret = menv->close(menv, 0)), "menv->close");
  127.     return ret;
  128. }

  129. static int open_env(const char *envdir, DB_ENV **envp) {
  130.     DB_ENV *dbenv = NULL;
  131.     int ret = 0;
  132.     mode_t dirmode = S_IRWXU | S_IRWXG;
  133.     u_int32_t envo_flag;

  134.     envo_flag = DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG |
  135.      DB_INIT_LOCK | DB_INIT_REP | DB_THREAD | DB_CREATE;

  136.     mkdir(envdir, dirmode);
  137.     CHK_RET((ret = db_env_create(&dbenv, 0)), "db_env_create");
  138.     CHK_RET((ret = dbenv->set_cachesize(dbenv, 0, 16*1048576, 1)),
  139.      "dbenv->set_cachesize");
  140.     CHK_RET((ret = dbenv->open(dbenv, envdir, envo_flag, 0644)), "dbenv->open");
  141.     *envp = dbenv;
  142.     return ret;
  143. }

  144. static int setup_site(DB_ENV *dbenv, u_int lport, u_int rport1,
  145.     u_int rport2, int is_master) {
  146.     u_int ports[3];
  147.     int i, ret;
  148.     DB_SITE *site = NULL;
  149.     u_int32_t role;

  150.     ports[0] = lport;
  151.     ports[1] = rport1;
  152.     ports[2] = rport2;

  153.     /* Set up the information for other sites in the cluster */
  154.     for (i = 0; i < 3; i++) {
  155.         CHK_RET((ret = dbenv->repmgr_site(dbenv, "127.0.0.1",
  156.          ports[i], &site, 0)), "dbenv->repmgr_site");
  157.         if (i == 0) {
  158.             CHK_RET((ret = site->set_config(site, DB_LOCAL_SITE, 1)),
  159.              "site->set_config(DB_LOCAL_SITE)");
  160.             if (is_master) {
  161.                 CHK_RET((ret = site->set_config(site, DB_GROUP_CREATOR, 1)),
  162.                  "site->set_config(DB_GROUP_CREATOR");
  163.             }
  164.         } else {
  165.             CHK_RET((ret = site->set_config(site, DB_BOOTSTRAP_HELPER, 1)),
  166.              "site->set_config(DB_BOOTSTRAP_HELPER)");
  167.         }
  168.         CHK_RET((ret = site->close(site)), "site->close");
  169.         site = NULL;
  170.     }

  171.     role = DB_REP_CLIENT;
  172.     if (is_master)
  173.         role = DB_REP_MASTER;

  174.     /* Start the cluster, retry some times if not up in time */
  175.     for (i = 0; i < 10; i++) {
  176.         ret = dbenv->repmgr_start(dbenv, 3, role);
  177.         if (ret != DB_REP_UNAVAIL)
  178.             break;
  179.     }
  180.     CHK_RET(ret, "dbenv->repmgr_start");

  181.     return ret;
  182. }

  183. static int open_db(DB_ENV *dbenv, const char *fname, const char *dname, u_int32_t oflag, DB **dbp) {
  184.     DB *db;
  185.     u_int32_t dbo_flag;
  186.     int ret;

  187.     db = NULL;
  188.     dbo_flag = DB_AUTO_COMMIT | DB_THREAD | oflag;

  189.     CHK_RET((ret = db_create(&db, dbenv, 0)), "db_create(dbenv)");
  190.     CHK_RET((ret = db->open(db, NULL, fname, dname, DB_BTREE, dbo_flag, 0644)),
  191.      "db->open(DB_BTREE)");
  192.     *dbp = db;
  193.     return ret;
  194. }

  195. static int open_sequence(DB *db, DBT *keyp, DB_SEQUENCE **seqp) {
  196.     DB_SEQUENCE *seq;
  197.     int ret;


  198.     CHK_RET((ret = db_sequence_create(&seq, db, 0)), "db_sequence_create");
  199.     /* Increment and allow wrap */
  200.     CHK_RET((ret = seq->set_flags(seq, DB_SEQ_INC | DB_SEQ_WRAP)),
  201.      "seq->set_flags(DB_SEQ_INC|DB_SEQ_WRAP)");
  202.     CHK_RET((ret = seq->set_range(seq, 1, UINT_MAX)), "seq->set_range(1, UINT_MAX)");
  203.     CHK_RET((ret = seq->initial_value(seq, 1)), "seq->initial_value(1)");
  204.     CHK_RET((ret = seq->open(seq, NULL, keyp, DB_CREATE|DB_THREAD)), "seq->open");
  205.     *seqp = seq;
  206.     return ret;
  207. }

  208. static int verify_datadb(DB_ENV *menv, DB_ENV *cenv1, DB_ENV *cenv2,
  209.     const char *fname, const char *dname) {
  210.     DB_ENV *dbenvs[3];
  211.     DB *dbs[3];
  212.     DBC *dbcs[3];
  213.     int ret, i, rets[3];
  214.     DBT kdbts[3], ddbts[3];

  215.     dbenvs[0] = menv;
  216.     dbenvs[1] = cenv1;
  217.     dbenvs[2] = cenv2;
  218.     memset(kdbts, 0, sizeof(kdbts));
  219.     memset(ddbts, 0, sizeof(ddbts));

  220.     for (i = 0; i < 3; i++) {
  221.         CHK_RET((ret = open_db(dbenvs[i], fname, dname, DB_RDONLY, &dbs[i])),
  222.          "open_db(no_create)");
  223.         CHK_RET((ret = dbs[i]->cursor(dbs[i], NULL, &dbcs[i], 0)), "DB->cursor");
  224.     }

  225.     /*
  226.      * Open a cursor per database, and get the data using the cursor, then compare
  227.      * the items. The items should be identical
  228.      */
  229.     while (1) {
  230.         for (i = 0; i < 3; i++) {
  231.             rets[i] = dbcs[i]->get(dbcs[i], &kdbts[i], &ddbts[i], DB_NEXT);
  232.             TEST_ASSERT((rets[i] == 0 || rets[i] == DB_NOTFOUND));
  233.         }
  234.         for (i = 1; i < 3; i++) {
  235.             TEST_ASSERT((rets[i] == rets[0]));
  236.             if (rets[i] == DB_NOTFOUND)
  237.                 continue;
  238.             TEST_ASSERT((kdbts[i].size = kdbts[0].size));
  239.             TEST_ASSERT((memcmp(kdbts[i].data, kdbts[0].data, kdbts[0].size) == 0));
  240.             TEST_ASSERT((ddbts[i].size = ddbts[0].size));
  241.             TEST_ASSERT((memcmp(ddbts[i].data, ddbts[0].data, ddbts[0].size) == 0));
  242.         }
  243.         if (rets[0] == DB_NOTFOUND)
  244.             break;
  245.     }

  246.     for (i = 0; i < 3; i++) {
  247.         CHK_RET((ret = dbcs[i]->close(dbcs[i])), "DBC->close");
  248.         CHK_RET((ret = dbs[i]->close(dbs[i], 0)), "DB->close");
  249.     }

  250.     return ret;
  251. }


阅读(2462) | 评论(1) | 转发(0) |
0

上一篇:BerkeleyDB概述

下一篇:Rudll32.exe

给主人留下些什么吧!~~

tiplip2013-07-11 09:30:15

你好,我看BDB的文档里说,如果是用到了事务,高并发情况下sequence需要设置cache并用DB_AUTO_COMMIT and DB_TXN_NOSYNC flags来get数据,但如果roll back,又不让用cache,我晕了;
另外,我看你sequence->get包裹在txbegin里面,但有没传txn给get,那么会不会出现嵌套事务,因为get内部也会有一个transaction