Chinaunix首页 | 论坛 | 博客
  • 博客访问: 462238
  • 博文数量: 185
  • 博客积分: 10
  • 博客等级: 民兵
  • 技术积分: 681
  • 用 户 组: 普通用户
  • 注册时间: 2011-08-06 21:45
个人简介

为梦而战

文章分类

全部博文(185)

文章存档

2016年(3)

2015年(103)

2014年(79)

我的朋友

分类: LINUX

2015-03-24 11:24:57

stream


struct pstream {
    const struct pstream_class *class;
    char *name;
};

pstream_class是一个类似的接口类,其实现根据底层socket的不同(unix domain socket, tcp socket, ssl socket)而不同,p表示passive,其接口定义如下,

struct pstream_class {
    /* Prefix for connection names, e.g. "ptcp", "pssl", "punix". */
    const char *name;

    /* True if this pstream needs periodic probes to verify connectivty.  For
     * pstreams which need probes, it can take a long time to notice the
     * connection was dropped. */
    bool needs_probes;

    /* Attempts to start listening for stream connections.  'name' is the full
     * connection name provided by the user, e.g. "ptcp:1234".  This name is
     * useful for error messages but must not be modified.
     *
     * 'suffix' is a copy of 'name' following the colon and may be modified.
     * 'dscp' is the DSCP value that the new connection should use in the IP
     * packets it sends.
     *
     * Returns 0 if successful, otherwise a positive errno value.  If
     * successful, stores a pointer to the new connection in '*pstreamp'.
     *
     * The listen function must not block.  If the connection cannot be
     * completed immediately, it should return EAGAIN (not EINPROGRESS, as
     * returned by the connect system call) and continue the connection in the
     * background. */
    int (*listen)(const char *name, char *suffix, struct pstream **pstreamp,
                  uint8_t dscp);

    /* Closes 'pstream' and frees associated memory. */
    void (*close)(struct pstream *pstream);

   /* Tries to accept a new connection on 'pstream'.  If successful, stores
     * the new connection in '*new_streamp' and returns 0.  Otherwise, returns
     * a positive errno value.
     *
     * The accept function must not block waiting for a connection.  If no
     * connection is ready to be accepted, it should return EAGAIN. */
    int (*accept)(struct pstream *pstream, struct stream **new_streamp);

    /* Arranges for the poll loop to wake up when a connection is ready to be
     * accepted on 'pstream'. */
    void (*wait)(struct pstream *pstream);
};

可以看出pstream_class作为server端的stream socket封装,其接口包括了listen, accept, wait, close


struct stream {
    const struct stream_class *class;
    int state;
    int error;
    ovs_be32 remote_ip;
    ovs_be16 remote_port;
    ovs_be32 local_ip;
    ovs_be16 local_port;
    char *name;
};

类似的stream_class作为active stream socket的接口类,接口包括了open, close, recv, send, wait等,请参考lib/stream-provider.h


总结下来,就是pstream是被动的流,只用来listen, accept新的连接,stream是用来读写交换数据的主动流,无论是pstream, stream,都用到了bridge模式,通过pstream_class, stream_class来做接口定义的操作。我们来看几个典型的操作。


stream_connect,判断当前流的状态,如果还是CONNECTING,那么调用stream_class->connect判断连接结束没有,如果是CONNECTED,那么返回0

int stream_connect(struct stream *stream)
{
    enum stream_state last_state;

    do {
        last_state = stream->state;
        switch (stream->state) {
        case SCS_CONNECTING:
            scs_connecting(stream);
            break;

        case SCS_CONNECTED:
            return 0;
            
        case SCS_DISCONNECTED:
            return stream->error;

        default:
            NOT_REACHED();
        }
    } while (stream->state != last_state);

    return EAGAIN;
}


stream_wait,用来等待stream socket available,从而开始wait指定的操作,对于STREAM_SEND/STREAM_RECV而言,如果此时stream还是CONNECTING,会等到变为CONNECTED

void
stream_wait(struct stream *stream, enum stream_wait_type wait)
{
    assert(wait == STREAM_CONNECT || wait == STREAM_RECV
           || wait == STREAM_SEND);

    switch (stream->state) {
    case SCS_CONNECTING:
        wait = STREAM_CONNECT;
        break;

    case SCS_DISCONNECTED:
        poll_immediate_wake(); 
        return;
    }   
    (stream->class->wait)(stream, wait);
}   

stream_wait用到了poll函数来做event demultiplex,包括相应的struct pollfd,由于没有大量的并发连接,所以性能还不是问题


