Chinaunix首页 | 论坛 | 博客
  • 博客访问: 88167
  • 博文数量: 12
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 09:57
文章分类

全部博文(12)

文章存档

2015年(11)

2014年(1)

我的朋友

分类: 架构设计与优化

2015-09-13 16:35:55

  Google Protocol Buffer(简称protobuf)是Google内部混合语言数据标准,protobuf是一种紧凑的可扩展的二进制消息格式,适合做网络数据传输、数据存储的消息格式(摘自网络)。这里不介绍protobuf的基本用法,详情可以参考: 1. ProtoBuf语法指南   style="line-height:2;">  2. 玩转ProtoBuf    3. Protobuf 反射和相关API   />   这里主要介绍Muduo中使用ProtoBuffer实现消息类型自解析自动调用相应的处理函数,对应的代码为:  />   首先我们要描述一下需要解决的问题:1. 处理端有各种消息的完整类型,但是不知道当前接收的消息类型(消息识别)  2. 处理端可以正确处理不同种类的消息(消息解析 + 消息处理)。2. 处理端不要用大量的if else或者switch来判断消息的类型然后做相应的处理 (代码不要太丑陋太难维护

一. 消息识别

 为了达到消息识别的目的,首先需要定义消息格式:
 
 len : 消息长度
 namelen : 消息类型名长度
 messageName : 消息类型名
 protoBufData : 消息体数据
 checkSum : 校验和 (这里使用 adler32 算法)
 注意:使用ProtoBuffer进行传输是不需要消息的版本号的,因为ProtoBuffer支持optional字段,可以通过增加或减少optional字段来实现不同版本的消息,而且不会引起解析错误。
 

二. 消息解析

当处理端收到消息后,应该怎样处理了,注意处理端收到的字节流很可能不能构成一个完整的消息,也可能构成多个消息
       对数据的解码在ProtobufCodec类中,onMessage函数就是处理原始字节流的,parse函数从原始字节流中提取出消息

点击(此处)折叠或打开

  1. void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
  2.                               Buffer* buf,
  3.                               Timestamp receiveTime)
  4. {
  5.   while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)
  6.   {
  7.     // 1. 获取消息长度 (头四个字节)
  8.     const int32_t len = buf->peekInt32();
  9.     if (len > kMaxMessageLen || len < kMinMessageLen)
  10.     {
  11.       errorCallback_(conn, buf, receiveTime, kInvalidLength);
  12.       break;
  13.     }
  14.     else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
  15.     {
  16.       ErrorCode errorCode = kNoError;
  17.       // 2. parse生成具体消息
  18.       MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);
  19.       if (errorCode == kNoError && message)
  20.       {
  21.         // 3. 调用消息处理函数
  22.         messageCallback_(conn, message, receiveTime);
  23.         // 4. 从缓冲区中删除已处理的消息字节数据
  24.         buf->retrieve(kHeaderLen+len);
  25.       }
  26.       else
  27.       {
  28.         errorCallback_(conn, buf, receiveTime, errorCode);
  29.         break;
  30.       }
  31.     }
  32.     else
  33.     {
  34.       break;
  35.     }
  36.   }
  37. }

点击(此处)折叠或打开

  1. MessagePtr ProtobufCodec::parse(const char* buf, int len, ErrorCode* error)
  2. {
  3.   MessagePtr message;

  4.   // check sum
  5.   // 1. 检查校验和
  6.   int32_t expectedCheckSum = asInt32(buf + len - kHeaderLen);
  7.   int32_t checkSum = static_cast<int32_t>(
  8.       ::adler32(1,
  9.                 reinterpret_cast<const Bytef*>(buf),
  10.                 static_cast<int>(len - kHeaderLen)));
  11.   if (checkSum == expectedCheckSum)
  12.   {
  13.     // get message type name
  14.     // 2. 获取消息类型名长度
  15.     int32_t nameLen = asInt32(buf);
  16.     if (nameLen >= 2 && nameLen <= len - 2*kHeaderLen)
  17.     {
  18.       // 3. 获取消息类型名
  19.       std::string typeName(buf + kHeaderLen, buf + kHeaderLen + nameLen - 1);
  20.       // create message object
  21.       // 4. 根据消息类型名创建一个消息
  22.       message.reset(createMessage(typeName));
  23.       if (message)
  24.       {
  25.         // parse from buffer
  26.         const char* data = buf + kHeaderLen + nameLen;
  27.         int32_t dataLen = len - nameLen - 2*kHeaderLen;
  28.         // 5. 从原始字节流中反序列化出消息数据
  29.         if (message->ParseFromArray(data, dataLen))
  30.         {
  31.           *error = kNoError;
  32.         }
  33.         else
  34.         {
  35.           *error = kParseError;
  36.         }
  37.       }
  38.       else
  39.       {
  40.         *error = kUnknownMessageType;
  41.       }
  42.     }
  43.     else
  44.     {
  45.       *error = kInvalidNameLen;
  46.     }
  47.   }
  48.   else
  49.   {
  50.     *error = kCheckSumError;
  51.   }

  52.   return message;
  53. }
 
