Chinaunix首页 | 论坛 | 博客
  • 博客访问: 471524
  • 博文数量: 17
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1693
  • 用 户 组: 普通用户
  • 注册时间: 2013-09-13 09:23
个人简介

前EMC高级软件工程师,现小米分布式存储码农,关注分布式存储,文件系统,Linux内核。微博: http://weibo.com/u/2203007022

文章分类

全部博文(17)

文章存档

2015年(1)

2014年(6)

2013年(10)

分类: 架构设计与优化

2013-09-13 10:03:38

 

一. server端服务启动流程  

  • 创建一个Processor(rpc_handler)实例,用于处理特定的输入;
  • TBufferedTransportFactory用于数据传输;
  • threadManager.start()启动服务线程;
  • Protocol表示数据传输的上层协议;

  • Server类用于接受客户端请求。

二. threadManager.start()

这个方法主要用于启动服务端的工作线程。

1. 调用addWorkder启动workerCount_个工作线程;
a) 创建workerCount_ThreadManager::WorkerBoostThread相互引用。
b) 设置ThreadManager::Worker的状态为STARTING
c) 调用每个BoostThreadstart方法;
    i.
创建boost::thread,指定入口函数为threadMain;
      ii. 设置BoostThread的状态为starting
d) BoostThread加入idMap中;
e) 等待所有的线程启动完成。

2. boost::thread的入口函数threadMain()

threadMain()函数是工作线程的入口函数,主要功能是取得threadManager(std::set tasks_;)中的task实例并进行处理,其具体实现步骤如下所示:

    从入口函数的参数中得到BoostThread指针;

    a) 设置BoostThread的状态为started;

    b) 调用ThreadManager::Workerstart方法,循环处理tasks_中的task;

    i. 检查workerCount是否达到最大值;

 ii. 循环处理以下情况;

                1. 没有task需要处理, manager_->monitor_.wait();等待处理task

                2. task需要处理,则取得task_中的第一个task,设置task的状态为EXECUTING,调用task->run()处理任务;

   c) 线程退出,设置BoostThread的状态为stopped

三. TThreadPoolServer.serv()

主线程:主要用于接收客户端的请求并组成task并加入到threadManagertask_中,工作线程取得task并进行处理。 TThreadPoolServer.serv()的具体实现:

1. 调用serverTransport_->listten()方法,serverTransportserverSocket类型的对象。

a) 尝试连接

b) 创建socketbindlisten.

2.  serverTransport_->accept(); 同步等待客户端的连接,连接成功则转到步骤3

3.  利用工厂得到transport对象和protocol对像;

4.  创建TThreadPoolServer::Task对象,传入serverprocessorprotocoltransportclient对象:

server对象为TThreadPoolServer类型,

processor对象为FileStorageProcessor对象,

protocolTBinaryProtocol类型,

transport对象为TBufferedTransport类型,都可以通过TThreadPoolServer构造函数配置。

client对象为accept返回的socket

5.      TThreadPoolServer::Task对象插入到threadManager的任务队列中;

a)  删除过期的任务,本例中将过期时间设置为0表明不过期;

b)  检查pending 任务的数量是否超过最大值;

c)  task加到任务队列中;

d)  如果有空闲的thread,就唤醒一个线程处理。

四. TThreadPoolServer::Task::run()

这个函数在工作线程中调用,主要作用:循环读取消息头,得到version,函数名和messageType,并且调用对应的处理函数得到输出传给客户端,循环直到socket连接断开。如果socket不断开,就一直循环。
A. 循环读取消息头并处理

1. 得到EventHandlercallContext

2. 处理callContext

3. 循环调用FileStorageProcessor::process处理;
a) iprot->readMessageBegin(fname, mtype, seqid)得到函数名,MessageTypeseqid

            i.   result += readI32(sz);  得到version

            ii.  result += readStringBody(name, sz); 得到fname

            iii. result += readByte(type);   得到MessageType

4. 如果scoket连接中断则则出循环

5. process_fn(iprot, oprot, fname, seqid, callContext); 调用特定的函数处理;

2
B.
Protocol怎样从socket读取数据
ReadI32读取4个字节。readString会先读取size4个字节),然后读取string

具体步骤:

1. 建立结构体,便于转换字节序;

2. 调用TBufferedTransport::readAll读取4个字节,直接调用TBufferBase::readAll

a)  检查rBuffer中是否有数据,有数据则直接中rBuffer中获取;

b)  没有数据则调用Transport::readAll

            i. 调用TBufferBase::read,检查rBuffer中是否有数据,没有则从socket中读取,transport_就是保存的socket

            ii. 将读取的数据转换为主机字节序。
C. 对应函数名的特定处理函数
函数名对应的处理函数存放在processMap_中,如”WriteFile”对应的处理函数为processMap_["WriteFile"] = &FileStorageProcessor::process_WriteFile;
具体步骤:

1. 调用args.read(iprot);读取参数

a) 调用readFieldBegin得到参数type和参数id

b) 得到参数的值

            i. 调用具体的RPC方法,并得到结果result.success

            ii. 调用oprot->writeMessageBegin("WriteFile", apache::thrift::protocol::T_REPLY, seqid);写入messagetype

            iii. 调用result.write(oprot)写入结果,通过socket传输。

阅读(5245) | 评论(1) | 转发(0) |
0

上一篇:没有了

下一篇:C++ STL map中的Key使用自定义类型

给主人留下些什么吧!~~

7大爷2013-09-16 09:38:16

 学习