Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2469033
  • 博文数量: 392
  • 博客积分: 7040
  • 博客等级: 少将
  • 技术积分: 4138
  • 用 户 组: 普通用户
  • 注册时间: 2009-06-17 13:03
个人简介

范德萨发而为

文章分类

全部博文(392)

文章存档

2017年(5)

2016年(19)

2015年(34)

2014年(14)

2013年(47)

2012年(40)

2011年(51)

2010年(137)

2009年(45)

分类: 数据库开发技术

2011-01-01 14:03:25

T   TFS写操作数据流

TFS系统中,nameserver会保证一个文件有多个副本存储于不同的dataserver上以保证冗余。当由于dataserver服务器宕机或由于其他原因退出系统导致某些文件副本数量下降时,nameserver将会调度新的dataserver节点存储文件备份。同样为了保证数据一致性,当写入一个文件时,只有所有参与的dataserver均写入成功时,该操作才算成功。TFS的写操作数据流图如下所示:


客户端首先向nameserver发起写请求,nameserver需要根据dataserver上的可写块,容量和负载加权平均来选择一个可写的block。并且在该block所在的多个dataserver中选择一个作为写入的master,这个选择过程也需要根据dataserver的负载以及当前作为master的次数来计算,使得每个dataserver作为master的机会均等。master一段选定,除非master宕机,不会更换,一旦master宕机,需要在剩余的dataserver中选择新的master。返回一个dataserver列表。
客户端向master dataserver开始数据写入操作。master server将数据传输到其他的dataserver节点,只有当所有dataserver节点写入均成功时,master server才会向nameserver和客户端返回操作成功的信息。

以上摘自

从上图可以大致了解TFS的写入过程,和GFS的写入过程很相似,最大的不同在于:

TFS在推送数据的时候是有Primary(图中的Master DS)来完成向其他所有副本的数据推送,而在GFS中,这个过程中存在由非Primary DS向另一个非Primary DS的数据推送,也就是在GFS中,减小了作为Primary的DS向外推送数据的压力。

TFS写操作代码分析

1. 请求写文件

    由client向NameServer发起写操作。文件名的解析过程是在client端完成的,通过文件名可以得到block_id和file_id两个信息,将此信息发送给NameServer进行处理。NameServer中存储有block_id到ds_list的映射关系。对写block的操作,NS获取一个新的可写块对应的ds信息返回给client,由client去和DS中被选举为primary的那个DS去通信,进行数据写入。

P.S ds_id其实就是一个ip+port的东西,client拿到这个东西就知道该和谁去进行通信了。

2. NameServer找出一个可写block_id和server_id

   a. NameServer是如何获取Primary block的?

    NameServer中会运行很多循环执行任务的线程,其中一个就是do_check_ds(),代码很简单,不断执行check_ds()函数

/*
   * Thread Function do_check_ds
   * check DataServerStatInfo heart beat message, see if lost connection.
   * check DataServerStatInfo primary write blocks, add some new block if lack.
   */

  int NameServer::do_check_ds()
  {
    Func::sleep(SYSPARAM_NAMESERVER.safe_mode_time_, reinterpret_cast<int32_t*> (&ns_global_info_.destroy_flag_));

    while (ns_global_info_.destroy_flag_ == NS_DESTROY_FLAGS_NO)
    {
      check_ds( time(NULL));
      Func::sleep(SYSPARAM_NAMESERVER.heart_interval_, reinterpret_cast<int32_t*> (&ns_global_info_.destroy_flag_));
    }
    return TFS_SUCCESS;
  }


check_ds()主要完成两个功能,检查dead DS和writabele DS。代码注释中主要介绍检查writable DS的相关代码

