Chinaunix首页 | 论坛 | 博客
  • 博客访问: 472577
  • 博文数量: 185
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 681
  • 用 户 组: 普通用户
  • 注册时间: 2011-08-06 21:45
个人简介

为梦而战

文章分类

全部博文(185)

文章存档

2016年(3)

2015年(103)

2014年(79)

我的朋友

分类: LINUX

2015-03-24 10:00:17

ovsdb/SPEC文件定义了ovsdb的表规范,创建一个db时,需要预先准备好一个schema文件,该文件是一个JSON格式的字符串,定义了db名字,包含的所有表;每张表都包含一个columns的JSON dict,通过这个schema文件(e.g. vswitchd/vswitch.ovsschema)来创建一个db file。因此ovsdb实际是个文件数据库(为啥不直接用sqlite,还要自己写个)。

ovsdb-server启动时会开放一个json rpc接口给client进行rpc,该rpc一般通过unix domain socket进行。ovsdb-client是访问ovsdb的客户端程序,比较有用的用法包括ovsdb-client dump, 用来查看ovsdb内容, ovsdb-client transact用来执行一条类sql

ovsdb_jsonrpc_server用来处理ovsdb的rpc请求,ovsdb_server是其接口

/* Abstract representation of an OVSDB server not tied to any particular
 * network protocol.  Protocol implementations (e.g. jsonrpc-server.c) embed
 * this in a larger data structure.  */
struct ovsdb_server {
    struct shash dbs;      /* Maps from a db name to a "struct ovsdb *". */
    struct hmap locks;     /* Contains "struct ovsdb_lock"s indexed by name. */
};

ovsdb_server是一个抽象接口,根据不同的网络访问会有不同的实现,e.g. ovsdb_jsonrpc_server。dbs成员是一个hash表,key是db name, value是struct ovsdb,locks也是name<->struct ovsdb_lock的hash表

对于每个client <-> ovsdb的connection,用一个抽象的接口struct ovsdb_session来表示,每个ovsdb_session会要请求一些锁(ovsdb_lock),这些锁被存在叫做waiters的一个hmap中,以ovsdb_lock->name为hash key

/* Abstract representation of an OVSDB client connection, not tied to any
 * particular network protocol.  Protocol implementations
 * (e.g. jsonrpc-server.c) embed this in a larger data structure.  */
struct ovsdb_session {
    struct ovsdb_server *server;
    struct list completions;    /* Completed triggers. */
    struct hmap waiters;        /* "ovsdb_lock_waiter *"s by lock name. */
};

ovsdb_lock是互斥锁,在ovsdb_server中有个hmap专门管理这些ovsdb_lock,在某个ovsdb_lock上等待的ovsdb_lock_waiter会形成一个list

/* A database lock.
 *
 * A lock always has one or more "lock waiters" kept on a list.  The waiter at
 * the head of the list owns the lock. */
struct ovsdb_lock {
    struct hmap_node hmap_node;  /* In ovsdb_server's "locks" hmap. */
    struct ovsdb_server *server; /* The containing server. */
    char *name;                  /* Unique name. */
    struct list waiters;         /* Contains "struct ovsdb_lock_waiter"s. */
};

waiters是等待该锁的session表,被封装成一个struct ovsdb_lock_waiter结构,list的第一个node是ovsdb_lock的当前owner

/* A session's request for a database lock. */
struct ovsdb_lock_waiter {
    struct hmap_node session_node; /* In ->session->locks's hmap. */
    struct ovsdb_lock *lock;    /* The lock being waited for. */

    enum ovsdb_lock_mode mode;
    char *lock_name;

    struct ovsdb_session *session;
    struct list lock_node;      /* In ->lock->waiters's list. */
};

session_node是ovsdb_session->waiters这个hmap结构的hmap_node,lock_node是ovsdb_lock->waiters的list node,session是等待该ovsdb_lock的ovsdb_session,name/mode是该ovsdb_lock的name和访问方式,lock就是该ovsdb_lock_waiter要获取的ovsdb_lock


ovsdb_server_create_lock__,创建一个ovsdb_lock并加到ovsdb_server的锁hmap中

/* Attempts to acquire the lock named 'lock_name' for 'session' within
 * 'server'.  Returns the new lock waiter.
 *
 * If 'mode' is OVSDB_LOCK_STEAL, then the new lock waiter is always the owner
 * of the lock.  '*victimp' receives the session of the previous owner or NULL
 * if the lock was previously unowned.  (If the victim itself originally
 * obtained the lock through a "steal" operation, then this function also
 * removes the victim from the lock's waiting list.)
 *  
 * If 'mode' is OVSDB_LOCK_WAIT, then the new lock waiter is the owner of the
 * lock only if this lock had no existing owner.  '*victimp' is set to NULL. */
