Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1940275
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2017-01-24 09:53:55

本文主要介绍AsyncMessenger的代码框架结构和主要使用到的

这里写图片描述

上图表示Ceph的AsyncMessenger模块中各个关键类之间的联系。在AsyncMessenger模块中用到的类主要有14个,下面逐一介绍每个类的作用,以及其中包含的主要成员变量和方法。


1、 AsyncMessenger类、SimplePolicyMessenger类和Messenger类

AsyncMessenger类、SimplePolicyMessenger类和Messenger类三者是继承与被继承的关系,Messenger是一个抽象的消息管理器,其主要接口在派生类AsyncMessenger中实现,SimplePolicyMessenger类则是对消息管理器的一些连接的策略进行定义的设置,AsyncMessenger中定义和实现了消息管理器的相关成员变量以及方法。

一个AsyncMessenger实例的关键成员变量以及类方法如下表所示(包括该类继承的父类成员变量以及类方法)。AsyncMessenger包含一个WorkerPool对象、一个Processor实例,以及3个AsyncConnectionRef对象列表和1个ConnectionRef对象列表。


AsyncMessenger类中的成员变量:

成员变量名 返回值类型 描述
*pool WorkerPool 通过pool->get_worker()从线程池中获取工作线程来进行工作
processor Processor Processor实例,主要用来监听连接,绑定socket,接受连接请求等,相当于AsyncMessenger的处理中心
listen_sd int 定义的监听套接字
conns ceph::unordered_map(entity_addr_t, AsyncConnectionRef) 地址和连接的map表,创建一个新的连接时将连接和和地址信息加入到这个map表中,在发送消息时先根据地址对这个map进行一次查找,如果找到了返回连接,如果没有找到创建一个新的连接。
accepting_conns set(AsyncConnectionRef) 接收连接的集合,这个集合主要存放那些已经接收的连接。
deleted_conns set(AsyncConnectionRef) 已经关闭并且需要清理的连接的集合
local_connection ConnectionRef 本地连接的计数器
did_bind bool 初始值为false,绑定地址后置为true,stop的时候再次置为false

AsynsMessenger类中的成员方法:

成员方法名 返回值类型 描述
bind (const entity_addr_t& bind_addr) int 绑定套接字,具体绑定过程是由Processor的bind()函数完成的
start() int 注册一个AsyncMessenger的实例后,启动这个实例,具体执行过程是WokerPool的start()函数完成的。
wait() void 等待停止的信号,如果收到停止的信息后,调用Processor的stop()函数,然后将did_bind置为false,最后删除建立的连接
send_message (Message *m, const entity_inst_t& dest) int 加了一个锁,然后调用_send_message(m, dest),将消息发送到目的地址
get_connection (const entity_inst_t& dest) ConnectionRef 函数用来建立连接,判定是否为本地连接,否则再继续查找连接是否已经存在,如果不存在再创建一个连接
ready() void 注册的AsyncMessenger已经准备好了,启动事件处理中心,开始工作,启动工作线程
create_connect(const entity_addr_t& addr, int type) AsyncConnectionRef create一个连接并将其加到conns中
submit_message(Message *m, AsyncConnectionRef con,const entity_addr_t& dest_addr, int dest_type) void 发送消息的时候会用到,根据目的地址判断需要发送消息的连接是否存在,以及连接是否是本地连接,如果是本地连接,直接对消息进行dispatch,如果连接不存在,需要根据消息类型创建新的连接
_send_message(Message *m, const entity_inst_t& dest) int 从连接中查找目的地址,然后调用submit_message()发送消息
add_accept(int sd) AsyncConnectionRef 新建一个连接,然后将其加入到accepting_conns中

2、 Processor类、WorkerPool类和Worker类


  • Processor类相当于AsyncMessenger模块中的一个处理器,AsyncMessenger需要完成的很多操作(start、ready、bind等)都是通过Processor来完成的。当Messenger完成地址绑定后,Processor启动,然后监听即将到来的连接。就是说AsyncMessenger模块的一些启动、绑定、就绪等操作是在Processor相应操作的基础上封装的。

  • Processor类定义了一个AsyncMessenger的对象,一个NetHandler的实例,一个Worker对象。

Processor类的成员变量(方法):