// check the dead ds list and the writable ds list
  int NameServer::check_ds(const time_t now)
  {
    VUINT64 dead_ds_list, write_ds_list;
    //default ds_dead_time=2
    int32_t expire_time = now - SYSPARAM_NAMESERVER.ds_dead_time_ * 4;
    // get the list of the dead ds and available ds.

    //在这个函数里,遍历所有DS,根据筛选条件(每个DS上作为写primary的block的数     //量不大于配置的max_write_filecount=5,选出可以作为可写的DS,放到           //write_ds_list里,后面还会从这个list中筛选出更合适的DS)

    meta_mgr_.get_block_ds_mgr().check_ds(SYSPARAM_NAMESERVER.ds_dead_time_, dead_ds_list, write_ds_list);

    // check the status of the dead ds
    // if the info is null or ds is still alive, then skip
    // else mark the ds and exclude it from the balance
    .....check dead ds, ignored


    if (write_ds_list.size() == 0)
    return TFS_SUCCESS;

    //随机选择几个可写DS,然后检查其是否有可以作为可写primary块
    // select a writable ds randomly, and check its available blocks which were used for write primary.
    srand(now);
    const uint32_t write_ds_list_size = write_ds_list.size();
    int32_t start_index = rand() % write_ds_list_size;
    for (uint32_t i = 0; i < write_ds_list_size; ++i)
    {
      ds_id = write_ds_list[start_index++ % write_ds_list_size];

      //这里是进一步筛选,从可写DS中选择可作为写时Primary的DS,放入类变量

      //_primary_writable_ds_list中,客户端在请求获取primaryDS时会从这里list中

      //获取
      meta_mgr_.check_primary_writable_block(ds_id, SYSPARAM_NAMESERVER.add_primary_block_count_, true);
      if ((i >= 0x0A) | (ns_global_info_.destroy_flag_ == NS_DESTROY_FLAGS_YES))
      break;
    }
    return TFS_SUCCESS;
  }


NameServer有一个main_task_queue,主要操作指令都会发送到这个FIFO队列中,每个指令是一个message,其中有一个类型为GET_BLOCK_INFO_MESSAGE的message,clinet是在调用tfs::tfs_open()的时候通过发送这个message向NameServer获取可写块以及primary块的信息
NameServer在接受到GET_BLOCK_INFO_MESSAGE的消息之后,调用下面的get_block_info()函数

int NameServer::get_block_info(Message *msg)
  {
    GetBlockInfoMessage* message = dynamic_cast<GetBlockInfoMessage*> (msg);
    if (meta_mgr_.get_block_ds_mgr().get_ds_size() <= 0)
    {
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), EXIT_NO_DATASERVER,
          ns_global_info_.owner_ip_port_, "not found dataserver, dataserver size equal 0");
    }

    SetBlockInfoMessage *result_msg = new SetBlockInfoMessage();
    uint32_t block_id = message->get_block_id();
    int32_t mode = message->get_mode();

    VUINT64 ds_list;
    uint32_t lease_id = 0;
    int32_t version = 0;
    int32_t ret = TFS_SUCCESS;

    //这里处理读数据操作,跟写操作无关,跳过
    if (mode & BLOCK_READ)
    {

         .....read data related
    }
    else
    {

      //作为备份的NameServer不处理这个命令
      if (ns_global_info_.owner_role_ == NS_ROLE_SLAVE)
      goto out;
      // BLOCK_WRITE | BLOCK_CREATE | BLOCK_NEWBLK | BLOCK_NOLEASE
      // check this block if doing any operations like replicating, moving, compacting...

      //BLOCK_NOLEASE应该是表示不需要lease,什么情况下会到这个流程呢?
      if (block_id != 0 && !(mode & BLOCK_NOLEASE))
      {
        if ((replicate_thread_.get_executor().is_replicating_block(block_id))
            || (compact_thread_.is_compacting_block(block_id)))
        {
          TBSYS_LOG(ERROR, "get block info block(%u), mode(%d) is busy", block_id ,mode);
          ret = EXIT_BLOCK_BUSY;
          goto out;
        }
      }
     //和写操作有关的代码在这里,获取可写块的过程,注意lease_id对于写数据来说

     //是很重要的,TFS中主要利用它来保证数据的一致性:也就是说,只要lease分配出

     //去,就可以保证对数据的写入是原子的,要么全写入,要么不写入任何数据

     //wirte_block_info分两种情况,写新块:获取一个待写入block_id;写旧块:获

     //取block_id相关信息。然后两个操作都需要获取lease(下面会对获取lease的过

     //程作简单介绍)

     //另外,注意此函数中version = block_collect->get_block_info()->version_;

     //这句话可以看出,每个block_id对应有一个version号,这也是保证数据一致性

     //的一个重要部分(version主要处理ds上线、下线导致的数据不一致问题)
      ret = meta_mgr_.write_block_info(block_id, mode, lease_id, version, ds_list);
      TBSYS_LOG(DEBUG, "get block info: block(%u) mode(%d) lease(%u), version(%d), dataserver size(%u), result(%d)",
          block_id, mode, lease_id, version, ds_list.size(), ret);
      if (ret == TFS_SUCCESS)

      //OK,设置write_block_info中获取的block_id,ds列表、block的版本号以及

      //lease_id,返回给client
      result_msg->set_write_block_ds(block_id, &ds_list, version, lease_id);
    }

    out:
    if (ret != TFS_SUCCESS)
    {
      tbsys::gDelete(result_msg);
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), ret, ns_global_info_.owner_ip_port_,
          "got error, when get block(%u) mode(%d), result(%d) information", block_id, mode, ret);
    }

    message->reply_message(result_msg);

    return TFS_SUCCESS;
  }


