Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2469837
  • 博文数量: 392
  • 博客积分: 7040
  • 博客等级: 少将
  • 技术积分: 4138
  • 用 户 组: 普通用户
  • 注册时间: 2009-06-17 13:03
个人简介

范德萨发而为

文章分类

全部博文(392)

文章存档

2017年(5)

2016年(19)

2015年(34)

2014年(14)

2013年(47)

2012年(40)

2011年(51)

2010年(137)

2009年(45)

分类: 数据库开发技术

2011-01-15 23:21:16

(本文纯属个人见解)

本文主要分析TFS在写文件时,遇到数据不一致时的处理流程
TFS写文件的正常流程的源码分析见这里:
分别是写文件时NameServer、DataServer和Client端的流程分析

首先说明:
1.TFS以Block为单位进行数据管理(这个Block类似GFS中的Chunk数据块)
2.TFS目前主要用于支持小文件的存储(GFS和Hadoop则是大文件的顺序读写),每个Block中包含多个小文件
3.TFS写数据时,是将这个小文件添加到某个Block中,NameServer分配用一个lease来管理Block的数据更新。在某个Block对应的lease被分配出去之后,其他的client不能对这个Block进行写操作,也就是:不支持多客户端同时对一个Block进行写操作

DataServer接收到Clienty(或者是primaryDS)发送过来的写操作指令,调用这个write_data函数进行数据写入,其中数据一致性在data_management_.write_data()函数里调用
  1. int DataService::write_data(WriteDataMessage* message)
  2.     {
  3.       WriteDataInfo write_info = message->get_write_info();
  4.      //注意lease_id和version在保证数据一致性上的作用
  5.      //lease_id和version都是从NameServer传递过来的
  6.       int32_t lease_id = message->get_lease_id();
  7.       int32_t version = message->get_block_version();
  8.       char* msg_data = message->get_data();

  9.       TBSYS_LOG(
  10.           DEBUG,
  11.           "write data start, blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, version: %u, leaseid: %u, isserver: %d\n",
  12.           write_info.block_id_, write_info.file_id_, write_info.file_number_, version, lease_id, write_info.is_server_);

  13.       UpdateBlockType repair = UPDATE_BLOCK_NORMAL;
  14.      //在本节点写入数据(primary和非primaryDS)
  15.      //数据一致性的检查在这个write_data函数中操作
  16.       int ret = data_management_.write_data(write_info, lease_id, version, msg_data, repair);
  17.       if (EXIT_NO_LOGICBLOCK_ERROR == ret)
  18.       {
  19.         return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
  20.             "write data failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
  21.             write_info.block_id_, write_info.file_id_, ret);
  22.       }
  23.      //这里处理DS或NS的版本错误
  24.       else if (EXIT_BLOCK_DS_VERSION_ERROR == ret || EXIT_BLOCK_NS_VERSION_ERROR == ret)
  25.       {
  26.         MessageFactory::send_error_message(
  27.             message,
  28.             TBSYS_LOG_LEVEL(ERROR),
  29.             data_server_info_.id_,
  30.             "write data failed. block version error. blockid: %u, fileid: %" PRI64_PREFIX "u, error ret: %d, repair: %d",
  31.             write_info.block_id_, write_info.file_id_, ret, repair);
  32.         //提供错误信息给ds_requester,让ds_requester去发起数据修复的动作
  33.         if (TFS_SUCCESS != ds_requester_.req_update_block_info(write_info.block_id_, repair))
  34.         {
  35.           TBSYS_LOG(ERROR, "req update block info failed. blockid: %u, repair: %d", write_info.block_id_, repair);
  36.         }
  37.         return TFS_SUCCESS;
  38.       }

  39.        ......其他正常处理流程,这里不作分析

data_management_.write_data()函数
  1. int DataManagement::write_data(const WriteDataInfo& write_info, const int32_t lease_id, int32_t& version,
  2.         const char* data_buffer, UpdateBlockType& repair)
  3.     {
  4.       TBSYS_LOG(DEBUG,
  5.           "write data. blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, lease: %d",
  6.           write_info.block_id_, write_info.file_id_, write_info.file_number_, lease_id);
  7.       //if the first fragment, check version
  8.       //这里有个疑问,为什么第一个fragment才做check version????
  9.       if (== write_info.offset_)
  10.       {
  11.         LogicBlock* logic_block = BlockFileManager::get_instance()->get_logic_block(write_info.block_id_);
  12.         if (NULL == logic_block)
  13.         {
  14.           TBSYS_LOG(ERROR, "blockid: %u is not exist.", write_info.block_id_);
  15.           return EXIT_NO_LOGICBLOCK_ERROR;
  16.         }
         //在这里,使用版本号来做数据一致性检查
  1.         int ret = logic_block->check_block_version(version, repair);
  2.         //如果版本号错误,这里直接返回了,不做后续操作
  3.         if (TFS_SUCCESS != ret)
  4.         {
  5.           TBSYS_LOG(DEBUG, "check_block_version error. blockid: %u, ret: %d", write_info.block_id_, ret);
  6.           return ret;
  7.         }
  8.       }
       ......其实正常处理流程,跳过
  1.     }