这里很重要的一点就是如何根据消息类型名创建一个消息,createMessage完成这个功能

点击(此处)折叠或打开

  1. google::protobuf::Message* ProtobufCodec::createMessage(const std::string& typeName)
  2. {
  3.   google::protobuf::Message* message = NULL;
  4.   const google::protobuf::Descriptor* descriptor =
  5.     google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
  6.   if (descriptor)
  7.   {
  8.     const google::protobuf::Message* prototype =
  9.       google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);
  10.     if (prototype)
  11.     {
  12.       message = prototype->New();
  13.     }
  14.   }
  15.   return message;
  16. }
  该函数用到的几个重要的步骤:
  1. 获取 MessageFactory 对象
    MessageFactory 类提供了一个 generated_factory() 的静态函数,此静态函数可以获取一个 MessageFactory 对象,此 MessageFactory 对象能够用来创建被编译入程序的所有的 message 对象。注意,此 Factory 是一个 Singleton,因此重复多次调用 generated_factory 函数不会创建多个 MessageFactory 对象,另外调用者也不能通过调用 delete 删除此对象。
  2. 获取 DescriptorPool 对象
    通过 DescriptorPool 类的 generated_pool() 静态函数能够获取 DescriptorPool 的指针。此 DescriptorPool 中包含了被编译入程序的 message 的 descriptor。generated_pool 类似于 generated_factory 函数,可以被重复调用多次而不会创建多个 DescriptorPool 对象。
  3. 获取 message descriptor
    有了 DescriptorPool 对象就可以获取到 message 的 descriptor 了。常见的一个函数是 const Descriptor * FindMessageTypeByName(const string & name) const,此函数可以通过 message 名字获取到顶层 message 的 descriptor。当然除此之外还有一些 API 可以用来获取 message discriptor,可以参考相关文档,这里就不一一详述了。
  4. 获取 message prototype 并构建 message 对象
    前面已经讲述了获取 MessageFactory 对象的方法,有了 MessageFactory 对象就可以通过函数 MessageFactory::GetPrototype(const Descriptor * type) 获取 message prototype(实质上就是一个 message 对象)。通过调用 message prototype 的 New 函数则可以构造此类型的 message。
    对同一个 Descriptor 多次调用 MessageFactory::GetPrototype 函数将返回同一个对象。通过调用 prototype 的 New 函数构造的 message 对象必须在 MessageFactory 销毁前销毁。


三. 消息处理

  消息的处理是在函数messageCallBack_中进行,这个函数是我们在创建QueryServer时绑定的:

点击(此处)折叠或打开

  1. QueryServer(EventLoop* loop,
  2.               const InetAddress& listenAddr)
  3.   : server_(loop, listenAddr, "QueryServer"),
  4.     dispatcher_(boost::bind(&QueryServer::onUnknownMessage, this, _1, _2, _3)),

  5.     // ProtobufDispatcher::onProtobufMessage 绑定到messageCallBack_上
  6.     codec_(boost::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
  7.   {
  8.     // 注册的消息处理函数
  9.     dispatcher_.registerMessageCallback<muduo::Query>(
  10.         boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
  11.     dispatcher_.registerMessageCallback<muduo::Answer>(
  12.         boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
  13.     server_.setConnectionCallback(
  14.         boost::bind(&QueryServer::onConnection, this, _1));
  15.     server_.setMessageCallback(
  16.         boost::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
  17.   }
 onProtobufMessage根据每个类型的descriptor来从一张表中查找处理函数,因为每种类型都有一个全局的Descriptor对象,它的地址不变,因此使用它来查找注册的函数

点击(此处)折叠或打开

  1. void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
  2.                          const MessagePtr& message,
  3.                          muduo::Timestamp receiveTime) const
  4.   {
  5.     CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
  6.     if (it != callbacks_.end())
  7.     {
  8.       it->second->onMessage(conn, message, receiveTime);
  9.     }
  10.     else
  11.     {
  12.       defaultCallback_(conn, message, receiveTime);
  13.     }
  14.   }
至此,我们就实现了消息的自解析和自处理。

阅读(2353) | 评论(0) | 转发(0) |
0

上一篇:Muduo学习:shared_ptr, weak_ptr

下一篇:没有了

给主人留下些什么吧!~~