进程的poll机制如下,会有一个全局的poll_waiters list,每次有fd需要wait某些events时,会构造一个poll_waiter并挂到全局poll_waiters list中。同样可以通过poll_cancel来把一个poll_waiter从全局list里面去掉

void poll_block(void)
{   
    static struct pollfd *pollfds;
    static size_t max_pollfds;

    struct poll_waiter *pw, *next;
    int n_waiters, n_pollfds;
    int elapsed;
    int retval;

    /* Register fatal signal events before actually doing any real work for
     * poll_block. */
    fatal_signal_wait();

    n_waiters = list_size(&waiters);
    if (max_pollfds < n_waiters) {
        max_pollfds = n_waiters;
        pollfds = xrealloc(pollfds, max_pollfds * sizeof *pollfds);
    }

    n_pollfds = 0;
    LIST_FOR_EACH (pw, node, &waiters) {
        pw->pollfd = &pollfds[n_pollfds];
        pollfds[n_pollfds].fd = pw->fd;
        pollfds[n_pollfds].events = pw->events;
        pollfds[n_pollfds].revents = 0;
        n_pollfds++;
    }   

    if (timeout_when == LLONG_MIN) {
        COVERAGE_INC(poll_zero_timeout);
    }

到此,把全局poll_waiter的所有poll_waiter,填到一个struct pollfd的数组pollfds中,

    retval = time_poll(pollfds, n_pollfds, timeout_when, &elapsed);
    if (retval < 0) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
        VLOG_ERR_RL(&rl, "poll: %s", strerror(-retval));
    } else if (!retval) {
        log_wakeup(timeout_where, NULL, elapsed);
    }
time_poll是一个event demultiplexer,我们后面会分析

    LIST_FOR_EACH_SAFE (pw, next, node, &waiters) {
        if (pw->pollfd->revents) {
            log_wakeup(pw->where, pw->pollfd, 0);
        }
        poll_cancel(pw);
    }

    timeout_when = LLONG_MAX;
    timeout_where = NULL;

    /* Handle any pending signals before doing anything else. */
    fatal_signal_run();
}

time_poll实际上在一个循环里反复调用poll,直到给的时间片用完为止


下面我们拿unix domain socket, tcp socket为例来stream这块代码的分析

对TCP而言,ptcp_open是pstream_class的listen函数,

static int ptcp_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp, uint8_t dscp) {
    struct sockaddr_in sin;
    char bound_name[128];
    int fd;
    
    fd = inet_open_passive(SOCK_STREAM, suffix, -1, &sin, dscp);
    if (fd < 0) { 
        return -fd; 
    }

    sprintf(bound_name, "ptcp:%"PRIu16":"IP_FMT,
            ntohs(sin.sin_port), IP_ARGS(&sin.sin_addr.s_addr));
    return new_fd_pstream(bound_name, fd, ptcp_accept, NULL, pstreamp);
}   

inet_open_passive主要调用socket, setsockopt, bind等创建并配置好socket,new_fd_pstream创建并配置一个fd_pstream结构,这个结构包括了之前提到的struct pstream结构:

struct fd_pstream {   
    struct pstream pstream;
    int fd;
    int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
                     struct stream **);
    char *unlink_path;
};


new_fd_pstream里,原来pstream里的pstream_class被换成了fd_pstream_class,这样原来三个为NULL的函数指针变成了pfd_close, pfd_accept, pfd_wait

pfd_accept会调用accept接收connect过来的fd,然后调用accept_cb这个callback函数,对于TCP而言,这是ptcp_accept

ptcp_accept会基于这个accept的fd,通过new_tcp_stream把connection封装成一个stream,里面调用setsockopt设置了TCP_NODELAY,之后调用new_fd_stream,同样的原来stream里的stream_class换成了stream_fd_class,原来为NULL的函数指针变成了fd_close, fd_connect, fd_recv, fd_send, fd_wait

fd_send, fd_recv都是调用read, write来读写缓冲区,我们来看fd_wait,

static void fd_wait(struct stream *stream, enum stream_wait_type wait)
{
    struct stream_fd *s = stream_fd_cast(stream);
    switch (wait) {
    case STREAM_CONNECT:
    case STREAM_SEND:
        poll_fd_wait(s->fd, POLLOUT);
        break;

    case STREAM_RECV:
        poll_fd_wait(s->fd, POLLIN);
        break;

    default:
        NOT_REACHED();
    }
}

可以看出fd_wait只是把fd挂到了poll_waiter全局list里,之后的事情都交给poll_block去做


对于unix domain socket而言,pstream_class为punix_pstream_class,其listen函数为punix_open。