struct ovsdb_lock_waiter *
ovsdb_server_lock(struct ovsdb_server *server,
                  struct ovsdb_session *session,
                  const char *lock_name,
                  enum ovsdb_lock_mode mode,
                  struct ovsdb_session **victimp)

{
    uint32_t hash = hash_string(lock_name, 0);
    struct ovsdb_lock_waiter *waiter, *victim;
    struct ovsdb_lock *lock;

    lock = ovsdb_server_create_lock__(server, lock_name, hash);
    victim = (mode == OVSDB_LOCK_STEAL && !list_is_empty(&lock->waiters)
              ? ovsdb_lock_get_owner(lock)
              : NULL);
如果锁的访问模式是OVSDB_LOCK_STEAL,那么当前锁的owner会成为victim,否则victim为空

    waiter = xmalloc(sizeof *waiter);
    waiter->mode = mode;
    waiter->lock_name = xstrdup(lock_name);
    waiter->lock = lock;
    if (mode == OVSDB_LOCK_STEAL) {
        list_push_front(&lock->waiters, &waiter->lock_node);  //OVSDB_LOCK_STEAL可以立刻获得锁

    } else {
        list_push_back(&lock->waiters, &waiter->lock_node);  //否则需要进入lock->waiters等待

    }
    waiter->session = session;
    hmap_insert(&waiter->session->waiters, &waiter->session_node, hash);


    if (victim && victim->mode == OVSDB_LOCK_STEAL) {
        ovsdb_lock_waiter_remove(victim);  //把victime从lock->waiters的list中移除

    }

    *victimp = victim ? victim->session : NULL;
    return waiter;
}


ovsdb_jsonrpc_server是ovsdb_server的一种实现,网络通信采用jsonrpc的方式进行。

struct ovsdb_jsonrpc_server {
    struct ovsdb_server up;
    unsigned int n_sessions, max_sessions;
    struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
};

struct ovsdb_jsonrpc_remote可以表示主动或者被动的connection pool,如果是主动的connection pool,listener为空,所有向外的connection都保存在sessions链表里,如果是被动的connection pool,listener表示接收connection,所有之后accept的connection都保存在sessions链表里。

/* A configured remote.  This is either a passive stream listener plus a list
 * of the currently connected sessions, or a list of exactly one active
 * session. */
struct ovsdb_jsonrpc_remote {
    struct ovsdb_jsonrpc_server *server;
    struct pstream *listener;   /* Listener, if passive. */
    struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
    uint8_t dscp;
};

ovsdb_jsonrpc_session代表了client <-> ovsdb的一个以jsonrpc方式通信的connection

struct ovsdb_jsonrpc_session {
    struct list node;           /* Element in remote's sessions list. */
    struct ovsdb_session up;
    struct ovsdb_jsonrpc_remote *remote;

    /* Triggers. */
    struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
    
    /* Monitors. */
    struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */

    /* Network connectivity. */
    struct jsonrpc_session *js;  /* JSON-RPC session. */
    unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
};  

struct ovsdb {
    struct ovsdb_schema *schema;
    struct list replicas;       /* Contains "struct ovsdb_replica"s. */
    struct shash tables;        /* Contains "struct ovsdb_table *"s. */

    /* Triggers. */
    struct list triggers;       /* Contains "struct ovsdb_trigger"s. */
    bool run_triggers;
};  

OVS的数据库通过struct ovsdb存储在ovsdb_server中,ovsdb_schema里主要是一个name<->ovsdb_table_schema的key-value表,每一个ovsdb_table_schema代表了一个table的schema结构,关于OVS DB table的代码都在ovsdb/table.h ovsdb/table.c里,这里不细说了。


static struct ovsdb_jsonrpc_remote *
ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
                                const char *name,
                                const struct ovsdb_jsonrpc_options *options)
{
    struct ovsdb_jsonrpc_remote *remote;
    struct pstream *listener;
    int error;
    
    error = jsonrpc_pstream_open(name, &listener, options->dscp);
    if (error && error != EAFNOSUPPORT) {
        VLOG_ERR_RL(&rl, "%s: listen failed: %s", name, strerror(error));
        return NULL;
    }

增加listen状态的pstream,如果listen失败则返错

    remote = xmalloc(sizeof *remote);
    remote->server = svr;
    remote->listener = listener;
    list_init(&remote->sessions);
    remote->dscp = options->dscp;
    shash_add(&svr->remotes, name, remote);

这里的remote其实是接收remote过来的connection的数据结构,这步结束后把新建的ovsdb_jsonrpc_remote结构加入到ovsdb_jsonrpc_server里

    if (!listener) {
        ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name));
    }
    return remote;

}