成员变量(方法)名 返回值类型 描述
*msgr AsyncMessenger AsyncMessenger的指针实例,用于调用AsyncMessenger中的成员变量(方法),用的最多的还是绑定时获取的地址信息等。
net NetHandler 绑定套接字后将其设置为非阻塞,然后这是套接字选项。
*worker Worker 工作线程
listen_sd int 获取套接口描述字的值,非负表示套接字创建成功,-1表示出错
nonce uint64_t 构造函数中用于entity_addr_t的唯一标识ID
bind(const entity_addr_t &bind_addr, const set& avoid_ports) int 执行绑定套接字的具体过程
start(Worker *w) int 执行消息模块的start,具体就是启动线程,让其处于工作状态
accept() void 建立连接的过程,如果连接建立成功,则通过add_accept()函数将连接加入到accepting_conns集合中
stop() void 关闭套接字
rebind(const set& avoid_port) int 如果第一次没有绑定成功或者其它原因导致的绑定失败,执行重新绑定

  • WorkerPool类是一个线程池,主要作用是创建worker线程,然后将其放入自己的worker容器中,每次创建worker线程的时候根据配置文件的参数ms_async_op_threads来指定worker线程的数量,创建是在WorkerPool的构造函数中进行的。

  • 在WorkerPool类中定义了一个worker集合,用于存放worker线程,还定义了一个coreids,用户存放cpu id的集合,提供指定cpu运行单个线程的作用。在配置文件中有一个参数是ms_async_affinity_cores,将创建的worker绑定到指定的cpu core上。如果创建了2个线程,绑定的cpu core是0、1,默认的ms_async_affinity_cores值是空的,即使用全部的cpu资源,如果cpu的资源不够用的时候可以将worker指定cpu core。

WorkerPool类的成员变量(方法)

成员变量(方法)名 返回值类型 描述
coreids vector 用于存放CPU id的集合
WorkerPool(CephContext *c) 构造函数 WorkerPool的构造函数,根据ms_async_op_threads的值创建相应数量的worker线程,同时完成worker和cpu core的绑定。
start() void 创建worker集合中的worker线程,启动线程开始工作
*get_worker() Worker 获取worker集合中的worker线程
get_cpuid(int id) int 获取cpu的id
workers Worker* Worker线程的集合,WorkerPool在构造函数中创建的worker线程放入到这个集合中

  • Worker类是具体的工作线程,Worker线程的主要工作是一个循环,调用epoll_wait获取需要处理的事件,用循环来处理这个事件,当外部有操作时,比如读取消息,注册一个回调类,创建一个文件事件,然后启动回调操作即可处理请求。消息模块启动时,用一个线程在Worker类中定义了一个WorkerPool对象,一个EventCenter的实例。

Worker类的成员变量(方法)

成员变量(方法)名 返回值类型 描述
*pool WorkerPool WorkerPool的实例,在entry()函数中用于获取cpu的id
done bool 如果线程的工作完成置为true,否则false
center EventCenter EventCenter的实例,在Worker的构造函数中执行EventCenter的初始化工作
*entry() void 工作线程的入口函数,启动一个while循环来执行事件的处理,在整个消息模块中就使用了这一个工作线程
stop() void 将done置为true,然后调用EventCenter的wakeup函数,即停止socket工作

3、 AsyncConnection类

  • AsyncConnection是整个Async消息模块的核心,连接的创建和删除、数据的读写指令、连接的重建、消息的处理等都是在这个类中进行的。本小节重点分析了其中的重要成员变量和24个成员函数。

    AsyncConnection类的成员变量

成员变量名 返回值类型 描述
*async_msgr AsyncMessenger AsyncMessenger对象,调用一些环境变量等
out_q map(int, list(pair(bufferlist, Message*)) ) 存放消息和消息map信息的数据结构
sent list(Message*) 存放那些需要发送的消息
local_messages list(Message*) 存放本地传输的消息
outcoming_bl bufferlist 临时存放消息的bl
read_handler EventCallbackRef 处理读请求的回调指令
write_handler EventCallbackRef 处理写请求的回调指令
connect_handler EventCallbackRef 处理连接请求的回调指令
local_deliver_handler EventCallbackRef 处理本地连接请求的回调指令
data_buf bufferlist 存放数据的bl
data_blp bufferlist::iterator data_buf的指针
front, middle, data bufferlist 头部,中间部分和数据部分
connect_msg ceph_msg_connect 消息连接
net NetHandler NetHandler的实例,处理网络连接
*center EventCenter EventCenter的对象,用来调用事件中心的操作
*recv_buf char 用于从套接字中接收消息的buf