上面函数调用的check_block_version()函数
使用版本号进行数据一致性检查,如果DS数据不一致,设置repair标志,返回给NS,要求其做数据修复工作
  1. int LogicBlock::check_block_version(int32_t& remote_version, UpdateBlockType& repair)
  2.     {
  3.       ScopedRWLock scoped_lock(rw_lock_, WRITE_LOCKER);
  4.       int ret = index_handle_->check_block_version(remote_version);
  5.       //版本错误信息分为EXIT_BLOCK_NS_VERSION_ERROR和EXIT_BLOCK_DS_VERSION_ERROR
  6.       if (EXIT_BLOCK_DS_VERSION_ERROR == ret)
  7.       {
  8.         //这个REPAIR标记难道是标志数据不一致的,需要master发起数据修复
  9.         repair = UPDATE_BLOCK_REPAIR;
  10.       }
  11.       return ret;
  12.     }
这里是真正的版本号检查过程
(有些问题尚待明确:
 1. 为什么要将BLOCK_VERSION_MAGIC_NUM设置为2,这是出于什么考虑?
 2. 对于DS下线、上线出现的数据不一致,个人理解是只有可能出现NS上的版本号小于DS的版本号,为什么这里会处理NS版本号大于DS版本号的情况?这是怎么考虑的)
  1. int IndexHandle::check_block_version(int32_t& remote_version)
  2.     {
  3.       TBSYS_LOG(DEBUG, "block version. blockid: %u, remote version: %u, local version: %u", block_info()->block_id_,
  4.           remote_version, block_info()->version_);
  5.       if (remote_version != block_info()->version_)
  6.       {
  7.         //we assume that the difference between the version number greater than BLOCK_VERSION_MAGIC_NUM is illegal
  8.         //为什么要讲BLOCK_VERSION_MAGIC_NUM设置为2?
  9.         //这里的remote_version即是NS上保存的version_id
  10.         if (remote_version > block_info()->version_ + BLOCK_VERSION_MAGIC_NUM)
  11.         {
  12.           TBSYS_LOG(ERROR, "block version error. blockid: %u, remote version: %d, local version: %d",
  13.               block_info()->block_id_, remote_version, block_info()->version_);
  14.           return EXIT_BLOCK_DS_VERSION_ERROR;
  15.         }
  16.         else if (remote_version < block_info()->version_ - BLOCK_VERSION_MAGIC_NUM)
  17.         {
  18.           TBSYS_LOG(ERROR, "block version error. blockid: %u, remote version: %d, local version: %d",
  19.               block_info()->block_id_, remote_version, block_info()->version_);
  20.           return EXIT_BLOCK_NS_VERSION_ERROR;
  21.         }

  22.         //if local version less than ns version, and the difference is little than 2, replace the local version with ns version
  23.         //只有大小不超过2才不会出错,这是怎样设计的????
  24.         //远端version大于datanode上的block的version,说明datanode上的这个block是旧数据
  25.         //为什么出现NS的versino_id大于DS上的version_id的情况下,是返回TFS_SUCCESS?而不是一种错误情况
  26.         if (remote_version > block_info()->version_ && remote_version <= (block_info()->version_
  27.             + BLOCK_VERSION_MAGIC_NUM))
  28.         {
  29.           TBSYS_LOG(ERROR,
  30.               "remote version is larger, set block version. blockid: %u, remote version: %u, local version: %u",
  31.               block_info()->block_id_, remote_version, block_info()->version_);
  32.          //只更新版本号,在哪里做数据更新呢?难道不做数据更新?
  33.           block_info()->version_ = remote_version;
  34.         }
  35.         else
  36.         {
  37.           TBSYS_LOG(ERROR,
  38.               "block version is larger, set remote version. blockid: %u, remote version: %u, local version: %u",
  39.               block_info()->block_id_, remote_version, block_info()->version_);
  40.           remote_version = block_info()->version_;
  41.         }
  42.       }

  43.       return TFS_SUCCESS;
  44.     }