static void
ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
{
    struct ovsdb_jsonrpc_remote *remote = node->data;

    ovsdb_jsonrpc_session_close_all(remote);   这里用来close所遇remote->sessions里的connections
    pstream_close(remote->listener);   这里close remote->listener连接
    shash_delete(&remote->server->remotes, node);   把remote从ovsdb_jsonrpc_server的remotes哈希表里删除
    free(remote);
}


ovsdb_jsonrpc_server_reconnect,对ovsdb_jsonrpc_server的所有ovsdb_jsonrpc_remote,调用ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote* remote)

static void
ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
{   
    struct ovsdb_jsonrpc_session *s, *next;

    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
        jsonrpc_session_force_reconnect(s->js);
        if (!jsonrpc_session_is_alive(s->js)) {
            ovsdb_jsonrpc_session_close(s);
        }
    }
}   

jsonrpc_session_force_reconnect会调用reconnect_force_reconnect,更改其状态为S_RECONNECT,然后对所有active的session调用ovsdb_jsonrpc_session_close关闭


void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
{
    struct shash_node *node;
    
    SHASH_FOR_EACH (node, &svr->remotes) {
        struct ovsdb_jsonrpc_remote *remote = node->data;
        
        if (remote->listener && svr->n_sessions < svr->max_sessions) {
            struct stream *stream;
            int error;
 
            error = pstream_accept(remote->listener, &stream);   如果是被动remote,那么尝试accept新connection进来
            if (!error) {
                struct jsonrpc_session *js;
                js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
                                                     remote->dscp);   创建一个jsonrpc_session
                ovsdb_jsonrpc_session_create(remote, js);   把jsonrpc_session加入到ovsdb_jsonrpc_remote中
            } else if (error != EAGAIN) {
                VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                             pstream_get_name(remote->listener),
                             strerror(error));
            }
        }

        ovsdb_jsonrpc_session_run_all(remote);  对remote里的所有session调用ovsdb_jsonrpc_session_run,如果报错则ovsdb_jsonrpc_session_close关闭session
    }
}

ovsdb_jsonrpc_session_run调用jsonrpc_session_run处理pending connection/request,如果jsonrpc_session没有请求,调用jsonrpc_session_recv尝试接收并处理


--------------------------------  下面来分析ovsdb-server的主程序  -----------------------------------


先调用ovsdb_file_open,从数据库文件生成struct db这个数据库结构,

struct db {
    /* Initialized in main(). */
    char *filename;
    struct ovsdb_file *file;
    struct ovsdb *db;
    
    /* Only used by update_remote_status(). */
    struct ovsdb_txn *txn;
};  

之后调用ovsdb_jsonrpc_server_create创建struct ovsdb_jsonrpc_server结构,然后调用ovsdb_jsonrpc_server_add_db把struct ovsdb*加入到ovsdb_server的数据库的哈希表结构中。

下面是基于数据库的内容配置struct ovsdb_jsonrpc_server* jsonrpc,这部分工作通过reconfigure_from_db完成,里面主要配置remote connection和SSL

下面进入主循环,

    while (!exiting) {
        int i;

....

        reconfigure_from_db(jsonrpc, dbs, n_dbs, &remotes);
        ovsdb_jsonrpc_server_run(jsonrpc);
        unixctl_server_run(unixctl);
这里的核心是ovsdb_jsonrpc_server_run,其主要做两件事:a) 调用accept接收所有pending的connection请求  b) 对已有的所有jsonrpc_session,调用ovsdb_jsonrpc_session_run,处理已有connection进来的request/notify。对于request,调用ovsdb_jsonrpc_session_got_request处理,该函数根据request不同的method进行处理,其中'transact'是用得最多的方法

....

        for (i = 0; i < n_dbs; i++) {
            ovsdb_trigger_run(dbs[i].db, time_msec());
        }
        if (run_process && process_exited(run_process)) {
            exiting = true;
        }

.....

最后是一系列的阻塞调用,等待新的connection或者request进来,所有fd都在poll_block里用一个poll函数阻塞

        ovsdb_jsonrpc_server_wait(jsonrpc);
        unixctl_server_wait(unixctl);
        for (i = 0; i < n_dbs; i++) {
            ovsdb_trigger_wait(dbs[i].db, time_msec());
        }
        if (run_process) {
            process_wait(run_process);
        }
        if (exiting) {
            poll_immediate_wake();
        }
        poll_timer_wait_until(status_timer);
        poll_block();
    }

阅读(2742) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~