AsyncConnection类的成员方法

编号 成员方法名 返回值类型 描述
1 do_sendmsg(struct msghdr &msg, int len, bool more) int 返回的是需要被发送的消息的长度
2 try_send(bufferlist &bl, bool send=true) int 加上一个write_lock,然后调用_try_send来真正发送消息
3 _try_send(bufferlist &bl, bool send=true) int 如果send的值为false,会将bl添加到send buffer中,这么做的目的是避免messenger线程外发生错误
4 prepare_send_message(uint64_t features, Message *m, bufferlist &bl) void 将m中的数据encode和copy到bl中
5 read_until(uint64_t needed, char *p) int 循环读,调用read_bulk,如果r的值不为0,一直循环下去
6 _process_connection() int 处理连接,根据不同的state状态执行不同的操作,关键点是state的值不同
7 _connect() void 首先将STATE_CONNECTING的值赋给state,然后调用dispatch_event_external将read_handler事件添加到external_events集合中
8 _stop() void 注销连接,然后将STATE_CLOSED赋给state,关闭套接字,清理事件
9 handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r) int 根据reply.tag值的不同执行不同的操作
10 handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl) int 处理消息的连接,如果成功则接收这个连接
11 discard_out_queue() void 清除AsyncConnection的消息队列
12 requeue_sent() void 重新将send队列入队
13 handle_ack(uint64_t seq) void 处理确认信息,删除send队列中的message
14 write_message(Message *m, bufferlist& bl) int 将消息写到complete_bl中,调用_try_send发送消息
15 _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &replybufferlist authorizer_reply) int 有一个bufferlist结构的reply_bl,调用try_send将reply_bl发送出去
16 is_queued() bool 判断是否入队列,主要是out_q和outcoming_bl这两个队列
17 shutdown_socket() void 关闭套接字
18 connect(const entity_addr_t& addr, int type) void 在AsyncConnection第一次构造的时候使用,然后调用_connect()函数
19 accept(int sd) void 将state的值设置为STATE_ACCEPTING,然后调用create_file_event函数创建文件事件,调用dispatch_event_external函数将回调指令分发出去
20 send_message(Message *m) int 一般需要发送消息的时候都会调用这个函数进行具体的发送操作,在此之前已经完成了连接
21 handle_write() void 使用一个while循环调用write_message将data写入到m中
22 process() void 还是根据不同的state值做不同的处理
23 local_deliver() void 这个函数主要用来处理本地的消息传递
24 cleanup_handler() void 清理事件处理助手,将其重置

4、 EventCenter类和EventCallback类

  • AsyncMessenger消息模块是基于epoll的事件驱动方式,取代了之间每个连接需要建立一个Pipe,然后创建两个线程,一个用来处理消息的接收,另一个用来处理消息的发送,其它操作也是借助线程的方式。不同于SimpleMessenger消息模块,AsyncMessenger消息模块使用了事件,所以需要一个处理事件的数据结构,即EventCenter,用一个线程专门用来循环处理,所有的操作都是通过回调函数来进行的,避免了大量线程的使用。在EventCenter定义了一个FileEvent的数据结构和一个TimeEvent的数据结构,大部分事件都是FileEvent。下面介绍EventCenter中的主要成员变量和方法。