punix_open首先调用make_unix_socket,传入bind的path,该函数会创建unix domain socket,调用make_sockaddr_un配置好struct sockaddr_un,调用bind_unix_socket把unix domain socket绑定到bind path上的socket文件上;punix_open之后调用listen准备接受connection请求;最后调用new_fd_pstream;和TCP一样,此时已经不区分pstream_class属于TCP还是unix domain socket了,而是统一用fd_pstream_class来表示,相关的函数指针为pfd_close, pfd_accept, pfd_wait


unixctl


对于vswitchd, ovsdb这类的daemon而言,会包含一个unixctl_server,用来接收控制流

struct unixctl_server {
    struct pstream *listener;
    struct list conns;
};

unixctl_server_create:传入的path是server的unix domain socket所在的socket文件路径,之后调用pstream_open,执行了punix_open,并把punix_pstream_class的指针通过listener传出来,此时daemon已经调用完了listen,可以接收client的连接请求了。

struct unixctl_conn {
    struct list node;
    struct jsonrpc *rpc;
 
    /* Only one request can be in progress at a time.  While the request is
     * being processed, 'request_id' is populated, otherwise it is null. */
    struct json *request_id;   /* ID of the currently active request. */
};          

unixclt_server_run:调用pstream_accept接收connection请求,实际上调用pfd_accept,如果有新的connection,加入unixctl_server的conns连接list中,并初始化好connection对应的jsonrpc。之后对conns的每个请求调用run_connection

unixctl_server_wait:先把listen放入poll_waiter list,之后是有数据要发送的connection,最后是接收数据的connection

run_connection:先调用jsonrpc_run把缓冲区要发送的数据发送完毕,之后调用jsonrpc_recv接收一个jsonrpc_msg,unixctl_server只处理JSON REQUEST,对请求调用process_command。对于每个command而言,其必须先调用unixctl_command_register注册之后才可以被执行,process_command基于command method在hash表里查找到这个command的配置,最后调用command注册的callback函数执行


process

process.c是进程对象的utility实现,

struct process {
    struct list node;
    char *name;
    pid_t pid;

    /* Modified by signal handler. */
    volatile bool exited;
    volatile int status;
};

process_init:设置非阻塞管道xpipe_nonblocking(fds),设置SIGCHLD的信号处理函数sigchld_handler。struct list all_processes是该进程所有子进程的list,sigchld_handler对每个子进程调用waitpid,得到子进程退出返回的返回码和状态,并存到struct process的exited, status成员变量中

process_start:先调用process_prestart,该函数又会调用process_init,除此之外,还会验证进程的binary是否存在。 之后fork子进程,对于父进程,fork成功之后,把自己注册到all_processes的list上,回复之前被block的SIGCHLD信号就退出了;对于子进程,主要调用execvp切换成真正的binary进程

process_destroy:把struct process* p从all_processes的list中移除

process_wait:如果进程要退出(p->exited),调用poll_immediate_wake,唤醒waiters上的所有fd;否则阻塞在poll_block上

process_run:可以看作process_start >> process->wait >> poll->block >> process->wait >> poll->block的循环 


daemon

daemon的启动参数包括了是否detach, 是否chdir,是否set_pidfile,是否忽略之前的pidfile,是否启动monitor进程监控该daemon

/* If configured with set_pidfile() or set_detach(), creates the pid file and
 * detaches from the foreground session.  */
void 
daemonize(void)
{
    daemonize_start();
    daemonize_complete();
}

先来看daemonize_start

/* If daemonization is configured, then starts daemonization, by forking and
 * returning in the child process.  The parent process hangs around until the
 * child lets it know either that it completed startup successfully (by calling
 * daemon_complete()) or that it failed to start up (by exiting with a nonzero
 * exit code). */