下面看ds_requester_.req_update_block_info(),在检测到数据不一致之后的处理
主要是发送UpdateBlockInfoMessage给NS,让NS发起修复操作
  1. int Requester::req_update_block_info(const uint32_t block_id, const UpdateBlockType repair)
  2.     {
  3.       UpdateBlockType tmp_repair = repair; 
  4.       BlockInfo* blk = NULL;
  5.       if (UPDATE_BLOCK_MISSING != tmp_repair)
  6.       {
  7.         int32_t visit_count = 0;
  8.         int ret = data_management_->get_block_info(block_id, blk, visit_count);
  9.         if (EXIT_NO_LOGICBLOCK_ERROR == ret)
  10.         {
  11.           tmp_repair = UPDATE_BLOCK_MISSING;
  12.         }
  13.         else
  14.         {
  15.           if (NULL == blk)
  16.           {
  17.             TBSYS_LOG(ERROR, "blockid: %u can not find block info.", block_id);
  18.             tmp_repair = UPDATE_BLOCK_REPAIR;
  19.           }
  20.           else
  21.           {
  22.             TBSYS_LOG(
  23.                 INFO,
  24.                 "req update block info, blockid: %u, version: %d, file count: %d, size: %d, delfile count: %d, del_size: %d, seqno: %d\n",
  25.                 blk->block_id_, blk->version_, blk->file_count_, blk->size_, blk->del_file_count_, blk->del_size_, blk->seq_no_);
  26.           }
  27.         }
  28.       }

  29.       int ret = TFS_ERROR;
  30.       UpdateBlockInfoMessage ub_msg;
  31.       ub_msg.set_block_id(block_id);
  32.       ub_msg.set_block(blk);
  33.       ub_msg.set_server_id(dataserver_id_);
  34.       ub_msg.set_repair(tmp_repair);
  35.       Message* return_msg = NULL;
  36.      //发消息给NS,要求其发起数据不一致的修复操作
  37.      //需要查看NS如何处理UpdateBlockInfoMessage消息
  38.       ret = send_message_to_server(ns_ip_port_, &ub_msg, &return_msg);
  39.       if (TFS_SUCCESS != ret || !return_msg)
  40.       {
  41.         return ret;
  42.       }
  43.       int need_expire = 0;
  44.       if (NULL != return_msg)
  45.       {
  46.         StatusMessage* sm = dynamic_cast<StatusMessage*>(return_msg);
  47.         if (STATUS_MESSAGE_OK == sm->get_status())
  48.         {
  49.           ret = TFS_SUCCESS;
  50.         }
  51.         else if (STATUS_MESSAGE_REMOVE == sm->get_status())
  52.         {
  53.           need_expire = 1;
  54.           ret = TFS_SUCCESS;
  55.         }
  56.         else
  57.         {
  58.           TBSYS_LOG(ERROR, "req update block info: %s, id: %u, tmp_repair: %d\n", sm->get_error(), block_id, tmp_repair);
  59.         }
  60.         tbsys::gDelete(return_msg); 
  61.       }

  62.       //把过期数据删除
  63.       if (need_expire)
  64.       {
  65.         data_management_->del_single_block(block_id);
  66.       }

  67.       return ret;
  68.     }