成员变量(方法)名 返回值类型 描述
FileEvent struct 文件事件类
TimeEvent struct 时间事件类
external_events deque(EventCallbackRef) 用于存放外部事件的队列
*file_events FileEvent FileEvent的实例
*driver EventDriver EventDriver的实例
time_events map(utime_t, list(TimeEvent)) 事件事件的容器
net NetHandler NetHandler的实例
process_time_events() int 处理时间事件
*_get_file_event(int fd) FileEvent 获取文件事件
init(int nevent) int 根据不同的宏创建不同的事件处理器;调用create_file_event创建事件。
create_file_event(int fd, int mask, EventCallbackRef ctxt) int 根据fd和mask创建文件事件,调用add_event函数将创建的事件加入到事件处理器中去处理
create_time_event(uint64_t milliseconds, EventCallbackRef ctxt) uint64_t 创建time event,然后将其加入到time_events中
delete_file_event(int fd, int mask) void 删除file event
delete_time_event(uint64_t id) void 删除time event
process_events(int timeout_microseconds) int 如果事件是read_cb或者write_cb则调用相应的回调函数来进行处理(由do_request函数来完成);如果不是这两种事件,则将external_events队列中的事件取出放入cur_process中,调用一个while一个循环来处理。
dispatch_event_external(EventCallbackRef e) void 将创建的外部事件放入external_events队列中
  • EventCallback是一个接口类,根据操作不同会定义其子类,使用方式用一个虚函数do_request()来回调处理不同的事件,具体的处理是在do_request()中进行的。

5、 EventDriver类、EpollDriver类、KqueueDriver类和SelectDriver类

  • 事件中心相当于一个事件处理的容器,它本身并不真正去处理事件,通过回调函数的方式来完成事件的处理。同样,如何获取需要处理的事件也不是事件中心来完成的,它只负责处理,具体对需要处理的事件的获取是通过EventDriver来完成的,EventDriver是一个接口类,其实现主要是由EpollDriver、KqueueDriver和SelectDriver三个类操作的。Ceph支持多种操作系统的使用,如果使用的是Linux操作系统,使用EpollDriver,如果是BSD,使用KqueueDriver,如果都不是的情况下再使用SelectDriver(系统定义为最坏状况下)。事件驱动的执行主要依赖于epoll的方式,其中主要有三个函数:epoll_create(在epoll文件系统建立了个file节点,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个list链表,用于存储准备就绪的事件); epoll_ctl(把要监听的socket放到对应的红黑树上,给内核中断处理程序注册一个回调函数,通知内核,如果这个句柄的数据到了,就把它放到就绪列表);epoll_wait(观察就绪列表里面有没有数据,并进行提取和清空就绪列表,非常高效)。由于本项目运行在Linux中,所以下面以EpollDriver为例对Ceph的底层事件驱动进行描述。

EpollDriver的成员变量(方法)

成员变量(方法)名 返回值类型 描述
epfd int epoll的文件描述符
*events struct epoll_event epoll_event的一个对象
size int 在执行初始化时获取文件数量
init(int nevent) int 执行EpollDriver的初始化,主要是调用epoll_create,建立epoll对象
add_event(int fd, int cur_mask, int add_mask) int 根据事件的mask执行不同的操作,如果是EVENT_READABLE,表示对应的文件描述符可读,如果是EVENT_WRITABLE,表示文件描述符可写,然后调用epoll_ctl添加事件
del_event(int fd, int cur_mask, int del_mask) int 调用epoll_ctl执行事件的修改或者删除
resize_events(int newsize) int 清空事件数量
event_wait(vector &fired_events, struct timeval *tp) int 调用epoll_wait循环处理事件

6、NetHandler类

  • NetHandler是AsyncMessenger模块中用于网络处理的类,其中定义了6个关键成员方法,其中的NetHandler::generic_connect()是每个连接都需要使用到的,创建socket、将socket设置为非阻塞、设置socket选项等都是经常使用的方法,下表对其详细分析。
成员方法名 返回值类型 描述
create_socket(int domain, bool reuse_addr=false) int 创建socket
generic_connect(const entity_addr_t& addr, bool nonblock) int 通信双方通过该函数产生连接,首先调用create_socket()创建一个socket,然后将创建的socket设置为非阻塞,完成以后调用系统socket:: connect()建立连接
set_nonblock(int sd) int 将Socket设置为非阻塞的
set_socket_options(int sd) void 调用系统的socket::setsockopt函数,设置套接字的一些关键选项
connect(const entity_addr_t &addr) int 对NetHandler::generic_connect()进行了一个简单的封装
nonblock_connect(const entity_addr_t &addr) int 接口函数,设置非阻塞的连接

上文描述了AsyncMessenger基本数据结构及框架,下一章描述代码流程。

阅读(1667) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~