Google Protocol Buffer(简称protobuf)是Google内部混合语言数据标准,protobuf是一种紧凑的可扩展的二进制消息格式,适合做网络数据传输、数据存储的消息格式(摘自网络)。这里不介绍protobuf的基本用法,详情可以参考: 1. ProtoBuf语法指南 style="line-height:2;"> 2. 玩转ProtoBuf 3. Protobuf 反射和相关API />
这里主要介绍Muduo中使用ProtoBuffer实现消息类型自解析自动调用相应的处理函数,对应的代码为: />
首先我们要描述一下需要解决的问题:1. 处理端有各种消息的完整类型,但是不知道当前接收的消息类型(消息识别) 2. 处理端可以正确处理不同种类的消息(消息解析 + 消息处理)。2. 处理端不要用大量的if else或者switch来判断消息的类型然后做相应的处理 (代码不要太丑陋太难维护)
一. 消息识别
为了达到消息识别的目的,首先需要定义消息格式:
len : 消息长度
namelen : 消息类型名长度
messageName : 消息类型名
protoBufData : 消息体数据
checkSum : 校验和 (这里使用 adler32 算法)
注意:使用ProtoBuffer进行传输是不需要消息的版本号的,因为ProtoBuffer支持optional字段,可以通过增加或减少optional字段来实现不同版本的消息,而且不会引起解析错误。
二. 消息解析
当处理端收到消息后,应该怎样处理了,注意处理端收到的字节流很可能不能构成一个完整的消息,也可能构成多个消息
对数据的解码在ProtobufCodec类中,onMessage函数就是处理原始字节流的,parse函数从原始字节流中提取出消息
-
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
-
Buffer* buf,
-
Timestamp receiveTime)
-
{
-
while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)
-
{
-
// 1. 获取消息长度 (头四个字节)
-
const int32_t len = buf->peekInt32();
-
if (len > kMaxMessageLen || len < kMinMessageLen)
-
{
-
errorCallback_(conn, buf, receiveTime, kInvalidLength);
-
break;
-
}
-
else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
-
{
-
ErrorCode errorCode = kNoError;
-
// 2. parse生成具体消息
-
MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);
-
if (errorCode == kNoError && message)
-
{
-
// 3. 调用消息处理函数
-
messageCallback_(conn, message, receiveTime);
-
// 4. 从缓冲区中删除已处理的消息字节数据
-
buf->retrieve(kHeaderLen+len);
-
}
-
else
-
{
-
errorCallback_(conn, buf, receiveTime, errorCode);
-
break;
-
}
-
}
-
else
-
{
-
break;
-
}
-
}
-
}
-
MessagePtr ProtobufCodec::parse(const char* buf, int len, ErrorCode* error)
-
{
-
MessagePtr message;
-
-
// check sum
-
// 1. 检查校验和
-
int32_t expectedCheckSum = asInt32(buf + len - kHeaderLen);
-
int32_t checkSum = static_cast<int32_t>(
-
::adler32(1,
-
reinterpret_cast<const Bytef*>(buf),
-
static_cast<int>(len - kHeaderLen)));
-
if (checkSum == expectedCheckSum)
-
{
-
// get message type name
-
// 2. 获取消息类型名长度
-
int32_t nameLen = asInt32(buf);
-
if (nameLen >= 2 && nameLen <= len - 2*kHeaderLen)
-
{
-
// 3. 获取消息类型名
-
std::string typeName(buf + kHeaderLen, buf + kHeaderLen + nameLen - 1);
-
// create message object
-
// 4. 根据消息类型名创建一个消息
-
message.reset(createMessage(typeName));
-
if (message)
-
{
-
// parse from buffer
-
const char* data = buf + kHeaderLen + nameLen;
-
int32_t dataLen = len - nameLen - 2*kHeaderLen;
-
// 5. 从原始字节流中反序列化出消息数据
-
if (message->ParseFromArray(data, dataLen))
-
{
-
*error = kNoError;
-
}
-
else
-
{
-
*error = kParseError;
-
}
-
}
-
else
-
{
-
*error = kUnknownMessageType;
-
}
-
}
-
else
-
{
-
*error = kInvalidNameLen;
-
}
-
}
-
else
-
{
-
*error = kCheckSumError;
-
}
-
-
return message;
-
}
这里很重要的一点就是如何根据消息类型名创建一个消息,createMessage完成这个功能
-
google::protobuf::Message* ProtobufCodec::createMessage(const std::string& typeName)
-
{
-
google::protobuf::Message* message = NULL;
-
const google::protobuf::Descriptor* descriptor =
-
google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
-
if (descriptor)
-
{
-
const google::protobuf::Message* prototype =
-
google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);
-
if (prototype)
-
{
-
message = prototype->New();
-
}
-
}
-
return message;
-
}
该函数用到的几个重要的步骤:
-
获取 MessageFactory 对象
MessageFactory 类提供了一个 generated_factory() 的静态函数,此静态函数可以获取一个 MessageFactory 对象,此 MessageFactory 对象能够用来创建被编译入程序的所有的 message 对象。注意,此 Factory 是一个 Singleton,因此重复多次调用 generated_factory 函数不会创建多个 MessageFactory 对象,另外调用者也不能通过调用 delete 删除此对象。
-
获取 DescriptorPool 对象
通过 DescriptorPool 类的 generated_pool() 静态函数能够获取 DescriptorPool 的指针。此 DescriptorPool 中包含了被编译入程序的 message 的 descriptor。generated_pool 类似于 generated_factory 函数,可以被重复调用多次而不会创建多个 DescriptorPool 对象。
-
获取 message descriptor
有了 DescriptorPool 对象就可以获取到 message 的 descriptor 了。常见的一个函数是 const Descriptor * FindMessageTypeByName(const string & name) const,此函数可以通过 message 名字获取到顶层 message 的 descriptor。当然除此之外还有一些 API 可以用来获取 message discriptor,可以参考相关文档,这里就不一一详述了。
-
获取 message prototype 并构建 message 对象
前面已经讲述了获取 MessageFactory 对象的方法,有了 MessageFactory 对象就可以通过函数 MessageFactory::GetPrototype(const Descriptor * type) 获取 message prototype(实质上就是一个 message 对象)。通过调用 message prototype 的 New 函数则可以构造此类型的 message。
对同一个 Descriptor 多次调用 MessageFactory::GetPrototype 函数将返回同一个对象。通过调用 prototype 的 New 函数构造的 message 对象必须在 MessageFactory 销毁前销毁。
三. 消息处理
消息的处理是在函数messageCallBack_中进行,这个函数是我们在创建QueryServer时绑定的:
-
QueryServer(EventLoop* loop,
-
const InetAddress& listenAddr)
-
: server_(loop, listenAddr, "QueryServer"),
-
dispatcher_(boost::bind(&QueryServer::onUnknownMessage, this, _1, _2, _3)),
-
-
// 将ProtobufDispatcher::onProtobufMessage 绑定到messageCallBack_上
-
codec_(boost::bind(&ProtobufDispatcher::onProtobufMessage, &dispatcher_, _1, _2, _3))
-
{
-
// 注册的消息处理函数
-
dispatcher_.registerMessageCallback<muduo::Query>(
-
boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
-
dispatcher_.registerMessageCallback<muduo::Answer>(
-
boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
-
server_.setConnectionCallback(
-
boost::bind(&QueryServer::onConnection, this, _1));
-
server_.setMessageCallback(
-
boost::bind(&ProtobufCodec::onMessage, &codec_, _1, _2, _3));
-
}
onProtobufMessage根据每个类型的descriptor来从一张表中查找处理函数,因为每种类型都有一个全局的Descriptor对象,它的地址不变,因此使用它来查找注册的函数
-
void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
-
const MessagePtr& message,
-
muduo::Timestamp receiveTime) const
-
{
-
CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
-
if (it != callbacks_.end())
-
{
-
it->second->onMessage(conn, message, receiveTime);
-
}
-
else
-
{
-
defaultCallback_(conn, message, receiveTime);
-
}
-
}
至此,我们就实现了消息的自解析和自处理。
阅读(2342) | 评论(0) | 转发(0) |