现在看NameServer端对数据修复指令的操作流程,调用update_block_info()函数
从ds_list中选择一个ds来执行数据块的复制操作
  1. int NameServer::update_block_info(Message* msg)
  2.   {
  3.     UpdateBlockInfoMessage* message = dynamic_cast<UpdateBlockInfoMessage*> (msg);
  4.     uint32_t block_id = message->get_block_id();
  5.     BlockInfo* block_info = message->get_block();
  6.     uint64_t dest_ds_id = message->get_server_id();
  7.     int32_t repair_flag = message->get_repair();

  8.     if (block_id == 0)
  9.     {
  10.       TBSYS_LOG(WARN, "block(%u) not found", block_id);
  11.       return EXIT_BLOCK_NOT_FOUND;
  12.     }

  13.     LayoutManager& block_ds_map = meta_mgr_.get_block_ds_mgr();
  14.     if (repair_flag == UPDATE_BLOCK_NORMAL)
  15.     {
  16.         ......
  17.     }

  18.     BlockChunkPtr ptr = block_ds_map.get_block_chunk(block_id);
  19.     ptr->mutex_.rdlock();

  20.     BlockCollect* block_collect = ptr->find(block_id);
  21.     if (block_collect == NULL)
  22.     {
  23.       ptr->mutex_.unlock();
  24.       MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_BLOCK_NOT_FOUND,
  25.           ns_global_info_.owner_ip_port_, "repair block, block collect not found by block id(%u)", block_id);
  26.       return TFS_SUCCESS;
  27.     }

  28.     // need repair this block;
  29.     //开始执行dest_ds_id上的block_id数据块的数据修复
  30.     VUINT64 ds_list = *(block_collect->get_ds());
  31.     ptr->mutex_.unlock();
  32.     if ((repair_flag == UPDATE_BLOCK_MISSING)
  33.         && (ds_list.size() >= static_cast<uint32_t> (SYSPARAM_NAMESERVER.min_replication_)))
  34.     {
  35.       MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_BLOCK_NOT_FOUND,
  36.           ns_global_info_.owner_ip_port_, "already got block(%u) replica(%u)", block_id, ds_list.size());
  37.       return TFS_SUCCESS;
  38.     }

  39.     if (ds_list.size() == 0)
  40.     {
  41.       MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_NO_DATASERVER,
  42.           ns_global_info_.owner_ip_port_, "repair block(%u) no any dataserver hold it", block_id);
  43.       return TFS_SUCCESS;
  44.     }

  45.     //从ds_list中找到第一个不是待修复的block_id的ds节点
  46.     uint64_t source_ds_id = 0;
  47.     for (uint32_t i = 0; i < ds_list.size(); ++i)
  48.     {
  49.       if (ds_list.at(i) != dest_ds_id)
  50.       {
  51.         source_ds_id = ds_list.at(i);
  52.         break;
  53.       }
  54.     }

  55.     if (source_ds_id == 0)
  56.     {
  57.       MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_NO_DATASERVER,
  58.           ns_global_info_.owner_ip_port_, "repair block(%u) no any other dataserver(%u) hold a correct replica", block_id, ds_list.size());
  59.       return TFS_SUCCESS;
  60.     }

  61.     message->reply_message(new StatusMessage(STATUS_MESSAGE_REMOVE));
  62.     //取消这次写操作的lease(也就是放弃写操作?)
  63.     meta_mgr_.get_lease_clerk().cancel_lease(block_id);
  64.     //取消block_id到dest_ds_id的映射关系
  65.     block_ds_map.release_ds_relation(block_id, dest_ds_id);
  66.     //向数据块复制线程发送复制指令
  67.     //给source_ds_id发送ReplicateBlockMessage消息,从source_ds_id复制数据块到dest_ds_id
  68.     return replicate_thread_.get_executor().send_replicate_cmd(source_ds_id, dest_ds_id, block_id, REPLICATE_BLOCK_MOVE_FLAG_NO);
  69.   }
然后看replicate_thread_.get_executor().send_replicate_cmd(),这个函数的功能是:
向source_ds_id发送一个复制指令,要求它复制数据块到dest_ds_id
  1. int ReplicateExecutor::send_replicate_cmd(const uint64_t source_id, const uint64_t destination_id,
  2.         const uint32_t block_id, int32_t is_move)
  3.     {
  4.       ReplicateBlockMessage dsmessage;
  5.       ReplBlock *replb = new ReplBlock();
  6.       replb->block_id_ = block_id;
  7.       replb->source_id_ = source_id;
  8.       replb->destination_id_ = destination_id;
  9.       replb->start_time_ = time(NULL);
  10.       replb->is_move_ = is_move;
  11.       BlockCollect *block_collect = meta_mgr_.get_block_ds_mgr().get_block_collect(block_id);
  12.       replb->server_count_ = block_collect->get_ds()->size();

  13.       dsmessage.set_repl_block(replb);
  14.       dsmessage.set_command(COMMAND_REPLICATE);
  15.       int iret = send_message_to_server(source_id, &dsmessage, NULL);
  16.       if (iret != TFS_SUCCESS)
  17.       {
  18.         tbsys::gDelete(replb);
  19.         TBSYS_LOG(ERROR, "send replicate command faild, block(%u), %s===>%s, is_move(%s)", block_id,
  20.             tbsys::CNetUtil::addrToString(source_id).c_str(), tbsys::CNetUtil::addrToString(destination_id).c_str(),
  21.             is_move == REPLICATE_BLOCK_MOVE_FLAG_NO ? "no" : "yes");
  22.         return iret;
  23.       }
  24.       mutex_.wrlock();
  25.       if (replicating_map_.find(block_id) == replicating_map_.end())
  26.       {
  27.         ReplicateStrategy::inc_ds_count(src_ds_counter_, source_id);
  28.         ReplicateStrategy::inc_ds_count(dest_ds_counter_, destination_id);
  29.         replicating_map_.insert(REPL_BLOCK_MAP::value_type(block_id, replb));
  30.         replb = NULL;
  31.       }
  32.       mutex_.unlock();
  33.       TBSYS_LOG(ERROR, "send replicate command successful, block(%u), %s===>%s, is_move(%s)", block_id,
  34.           tbsys::CNetUtil::addrToString(source_id).c_str(), tbsys::CNetUtil::addrToString(destination_id).c_str(),
  35.           is_move == REPLICATE_BLOCK_MOVE_FLAG_NO ? "no" : "yes");
  36.       tbsys::gDelete(replb);
  37.       return iret;
  38.     }







阅读(4380) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~