int DataService::close_write_file(CloseFileMessage* message)
{
CloseFileInfo close_file_info = message->get_close_file_info();
int32_t lease_id = message->get_lease_id();
uint64_t peer_id = message->get_connection()->getPeerId();
TBSYS_LOG(
DEBUG,
"close write file, blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, leaseid: %u, from: %s\n",
close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, lease_id,
tbsys::CNetUtil::addrToString(peer_id).c_str());
int32_t write_file_size = 0; //这个过程比较复杂,大致是检查lease是否过期、从临时文件或临时内容中读出数据,写入到真正的block位置上
int ret = data_management_.close_write_file(close_file_info, write_file_size);
if (TFS_SUCCESS != ret)
{
if (EXIT_DATAFILE_EXPIRE_ERROR == ret)
{
return MessageFactory::send_error_message(
message,
TBSYS_LOG_LEVEL(ERROR),
data_server_info_.id_,
"datafile is null(maybe expired). blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, ret: %d",
close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, ret);
}
else if (EXIT_NO_LOGICBLOCK_ERROR == ret)
{
return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
"close write file failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
close_file_info.block_id_, close_file_info.file_id_, ret);
}
else if (TFS_SUCCESS != ret)
{
try_add_repair_task(close_file_info.block_id_, ret);
if (CLOSE_FILE_SLAVER != close_file_info.mode_)
{
ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, ret);
}
return MessageFactory::send_error_message(
message,
TBSYS_LOG_LEVEL(ERROR),
data_server_info_.id_,
"close write file error. blockid: %u, fileid : %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u. ret: %d",
close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, ret);
}
}
BlockInfo* blk = NULL;
int32_t visit_count = 0;
ret = data_management_.get_block_info(close_file_info.block_id_, blk, visit_count);
if (TFS_SUCCESS != ret)
{
return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
"close write file failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
close_file_info.block_id_, close_file_info.file_id_, ret);
}
//if it is master DS. Send to other slave ds //primaryDS将关闭文件的消息发送给其他非primaryDS,执行前面的数据写入操作
if (CLOSE_FILE_SLAVER != close_file_info.mode_)
{
do_stat(peer_id, write_file_size, write_file_size, 0, AccessStat::WRITE_BYTES);
message->set_mode(CLOSE_FILE_SLAVER);
message->set_block(blk);
//这个是异步通信,使用条件等待(cond_.wait())直到收到所有DS发送回来的消息才返回
int send_ret = send_message_to_slave_ds(message, message->get_ds_list());
if (TFS_SUCCESS != send_ret)
{
// other ds failed, release lease
ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, TFS_ERROR);
return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
"close write file to other dataserver fail, blockid: %u, fileid: %" PRI64_PREFIX "u, send_ret: %d",
close_file_info.block_id_, close_file_info.file_id_, send_ret);
}
else
{
//commit
//提交writeCommit消息给NS。NS更新相关元数据信息,回确认消息给primaryDS
//再由primaryDS回消息给client,整个写流程结束 //req_block_write_complete给NS发送wirteCommmit消息
int ret_code = ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, TFS_SUCCESS);
if (TFS_SUCCESS == ret_code)
{
//sync to mirror
int option_flag = message->get_option_flag();
if (0 == (option_flag & TFS_FILE_NO_SYNC_LOG))
{
TBSYS_LOG(INFO, " write sync log, blockid: %u, fileid: %" PRI64_PREFIX "u", close_file_info.block_id_,
close_file_info.file_id_); //为什么在写完数据之后才写日志???
ret_code = sync_mirror_->write_sync_log(OPLOG_INSERT, close_file_info.block_id_,
close_file_info.file_id_);
}
}
//primaryDS给client回写入成功消息
if (TFS_SUCCESS == ret_code)
{
message->reply_message(new StatusMessage(STATUS_MESSAGE_OK));
TBSYS_LOG(INFO, "write successful. blockid: %u, fileid: %" PRI64_PREFIX "u\n", close_file_info.block_id_,
close_file_info.file_id_);
}
else
{
TBSYS_LOG(ERROR,
"rep block write complete or write sync log fail, blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
close_file_info.block_id_, close_file_info.file_id_, ret_code);
message->reply_message(new StatusMessage(STATUS_MESSAGE_ERROR));
}
}
} //非primaryDS给primaryDS发送写入操作成功的消息
else
{
TBSYS_LOG(INFO, "slave write successful. blockid: %u, fileid: %" PRI64_PREFIX "u\n", close_file_info.block_id_,
close_file_info.file_id_);
//slave will save seqno to prevent from the conflict when this block change to master block
BlockInfo* copyblk = message->get_block();
if (NULL != copyblk)
{
blk->seq_no_ = copyblk->seq_no_;
}
message->reply_message(new StatusMessage(STATUS_MESSAGE_OK));
}
return TFS_SUCCESS;
}
|