这个函数是在上面的write_block_info()中调用,用于申请lease的函数

uint32_t LeaseClerk::register_lease(const uint32_t block_id)
    {

      //管理所有lease的数据结果的全局锁,必须原子操作
      mutex_.lock();
      int64_t wait_count = wait_count_;

      //从这里可以看出,一个lease是对应一个block_id的,类似于GFS 中一个lease

      //对应一个64MB(通常情况)的chunk
      WriteLease* lease = get_lease(block_id);
      if (lease == NULL)
      {
        lease = new WriteLease();
        reset(*lease);

        //这里也可看出lease_map_作为NameServer中的全局数据结构,保存所有             //block_id->lease的映射关系

        lease_map_.insert(LEASE_MAP::value_type(block_id, lease));
        mutex_.unlock();
        return lease->lease_id_;
      }

      ....省略lease已存在的处理过程

    }


到此 ,上图中的第二步结束,NameServer在整个写操作的过程中的任务已完成了大半。

3. 接下来就是client和DS之间的数据操作,等到写完之后,被选举为primary的DS(或者是client,尚不明确)在收到所有数据写操作都已完成的消息后,向NameServer发送write_commit消息,由NameServer调整相关数据结构的映射关系。也就是上如中的step6

/**
   * a write operation completed, commit to nameserver and update block's verion
   */

   //NS接收到DS的write_complete消息,更新元数据相关信息

  int NameServer::write_complete(Message* msg)
  {
    BlockWriteCompleteMessage* message = dynamic_cast<BlockWriteCompleteMessage*> (msg);
    BlockInfo *blk = message->get_block();
    uint64_t ds_id = message->get_server_id();
    uint32_t lease_id = message->get_lease_id();
    WriteCompleteStatus status = message->get_success();
    UnlinkFlag unlink_flag = message->get_unlink_flag();

    TBSYS_LOG(DEBUG, "write commit: block(%u) dataserver(%s) version(%d), file_count(%d)"
        "size(%d) delfile_count(%d), del_size(%d), seq_no(%u), lease(%u), unlink(%s), status(%s)", blk->block_id_,
        tbsys::CNetUtil::addrToString(ds_id).c_str(), blk->version_, blk->file_count_, blk->size_,
        blk->del_file_count_, blk->del_size_, blk->seq_no_, lease_id,
        unlink_flag == UNLINK_FLAG_NO ? "no" : unlink_flag == UNLINK_FLAG_YES ? "yes" : "unknow",
        status == WRITE_COMPLETE_STATUS_YES ? "yes" : "no");

    //如果元数据write_commit的时候出现错误或者block is full,则该变量变为true
    bool need_add_new_block = false;
    std::string errmsg;

    //检查block的version版本号(新版本号至少不应该比老版本号小),更改lease状态,更新元数据相关映射关系

    //并且调用lease->monitor_->notifyAll(),通知其他在等待的想要获取该lease的线程
    int ret = meta_mgr_.write_commit(*blk, ds_id, lease_id, unlink_flag, status, need_add_new_block, errmsg);

    //给primaryDS回复写操作完成的消息
    message->reply_message(new StatusMessage(ret, const_cast<char*> (errmsg.c_str())));

    // add new block, when block filled complete
    if (need_add_new_block)
    {
      int need_add_new_block_count = meta_mgr_.check_primary_writable_block(ds_id, 1, true);
      TBSYS_LOG(INFO, "need add new block, count(%d)", need_add_new_block_count);
    }
    return ret;
  }



