(本文纯属个人见解)
本文主要分析TFS在写文件时,遇到数据不一致时的处理流程
TFS写文件的正常流程的源码分析见这里:
分别是写文件时NameServer、DataServer和Client端的流程分析
首先说明:
1.TFS以Block为单位进行数据管理(这个Block类似GFS中的Chunk数据块)
2.TFS目前主要用于支持小文件的存储(GFS和Hadoop则是大文件的顺序读写),每个Block中包含多个小文件
3.TFS写数据时,是将这个小文件添加到某个Block中,NameServer分配用一个lease来管理Block的数据更新。在某个Block对应的lease被分配出去之后,其他的client不能对这个Block进行写操作,也就是:不支持多客户端同时对一个Block进行写操作
DataServer接收到Clienty(或者是primaryDS)发送过来的写操作指令,调用这个write_data函数进行数据写入,其中数据一致性在data_management_.write_data()函数里调用
- int DataService::write_data(WriteDataMessage* message)
- {
- WriteDataInfo write_info = message->get_write_info();
- //注意lease_id和version在保证数据一致性上的作用
- //lease_id和version都是从NameServer传递过来的
- int32_t lease_id = message->get_lease_id();
- int32_t version = message->get_block_version();
- char* msg_data = message->get_data();
- TBSYS_LOG(
- DEBUG,
- "write data start, blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, version: %u, leaseid: %u, isserver: %d\n",
- write_info.block_id_, write_info.file_id_, write_info.file_number_, version, lease_id, write_info.is_server_);
- UpdateBlockType repair = UPDATE_BLOCK_NORMAL;
- //在本节点写入数据(primary和非primaryDS)
- //数据一致性的检查在这个write_data函数中操作
- int ret = data_management_.write_data(write_info, lease_id, version, msg_data, repair);
- if (EXIT_NO_LOGICBLOCK_ERROR == ret)
- {
- return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
- "write data failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
- write_info.block_id_, write_info.file_id_, ret);
- }
- //这里处理DS或NS的版本错误
- else if (EXIT_BLOCK_DS_VERSION_ERROR == ret || EXIT_BLOCK_NS_VERSION_ERROR == ret)
- {
- MessageFactory::send_error_message(
- message,
- TBSYS_LOG_LEVEL(ERROR),
- data_server_info_.id_,
- "write data failed. block version error. blockid: %u, fileid: %" PRI64_PREFIX "u, error ret: %d, repair: %d",
- write_info.block_id_, write_info.file_id_, ret, repair);
- //提供错误信息给ds_requester,让ds_requester去发起数据修复的动作
- if (TFS_SUCCESS != ds_requester_.req_update_block_info(write_info.block_id_, repair))
- {
- TBSYS_LOG(ERROR, "req update block info failed. blockid: %u, repair: %d", write_info.block_id_, repair);
- }
- return TFS_SUCCESS;
- }
- ......其他正常处理流程,这里不作分析
data_management_.write_data()函数
- int DataManagement::write_data(const WriteDataInfo& write_info, const int32_t lease_id, int32_t& version,
- const char* data_buffer, UpdateBlockType& repair)
- {
- TBSYS_LOG(DEBUG,
- "write data. blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, lease: %d",
- write_info.block_id_, write_info.file_id_, write_info.file_number_, lease_id);
- //if the first fragment, check version
- //这里有个疑问,为什么第一个fragment才做check version????
- if (0 == write_info.offset_)
- {
- LogicBlock* logic_block = BlockFileManager::get_instance()->get_logic_block(write_info.block_id_);
- if (NULL == logic_block)
- {
- TBSYS_LOG(ERROR, "blockid: %u is not exist.", write_info.block_id_);
- return EXIT_NO_LOGICBLOCK_ERROR;
- }
//在这里,使用版本号来做数据一致性检查
- int ret = logic_block->check_block_version(version, repair);
- //如果版本号错误,这里直接返回了,不做后续操作
- if (TFS_SUCCESS != ret)
- {
- TBSYS_LOG(DEBUG, "check_block_version error. blockid: %u, ret: %d", write_info.block_id_, ret);
- return ret;
- }
- }
......其实正常处理流程,跳过
- }
上面函数调用的check_block_version()函数
使用版本号进行数据一致性检查,如果DS数据不一致,设置repair标志,返回给NS,要求其做数据修复工作
- int LogicBlock::check_block_version(int32_t& remote_version, UpdateBlockType& repair)
- {
- ScopedRWLock scoped_lock(rw_lock_, WRITE_LOCKER);
- int ret = index_handle_->check_block_version(remote_version);
- //版本错误信息分为EXIT_BLOCK_NS_VERSION_ERROR和EXIT_BLOCK_DS_VERSION_ERROR
- if (EXIT_BLOCK_DS_VERSION_ERROR == ret)
- {
- //这个REPAIR标记难道是标志数据不一致的,需要master发起数据修复
- repair = UPDATE_BLOCK_REPAIR;
- }
- return ret;
- }
这里是真正的版本号检查过程
(有些问题尚待明确:
1. 为什么要将BLOCK_VERSION_MAGIC_NUM设置为2,这是出于什么考虑?
2. 对于DS下线、上线出现的数据不一致,个人理解是只有可能出现NS上的版本号小于DS的版本号,为什么这里会处理NS版本号大于DS版本号的情况?这是怎么考虑的)
- int IndexHandle::check_block_version(int32_t& remote_version)
- {
- TBSYS_LOG(DEBUG, "block version. blockid: %u, remote version: %u, local version: %u", block_info()->block_id_,
- remote_version, block_info()->version_);
- if (remote_version != block_info()->version_)
- {
- //we assume that the difference between the version number greater than BLOCK_VERSION_MAGIC_NUM is illegal
- //为什么要讲BLOCK_VERSION_MAGIC_NUM设置为2?
- //这里的remote_version即是NS上保存的version_id
- if (remote_version > block_info()->version_ + BLOCK_VERSION_MAGIC_NUM)
- {
- TBSYS_LOG(ERROR, "block version error. blockid: %u, remote version: %d, local version: %d",
- block_info()->block_id_, remote_version, block_info()->version_);
- return EXIT_BLOCK_DS_VERSION_ERROR;
- }
- else if (remote_version < block_info()->version_ - BLOCK_VERSION_MAGIC_NUM)
- {
- TBSYS_LOG(ERROR, "block version error. blockid: %u, remote version: %d, local version: %d",
- block_info()->block_id_, remote_version, block_info()->version_);
- return EXIT_BLOCK_NS_VERSION_ERROR;
- }
- //if local version less than ns version, and the difference is little than 2, replace the local version with ns version
- //只有大小不超过2才不会出错,这是怎样设计的????
- //远端version大于datanode上的block的version,说明datanode上的这个block是旧数据
- //为什么出现NS的versino_id大于DS上的version_id的情况下,是返回TFS_SUCCESS?而不是一种错误情况
- if (remote_version > block_info()->version_ && remote_version <= (block_info()->version_
- + BLOCK_VERSION_MAGIC_NUM))
- {
- TBSYS_LOG(ERROR,
- "remote version is larger, set block version. blockid: %u, remote version: %u, local version: %u",
- block_info()->block_id_, remote_version, block_info()->version_);
- //只更新版本号,在哪里做数据更新呢?难道不做数据更新?
- block_info()->version_ = remote_version;
- }
- else
- {
- TBSYS_LOG(ERROR,
- "block version is larger, set remote version. blockid: %u, remote version: %u, local version: %u",
- block_info()->block_id_, remote_version, block_info()->version_);
- remote_version = block_info()->version_;
- }
- }
- return TFS_SUCCESS;
- }
下面看ds_requester_.req_update_block_info(),在检测到数据不一致之后的处理
主要是发送UpdateBlockInfoMessage给NS,让NS发起修复操作
- int Requester::req_update_block_info(const uint32_t block_id, const UpdateBlockType repair)
- {
- UpdateBlockType tmp_repair = repair;
- BlockInfo* blk = NULL;
- if (UPDATE_BLOCK_MISSING != tmp_repair)
- {
- int32_t visit_count = 0;
- int ret = data_management_->get_block_info(block_id, blk, visit_count);
- if (EXIT_NO_LOGICBLOCK_ERROR == ret)
- {
- tmp_repair = UPDATE_BLOCK_MISSING;
- }
- else
- {
- if (NULL == blk)
- {
- TBSYS_LOG(ERROR, "blockid: %u can not find block info.", block_id);
- tmp_repair = UPDATE_BLOCK_REPAIR;
- }
- else
- {
- TBSYS_LOG(
- INFO,
- "req update block info, blockid: %u, version: %d, file count: %d, size: %d, delfile count: %d, del_size: %d, seqno: %d\n",
- blk->block_id_, blk->version_, blk->file_count_, blk->size_, blk->del_file_count_, blk->del_size_, blk->seq_no_);
- }
- }
- }
- int ret = TFS_ERROR;
- UpdateBlockInfoMessage ub_msg;
- ub_msg.set_block_id(block_id);
- ub_msg.set_block(blk);
- ub_msg.set_server_id(dataserver_id_);
- ub_msg.set_repair(tmp_repair);
- Message* return_msg = NULL;
- //发消息给NS,要求其发起数据不一致的修复操作
- //需要查看NS如何处理UpdateBlockInfoMessage消息
- ret = send_message_to_server(ns_ip_port_, &ub_msg, &return_msg);
- if (TFS_SUCCESS != ret || !return_msg)
- {
- return ret;
- }
- int need_expire = 0;
- if (NULL != return_msg)
- {
- StatusMessage* sm = dynamic_cast<StatusMessage*>(return_msg);
- if (STATUS_MESSAGE_OK == sm->get_status())
- {
- ret = TFS_SUCCESS;
- }
- else if (STATUS_MESSAGE_REMOVE == sm->get_status())
- {
- need_expire = 1;
- ret = TFS_SUCCESS;
- }
- else
- {
- TBSYS_LOG(ERROR, "req update block info: %s, id: %u, tmp_repair: %d\n", sm->get_error(), block_id, tmp_repair);
- }
- tbsys::gDelete(return_msg);
- }
- //把过期数据删除
- if (need_expire)
- {
- data_management_->del_single_block(block_id);
- }
- return ret;
- }
现在看NameServer端对数据修复指令的操作流程,调用update_block_info()函数
从ds_list中选择一个ds来执行数据块的复制操作
- int NameServer::update_block_info(Message* msg)
- {
- UpdateBlockInfoMessage* message = dynamic_cast<UpdateBlockInfoMessage*> (msg);
- uint32_t block_id = message->get_block_id();
- BlockInfo* block_info = message->get_block();
- uint64_t dest_ds_id = message->get_server_id();
- int32_t repair_flag = message->get_repair();
- if (block_id == 0)
- {
- TBSYS_LOG(WARN, "block(%u) not found", block_id);
- return EXIT_BLOCK_NOT_FOUND;
- }
- LayoutManager& block_ds_map = meta_mgr_.get_block_ds_mgr();
- if (repair_flag == UPDATE_BLOCK_NORMAL)
- {
- ......
- }
- BlockChunkPtr ptr = block_ds_map.get_block_chunk(block_id);
- ptr->mutex_.rdlock();
- BlockCollect* block_collect = ptr->find(block_id);
- if (block_collect == NULL)
- {
- ptr->mutex_.unlock();
- MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_BLOCK_NOT_FOUND,
- ns_global_info_.owner_ip_port_, "repair block, block collect not found by block id(%u)", block_id);
- return TFS_SUCCESS;
- }
- // need repair this block;
- //开始执行dest_ds_id上的block_id数据块的数据修复
- VUINT64 ds_list = *(block_collect->get_ds());
- ptr->mutex_.unlock();
- if ((repair_flag == UPDATE_BLOCK_MISSING)
- && (ds_list.size() >= static_cast<uint32_t> (SYSPARAM_NAMESERVER.min_replication_)))
- {
- MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_BLOCK_NOT_FOUND,
- ns_global_info_.owner_ip_port_, "already got block(%u) replica(%u)", block_id, ds_list.size());
- return TFS_SUCCESS;
- }
- if (ds_list.size() == 0)
- {
- MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_NO_DATASERVER,
- ns_global_info_.owner_ip_port_, "repair block(%u) no any dataserver hold it", block_id);
- return TFS_SUCCESS;
- }
- //从ds_list中找到第一个不是待修复的block_id的ds节点
- uint64_t source_ds_id = 0;
- for (uint32_t i = 0; i < ds_list.size(); ++i)
- {
- if (ds_list.at(i) != dest_ds_id)
- {
- source_ds_id = ds_list.at(i);
- break;
- }
- }
- if (source_ds_id == 0)
- {
- MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_NO_DATASERVER,
- ns_global_info_.owner_ip_port_, "repair block(%u) no any other dataserver(%u) hold a correct replica", block_id, ds_list.size());
- return TFS_SUCCESS;
- }
- message->reply_message(new StatusMessage(STATUS_MESSAGE_REMOVE));
- //取消这次写操作的lease(也就是放弃写操作?)
- meta_mgr_.get_lease_clerk().cancel_lease(block_id);
- //取消block_id到dest_ds_id的映射关系
- block_ds_map.release_ds_relation(block_id, dest_ds_id);
- //向数据块复制线程发送复制指令
- //给source_ds_id发送ReplicateBlockMessage消息,从source_ds_id复制数据块到dest_ds_id
- return replicate_thread_.get_executor().send_replicate_cmd(source_ds_id, dest_ds_id, block_id, REPLICATE_BLOCK_MOVE_FLAG_NO);
- }
然后看replicate_thread_.get_executor().send_replicate_cmd(),这个函数的功能是:
向source_ds_id发送一个复制指令,要求它复制数据块到dest_ds_id
- int ReplicateExecutor::send_replicate_cmd(const uint64_t source_id, const uint64_t destination_id,
-
const uint32_t block_id, int32_t is_move)
-
{
-
ReplicateBlockMessage dsmessage;
-
ReplBlock *replb = new ReplBlock();
-
replb->block_id_ = block_id;
-
replb->source_id_ = source_id;
-
replb->destination_id_ = destination_id;
-
replb->start_time_ = time(NULL);
-
replb->is_move_ = is_move;
-
BlockCollect *block_collect = meta_mgr_.get_block_ds_mgr().get_block_collect(block_id);
-
replb->server_count_ = block_collect->get_ds()->size();
-
-
dsmessage.set_repl_block(replb);
-
dsmessage.set_command(COMMAND_REPLICATE);
-
int iret = send_message_to_server(source_id, &dsmessage, NULL);
-
if (iret != TFS_SUCCESS)
-
{
-
tbsys::gDelete(replb);
-
TBSYS_LOG(ERROR, "send replicate command faild, block(%u), %s===>%s, is_move(%s)", block_id,
-
tbsys::CNetUtil::addrToString(source_id).c_str(), tbsys::CNetUtil::addrToString(destination_id).c_str(),
-
is_move == REPLICATE_BLOCK_MOVE_FLAG_NO ? "no" : "yes");
-
return iret;
-
}
-
mutex_.wrlock();
-
if (replicating_map_.find(block_id) == replicating_map_.end())
-
{
-
ReplicateStrategy::inc_ds_count(src_ds_counter_, source_id);
-
ReplicateStrategy::inc_ds_count(dest_ds_counter_, destination_id);
-
replicating_map_.insert(REPL_BLOCK_MAP::value_type(block_id, replb));
-
replb = NULL;
-
}
-
mutex_.unlock();
-
TBSYS_LOG(ERROR, "send replicate command successful, block(%u), %s===>%s, is_move(%s)", block_id,
-
tbsys::CNetUtil::addrToString(source_id).c_str(), tbsys::CNetUtil::addrToString(destination_id).c_str(),
-
is_move == REPLICATE_BLOCK_MOVE_FLAG_NO ? "no" : "yes");
-
tbsys::gDelete(replb);
-
return iret;
-
}
阅读(4380) | 评论(0) | 转发(0) |