void
daemonize_start(void)
{
    daemonize_fd = -1;

    if (detach) {
        if (fork_and_wait_for_startup(&daemonize_fd) > 0) {
            /* Running in parent process. */
            exit(0);
        }
        /* Running in daemon or monitor process. */
    }

fork_and_wait_for_startup,调用fork_and_clean_up,该函数是fork的一个封装,父进程在fork之后取消之前的fatal_signal注册的callback函数(主要是进程退出时删除pid文件用的),子进程则在fork之后取消之前父进程的lockfile, timer等。OVS的父子进程之前有个管道机制,子进程启动成功之后向管道写一个字符,父进程收到之后认为子进程启动成功从而进入waitpid等待其退出

如果是detach模式,fork_and_wait_for_startup返回之后父进程就退出

    if (monitor) {
        int saved_daemonize_fd = daemonize_fd;
        pid_t daemon_pid;

        daemon_pid = fork_and_wait_for_startup(&daemonize_fd);
        if (daemon_pid > 0) {
            /* Running in monitor process. */
            fork_notify_startup(saved_daemonize_fd); 
            close_standard_fds();
            monitor_daemon(daemon_pid);
        } 
        /* Running in daemon process. */
    }   

如果是monitor模式,意味有一个monitor进程来监控daemon,此时monitor进程成为daemon的父进程,monitor调用fork_notify_startup通过管道向子进程写一个字符,通知子进程开始run,之后调用monitor_daemon开始监控

monitor_daemon在一个死循环里调用waitpid daemon_pid,一旦发现daemon进程异常退出,会尝试重启daemon:如果退出状态是WCOREDUMP(status),调用setrlimit(RLIMIT_CORE)关闭coredump,以节省硬盘空间; 如果重启频率超出了throttle,sleep一小段时间;最后调用fork_and_wait_for_startup才启动daemon进程

判断异常退出的方法是通过waitpid返回的status,如果WIFSIGNALED(status)为true,说明是信号导致的进程退出,这时通过WTERMSIG(status)得到退出前收到的信号,如果属于{SIGABRT, SIGALRM, SIGBUS, SIGFPE, SIGILL, SIGPIPE, SIGSEGV等信号,则需要重新启动daemon

    
    if (pidfile) {
        make_pidfile();                
    }                    

    /* Make sure that the unixctl commands for vlog get registered in a
     * daemon, even before the first log message. */
    vlog_init();    
}   

最后子进程生成pidfile,初始化vlog


/* If daemonization is configured, then this function notifies the parent
 * process that the child process has completed startup successfully.  It also
 * call daemonize_post_detach().
 *
 * Calling this function more than once has no additional effect. */
void
daemonize_complete(void)
{
    if (!detached) {
        detached = true;

        fork_notify_startup(daemonize_fd);
        daemonize_fd = -1;
        daemonize_post_detach();
    }
}

fork_notify_startup通知父进程启动成功,此时父进程进入waitpid。daemonize_post_detach则做unix daemon创建例行的事情

void
daemonize_post_detach(void)
{
    if (detach) {
        setsid();
        if (chdir_) {
            ignore(chdir("/"));
        }
        close_standard_fds();
    }
}


jsonrpc

/* Messages. */
enum jsonrpc_msg_type {
    JSONRPC_REQUEST,           /* Request. */
    JSONRPC_NOTIFY,            /* Notification. */
    JSONRPC_REPLY,             /* Successful reply. */
    JSONRPC_ERROR              /* Error reply. */
};
struct jsonrpc_msg {
    enum jsonrpc_msg_type type;
    char *method;               /* Request or notification only. */
    struct json *params;        /* Request or notification only. */
    struct json *result;        /* Successful reply only. */
    struct json *error;         /* Error reply only. */
    struct json *id;            /* Request or reply only. */
};

struct jsonrpc {
    struct stream *stream;
    char *name;
    int status;

    /* Input. */
    struct byteq input;
    struct json_parser *parser;
    struct jsonrpc_msg *received;

    /* Output. */
    struct list output;         /* Contains "struct ofpbuf"s. */
    size_t backlog;
};

jsonrpc的行为基于stream, pstream,再加上json parsing,如果你对前面的stream, json都比较了解,这部分内容还是很简单的

stream_open_with_default_ports, pstream_open_with_default_ports都通过stream_open, pstream_open来实现

jsonrpc_run,查看jsonrpc->output,把output队列的消息通过stream_send发送出去

jsonrpc_wait,等待可以发送

jsonrpc_send,调用jsonrpc_msg_to_json把jsonrpc_msg转化成json,调用json_to_string把json转成string,创建一个基于该string的ofpbuf,并挂到rpc->output的链表上,最后调用jsonrpc_run发送

jsonrpc_recv,jsonrpc用一个环形的buffer来收数据,每次recv首先计算环形buffer还有多少空间,然后尽最大可能收数据;如果此时buffer还有数据,那么调用json_parser_feed解析数据,如果parse完成(parser->done),调用jsonrpc_received把json转成jsonrpc_msg

jsonrpc_transact_block,首先调用jsonrpc_send_block把一个jsonrpc_msg request发送出去,在调用jsonrpc_recv_block等待一个reply


jsonrpc_create,创建不同类型的jsonrpc_msg

jsonrpc_msg_from_json, jsonrpc_msg_to_json,json和jsonrpc_msg之间互相转换


jsonrpc_session代表一个RPC connection

struct jsonrpc_session {
    struct reconnect *reconnect;
    struct jsonrpc *rpc;
    struct stream *stream;
    struct pstream *pstream;
    unsigned int seqno;
    uint8_t dscp;
};


这里出现了一个struct reconnect结构,就是一些配置,状态,统计信息的集合

struct reconnect {
    /* Configuration. */
    char *name;
    int min_backoff;
    int max_backoff;
    int probe_interval;
    bool passive;
    enum vlog_level info;       /* Used for informational messages. */

    /* State. */
    enum state state;
    long long int state_entered;
    int backoff;
    long long int last_activity;
    long long int last_connected;
    long long int last_disconnected;
    unsigned int max_tries;

    /* These values are simply for statistics reporting, not otherwise used
     * directly by anything internal. */
    long long int creation_time;
    unsigned int n_attempted_connections, n_successful_connections;
    unsigned int total_connected_duration;
    unsigned int seqno;
};

connection的状态有 S_VOID, S_BACKOFF, S_CONNECTING, S_ACTIVE, S_IDLE, S_RECONNECT, S_LISTENING,其中S_ACTIVE, S_IDLE表示已经连接的状态。

back_off表示在尝试重新连接时,连续两次尝试之间间隔的时间

reconnect_transition__,是一个状态转换的自动机,代码如下

static void reconnect_transition__(struct reconnect *fsm, long long int now,
                       enum state state)
{
    if (fsm->state == S_CONNECTING) {
        fsm->n_attempted_connections++;
        if (state == S_ACTIVE) {
            fsm->n_successful_connections++;
        }
    }


    if (is_connected_state(fsm->state) != is_connected_state(state)) {
        if (is_connected_state(fsm->state)) {
            fsm->total_connected_duration += now - fsm->last_connected;
        }
        fsm->seqno++;
    }

    VLOG_DBG("%s: entering %s", fsm->name, reconnect_state_name__(state));
    fsm->state = state;
    fsm->state_entered = now;
}


jsonrpc_session_open,初始化jsonrpc_session结构,解析name,如果是类似tcp:127.1.2.3,表示是一个active connection,如果是ptcp:127.1.2.3,表示是一个passive connection,用来listen新的connection。如果是passive connection,会调用reconnect_set_passive。最后调用reconnect_set_probe_interval,设置keepalive

jsonrpc_session_close,依次调用jsonrpc_close, reconnect_destroy, stream_close, pstream_close

jsonrpc_session_connect,如果s->reconnect是主动连接,那么调用jsonrpc_stream_open连接connection,并把状态变为S_CONNECTING;否则如果s->pstream为空,那么调用jsonrpc_pstream_open开启listen,同时把状态变为S_LISTENING

static void jsonrpc_session_connect(struct jsonrpc_session *s)
{   
    const char *name = reconnect_get_name(s->reconnect);
    int error;

    jsonrpc_session_disconnect(s);
    if (!reconnect_is_passive(s->reconnect)) {
        error = jsonrpc_stream_open(name, &s->stream, s->dscp);
        if (!error) {
            reconnect_connecting(s->reconnect, time_msec());
        }  
    } else {
        error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
                                                      s->dscp);
        if (!error) {
            reconnect_listening(s->reconnect, time_msec());
        }
    }   

    if (error) {
        reconnect_connect_failed(s->reconnect, time_msec(), error);
    }
    s->seqno++;
}