写操作中,NameServer的策略问题:
1. NameServer的get_block_info()函数中需要注册一个可用的lease_id来管理写入操作,调用下面的函数
   从代码中看出,对于多个客户端同时获取一个block的lease进行写操作的时候,后者会等待前面的使用者释放lease_id之后,再取得对该lease_id的使用权,也就是不支持多个client对同一个block进行并发的写操作

uint32_t LeaseClerk::register_lease(const uint32_t block_id)
    {
      mutex_.lock();
      int64_t wait_count = wait_count_;
      WriteLease* lease = get_lease(block_id);
      if (lease == NULL)
      {
        lease = new WriteLease();
        //获取全局+1的lease_id,设置lease使用时间

        reset(*lease);
        lease_map_.insert(LEASE_MAP::value_type(block_id, lease));
        mutex_.unlock();
        return lease->lease_id_;
      }

      mutex_.unlock();
      Monitor<Mutex>::Lock lock(lease->monitor_);
      lease->dump(block_id, is_valid_lease(lease), wait_count, SYSPARAM_NAMESERVER.max_wait_write_lease_);
      //lease已经过期,重新设置时间

     if (!is_valid_lease(lease))
      {
        reset(*lease);
        lease->monitor_.notifyAll();
        return lease->lease_id_;
      }

      //等待lease的线程太多的话,返回错误

      if (wait_count > SYSPARAM_NAMESERVER.max_wait_write_lease_)
      {
        TBSYS_LOG(WARN, "lease(%u), current wait thread(%"PRI64_PREFIX"d) beyond max_wait(%d)", lease->lease_id_, wait_count,
            SYSPARAM_NAMESERVER.max_wait_write_lease_);
        return WriteLease::INVALID_LEASE;
      }

      //到这里,说明lease正在使用中...等待lease,设置等待时间

      //从这里可以大致看出lease的使用策略,如果一个block的lease已经分配出去,那么

      //所有想要对这个block进行写操作的client都会在这里等待lease变为可用

      //这么说对一个block来说,同时只能有一个client进行写入操作咯????

      inc_wait_count();

      Time time_out = Time::milliSeconds(WriteLease::LEASE_EXPIRE_TIME);
     //这里应该是个等待的过程,不做任何操作

      bool ret = lease->monitor_.timedWait(time_out);

      dec_wait_count();

      TBSYS_LOG(DEBUG, "register_lease block(%u) wait end ret(%d), valid(%d)", block_id, ret, is_valid_lease(lease));

      if (!is_valid_lease(lease))
      {
        reset(*lease);
        lease->monitor_.notifyAll();
        return lease->lease_id_;
      }
      return WriteLease::INVALID_LEASE;
    }


阅读(4911) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~