Chinaunix首页 | 论坛 | 博客
  • 博客访问: 6276334
  • 博文数量: 2759
  • 博客积分: 1021
  • 博客等级: 中士
  • 技术积分: 4091
  • 用 户 组: 普通用户
  • 注册时间: 2012-03-11 14:14
文章分类

全部博文(2759)

文章存档

2019年(1)

2017年(84)

2016年(196)

2015年(204)

2014年(636)

2013年(1176)

2012年(463)

分类: 架构设计与优化

2013-10-08 00:32:34

我们接着分析,上一章节中讲到了,allocator将满足特定frameworkID的资源列表offerable,发送给了Master::offer()函数,我们就来看看offer函数做了什么工作:

点击(此处)折叠或打开

  1. void Master::offer(const FrameworkID& frameworkId,
  2.                    const hashmap<SlaveID, Resources>& resources)
  3. {
  4.   if (!frameworks.contains(frameworkId) || !frameworks[frameworkId]->active) {
  5.     LOG(WARNING) << "Master returning resources offered to framework "
  6.                  << frameworkId << " because the framework"
  7.                  << " has terminated or is inactive";

  8.     foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
  9.       dispatch(allocator, &Allocator::resourcesRecovered,
  10.                frameworkId, slaveId, offered);
  11.     }
  12.     return;
  13.   }

  14.   // Create an offer for each slave and add it to the message.
  15.   ResourceOffersMessage message;

  16.   Framework* framework = frameworks[frameworkId];
  17.   foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
  18.     if (!slaves.contains(slaveId)) {
  19.       LOG(WARNING) << "Master returning resources offered to framework "
  20.                    << frameworkId << " because slave " << slaveId
  21.                    << " is not valid";

  22.       dispatch(allocator, &Allocator::resourcesRecovered,
  23.                frameworkId, slaveId, offered);
  24.       continue;
  25.     }

  26.     Slave* slave = slaves[slaveId];

  27.     Offer* offer = new Offer();
  28.     offer->mutable_id()->MergeFrom(newOfferId());
  29.     offer->mutable_framework_id()->MergeFrom(framework->id);
  30.     offer->mutable_slave_id()->MergeFrom(slave->id);
  31.     offer->set_hostname(slave->info.hostname());
  32.     offer->mutable_resources()->MergeFrom(offered);
  33.     offer->mutable_attributes()->MergeFrom(slave->info.attributes());

  34.     // Add all framework's executors running on this slave.
  35.     if (slave->executors.contains(framework->id)) {
  36.       const hashmap<ExecutorID, ExecutorInfo>& executors =
  37.         slave->executors[framework->id];
  38.       foreachkey (const ExecutorID& executorId, executors) {
  39.         offer->add_executor_ids()->MergeFrom(executorId);
  40.       }
  41.     }

  42.     offers[offer->id()] = offer;

  43.     framework->addOffer(offer);
  44.     slave->addOffer(offer);

  45.     // Add the offer *AND* the corresponding slave's PID.
  46.     message.add_offers()->MergeFrom(*offer);
  47.     message.add_pids(slave->pid);
  48.   }

  49.   if (message.offers().size() == 0) {
  50.     return;
  51.   }

  52.   LOG(INFO) << "Sending " << message.offers().size()
  53.             << " offers to framework " << framework->id;

  54.   send(framework->pid, message);
  55. }
可以看到代码不是很长。
这里用到的数据结构:
hashmap frameworks//这里存放了所有的接入Mesos master的计算框架的信息

代码一开始首先判断一下,由allocator发送过来的frameworkID是否存在于frameworks之中,并且该计算框架现在是否是活跃的。否则此次分配是失败的,通知allocator将分配的资源收回,同时更新allocated、allocatable两张表的信息。

接下来程序对于每一个待分配的slave节点,创建了一个offer对象:
该对象中包含了:offerID、frameworkid、slaveid、slave所在主机的主机名以及其他的信息。
将这些offer的信息放入master节点的hashmapoffers之中
同时通知当事人framework和slave,做相应的操作。

接下来启动对应于该frameworkid的所有的执行器excutor部件。

创建了一条ResourceOffersMessage消息,消息中包含了offers和所有slave进程的pid,将该消息发送给framework对应的pid,即SchedulerProcess的pid

调用了SchedulerProcess的消息处理函数:resourceOffers

点击(此处)折叠或打开

  1. void resourceOffers(const vector<Offer>& offers,
  2.                       const vector<string>& pids)
  3.   {
  4.     if (aborted) {
  5.       VLOG(1) << "Ignoring resource offers message because "
  6.               << "the driver is aborted!";
  7.       return;
  8.     }

  9.     VLOG(1) << "Received " << offers.size() << " offers";

  10.     CHECK(offers.size() == pids.size());

  11.     // Save the pid associated with each slave (one per offer) so
  12.     // later we can send framework messages directly.
  13.     for (int i = 0; i < offers.size(); i++) {
  14.       UPID pid(pids[i]);
  15.       // Check if parse failed (e.g., due to DNS).
  16.       if (pid != UPID()) {
  17.         VLOG(2) << "Saving PID '" << pids[i] << "'";
  18.         savedOffers[offers[i].id()][offers[i].slave_id()] = pid;
  19.       } else {
  20.         VLOG(2) << "Failed to parse PID '" << pids[i] << "'";
  21.       }
  22.     }

  23.     scheduler->resourceOffers(driver, offers);
  24.   }