jsonrpc_session_disconnect,close已有s->stream, s->rpc结构

jsonrpc_session_run,首先,如果s->pstream不为空,调用pstream_accept接收新的connection,如果此时jsonrpc_session已经有了connection,那么用新的替换老的

        error = pstream_accept(s->pstream, &stream);
        if (!error) {
            if (s->rpc || s->stream) {
                VLOG_INFO_RL(&rl,
                             "%s: new connection replacing active connection",
                             reconnect_get_name(s->reconnect));
                jsonrpc_session_disconnect(s);
            }
            reconnect_connected(s->reconnect, time_msec());
            s->rpc = jsonrpc_open(stream);
        }

如果s->rpc不为空,会把数据发送出去,否则重新尝试连接s->stream。

        stream_run(s->stream);
        error = stream_connect(s->stream);
        if (!error) {
            reconnect_connected(s->reconnect, time_msec());
            s->rpc = jsonrpc_open(s->stream);
            s->stream = NULL;
        } else if (error != EAGAIN) {
            reconnect_connect_failed(s->reconnect, time_msec(), error);
            stream_close(s->stream);
            s->stream = NULL;
        }


jsonrpc_session_send,调用jsonrpc_send发送数据

jsonrpc_session_recv,调用jsonrpc_recv接收一个jsonrpc_msg,如果不是echo之类的测试报文,返回这个jsonrpc_msg

阅读(851) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~