程序的一开始判断是否终止,这就体现了我们之前讲的MesosSchedulerDriver对于SchedulerProcess的控制作用,它可以通过设置这些状态信息,来控制SchedulerProcess的生命周期。

判断一下offers和slaves节点的pid是否一致,调度程序记录每个节点的pid信息,以便于以后直接与其进行通信。

非常关键的一步:    scheduler->resourceOffers(driver, offers);此处调用了我们用来实例化SchedulerProcess的frameworkScheduler对象的resourceOffers方法,此时我们就进入我们自己定义的框架调度器程序hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java

点击(此处)折叠或打开

  1. public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
  2.     try {
  3.       synchronized(jobTracker) {

  4.         int numOffers = (int) offers.size();
  5.         double[] cpus = new double[numOffers];
  6.         double[] mem = new double[numOffers];

  7.         // Count up the amount of free CPUs and memory on each node
  8.         for (int i = 0; i < numOffers; i++) {
  9.           Offer offer = offers.get(i);
  10.           LOG.info("Got resource offer " + offer.getId());
  11.           cpus[i] = getResource(offer, "cpus");
  12.           mem[i] = getResource(offer, "mem");
  13.         }

  14.         // Assign tasks to the nodes in a round-robin manner, and stop when we
  15.         // are unable to assign a task to any node.
  16.         // We do this by keeping a linked list of indices of nodes for which
  17.         // we are still considering assigning tasks. Whenever we can't find a
  18.         // new task for a node, we remove it from the list. When the list is
  19.         // empty, no further assignments can be made. This algorithm was chosen
  20.         // because it minimizing the amount of scanning we need to do if we
  21.         // get a large set of offered nodes.
  22.         List<Integer> indices = new LinkedList<Integer>();
  23.         List<List<TaskInfo>> replies =
  24.             new ArrayList<List<TaskInfo>>(numOffers);
  25.         for (int i = 0; i < numOffers; i++) {
  26.           indices.add(i);
  27.           replies.add(new ArrayList<TaskInfo>());
  28.         }
  29.         while (indices.size() > 0) {
  30.           for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
  31.             int i = it.next();
  32.             Offer offer = offers.get(i);
  33.             TaskInfo task = findTask(
  34.                 offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
  35.             if (task != null) {
  36.               cpus[i] -= getResource(task, "cpus");
  37.               mem[i] -= getResource(task, "mem");
  38.               replies.get(i).add(task);
  39.             } else {
  40.               it.remove();
  41.             }
  42.           }
  43.         }

  44.         for (int i = 0; i < numOffers; i++) {
  45.           OfferID offerId = offers.get(i).getId();
  46.           Status status = d.launchTasks(offerId, replies.get(i));
  47.           if (status != Status.DRIVER_RUNNING) {
  48.             LOG.warn("SchedulerDriver returned irregular status: " + status);
  49.           }
  50.         }
  51.       }
  52.     } catch(Exception e) {
  53.       LOG.error("Error in resourceOffer", e);
  54.     }
  55.   }
我们之前所讲到的分配,从真正意义上来说都是一些逻辑上的分配,实际的分配都是在我们自己定义的FrameworkScheduler跟jobTracker的交互中完成的。下一章节我们会具体分析,task分配过程。

在这之前我们有必要总结一下,一个计算框架如何跟Mesos去交互的,我们在看得时候,可能会有这样的困惑,一个任务怎么就能够通过Mesos申请到资源的?

(1)首先我们实现了一个基于Hadoop这个计算框架的FrameworkScheduler.java这个类

(2)修改了Hadoop的配置文件,把taskScheduler属性值换成我们自己编译好的java对象

(3)通过读取配置文件的信息,用我们自己对象MesosScheduler实例化一个任务的调度器

(4)在MesosScheduler对象之中,FrameworkScheduler和Mesos Master的地址为参数创建了一个FrameworkSchedulerDriver对象

(5)调用driver的start方法,以FrameworkScheduler和Master url为参数创建了SchedulerProcess进程,同时以这个进程的pid和Master url为参数创建了一个MasterDetector对象,用这个对象来监控Master的状态变化,并且能够即时的报告给SchudulerProcess这个进程。

(6)有MasterDetector对象发送一个NewMasterDetectedMessage给SchedulerProcess进程,驱动了该进程向Master,发送了一条RegisterFrameworkMessage

(7)Master接受到了该RegisterFrameworkMessage之后,为该框架产生一个ID,并且将框架的信息(最主要的就是框架的调度进程SchedulerProcess的pid)放入了自己的hashmapframeworks之中,并且通知allocator给这个框架分配资源。

(8)此时有一点大家要清楚,这个时候mesos并不知道framework需要的资源情况,仅仅是通过framework自己的filters去过滤掉不符合要求的slave节点

(9)然后allocator将frameworkID和符合该framework要求的slaveIDs,一起发送给了Master节点,这样就找了计算框架和资源的之间的对应关系。

(10)master节点将对应这些slaveIDs的这些slave节点信息(包括slave节点pid、hostname、分配给的frameworkid)等信息打包成一个数据结构Offers,并且将这些东西合并到一个ResourceOffersMessage消息之中,发送给framework对应的shedulerProcess进程。

(11)schedulerProcess进程的resourceOffers方法之中调用了frameworkScheduler的resourceOffers方法,这样我们就通过mesos将承载这资源的slave节点的信息以offers对象的形式,发送给了我们自己实现的调度程序FrameworkScheduler,不过即便是到了这个时候,我们还是不知道Framework所需要的资源量,以及每个task任务需要的资源量,所有的这些东西,都在后续的章节之中。
阅读(674) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~