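Let's continue the analysis. The previous chapter showed how the allocator computes the resources that can be offered to a particular framework (identified by its FrameworkID) and passes them, as a map from SlaveID to Resources, to Master::offer(). Let's look at what offer() does: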
```cpp
void Master::offer(const FrameworkID& frameworkId,
                   const hashmap<SlaveID, Resources>& resources)
{
  if (!frameworks.contains(frameworkId) || !frameworks[frameworkId]->active) {
    LOG(WARNING) << "Master returning resources offered to framework "
                 << frameworkId << " because the framework"
                 << " has terminated or is inactive";

    foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
      dispatch(allocator, &Allocator::resourcesRecovered,
               frameworkId, slaveId, offered);
    }
    return;
  }

  // Create an offer for each slave and add it to the message.
  ResourceOffersMessage message;

  Framework* framework = frameworks[frameworkId];
  foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
    if (!slaves.contains(slaveId)) {
      LOG(WARNING) << "Master returning resources offered to framework "
                   << frameworkId << " because slave " << slaveId
                   << " is not valid";

      dispatch(allocator, &Allocator::resourcesRecovered,
               frameworkId, slaveId, offered);
      continue;
    }

    Slave* slave = slaves[slaveId];

    Offer* offer = new Offer();
    offer->mutable_id()->MergeFrom(newOfferId());
    offer->mutable_framework_id()->MergeFrom(framework->id);
    offer->mutable_slave_id()->MergeFrom(slave->id);
    offer->set_hostname(slave->info.hostname());
    offer->mutable_resources()->MergeFrom(offered);
    offer->mutable_attributes()->MergeFrom(slave->info.attributes());

    // Add all framework's executors running on this slave.
    if (slave->executors.contains(framework->id)) {
      const hashmap<ExecutorID, ExecutorInfo>& executors =
        slave->executors[framework->id];
      foreachkey (const ExecutorID& executorId, executors) {
        offer->add_executor_ids()->MergeFrom(executorId);
      }
    }

    offers[offer->id()] = offer;

    framework->addOffer(offer);
    slave->addOffer(offer);

    // Add the offer *AND* the corresponding slave's PID.
    message.add_offers()->MergeFrom(*offer);
    message.add_pids(slave->pid);
  }

  if (message.offers().size() == 0) {
    return;
  }

  LOG(INFO) << "Sending " << message.offers().size()
            << " offers to framework " << framework->id;

  send(framework->pid, message);
}
```
The code is fairly short. The main data structure it uses is:
hashmap<FrameworkID, Framework*> frameworks // information about every framework connected to the Mesos master
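The function first checks whether the FrameworkID sent by the allocator exists in frameworks and whether that framework is currently active. If either check fails, the offer is abandoned: for each slave, the master dispatches Allocator::resourcesRecovered so that the allocator takes the resources back and updates its allocated and allocatable tables.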
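To make the guard concrete, here is a minimal Java sketch of the same decision. It is a simplified model, not Mesos code: FrameworkRecord, RecordingAllocator and OfferGuard are hypothetical names, resources are reduced to a CPU count per slave, and the asynchronous dispatch to Allocator::resourcesRecovered is replaced by a direct method call that only records what was handed back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the master's per-framework record.
class FrameworkRecord {
    final String id;
    boolean active;
    FrameworkRecord(String id, boolean active) { this.id = id; this.active = active; }
}

// Hypothetical allocator that only records which resources were returned to it.
class RecordingAllocator {
    final List<String> recovered = new ArrayList<>();
    void resourcesRecovered(String frameworkId, String slaveId, double cpus) {
        recovered.add(frameworkId + "@" + slaveId + ":" + cpus);
    }
}

class OfferGuard {
    // Returns true if offers may be built; otherwise hands every slave's
    // resources back to the allocator, like the early return in Master::offer().
    static boolean acceptOrRecover(Map<String, FrameworkRecord> frameworks,
                                   String frameworkId,
                                   Map<String, Double> offeredCpus,
                                   RecordingAllocator allocator) {
        FrameworkRecord f = frameworks.get(frameworkId);
        if (f == null || !f.active) {
            for (Map.Entry<String, Double> e : offeredCpus.entrySet()) {
                allocator.resourcesRecovered(frameworkId, e.getKey(), e.getValue());
            }
            return false;
        }
        return true;
    }
}
```

Calling acceptOrRecover with an unknown or inactive framework ID returns false and leaves one recorded entry per offered slave, which corresponds to the loop over resourcesRecovered before the early return.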
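Otherwise, for each slave in the map, the function creates an Offer object containing the offer ID, the framework ID, the slave ID, the hostname of the slave's machine, the offered resources and the slave's attributes. If a slave is no longer known to the master, its resources are returned to the allocator in the same way and that slave is skipped.
If the framework already has executors running on that slave, their ExecutorIDs are added to the offer (executor_ids). Nothing is launched at this point; the offer only tells the framework which of its executors already live on that slave.
Each offer is then stored in the master's hashmap<OfferID, Offer*> offers and recorded on the master's in-memory Framework and Slave objects through framework->addOffer() and slave->addOffer(), so the master knows which offers are outstanding for that framework and on that slave.
Finally, all offers are collected into one ResourceOffersMessage together with the PID of each offer's slave (one PID per offer). If the message contains at least one offer, it is sent to the framework's PID, i.e. the PID of its SchedulerProcess.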
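The per-slave loop can be sketched the same way. This is only an illustration under assumptions: SlaveRecord, OfferRecord, OffersMessage and OfferBuilder are hypothetical, offer IDs come from a simple counter rather than newOfferId(), and resources are a single CPU number. What it shows is the shape of the message: two parallel lists in which the i-th PID belongs to the slave of the i-th offer, plus the "send nothing if empty" rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified master-side slave record.
class SlaveRecord {
    final String id, hostname, pid;
    // frameworkId -> IDs of that framework's executors already running here
    final Map<String, List<String>> executors = new HashMap<>();
    SlaveRecord(String id, String hostname, String pid) {
        this.id = id; this.hostname = hostname; this.pid = pid;
    }
}

// Hypothetical offer carrying roughly the fields Master::offer() fills in.
class OfferRecord {
    final String id, frameworkId, slaveId, hostname;
    final double cpus;
    final List<String> executorIds = new ArrayList<>();
    OfferRecord(String id, String frameworkId, String slaveId, String hostname, double cpus) {
        this.id = id; this.frameworkId = frameworkId; this.slaveId = slaveId;
        this.hostname = hostname; this.cpus = cpus;
    }
}

// Parallel lists: pids.get(i) belongs to the slave of offers.get(i).
class OffersMessage {
    final List<OfferRecord> offers = new ArrayList<>();
    final List<String> pids = new ArrayList<>();
}

class OfferBuilder {
    private int nextId = 0;
    final Map<String, OfferRecord> offersById = new HashMap<>(); // like the master's offers map
    final List<String> recoveredSlaves = new ArrayList<>();      // slaves whose resources went back

    OffersMessage build(String frameworkId, Map<String, Double> offered,
                        Map<String, SlaveRecord> slaves) {
        OffersMessage message = new OffersMessage();
        for (Map.Entry<String, Double> e : offered.entrySet()) {
            SlaveRecord slave = slaves.get(e.getKey());
            if (slave == null) {                  // unknown slave: return resources, skip it
                recoveredSlaves.add(e.getKey());
                continue;
            }
            OfferRecord offer = new OfferRecord("offer-" + nextId++, frameworkId,
                    slave.id, slave.hostname, e.getValue());
            // Only lists executors already running for this framework; launches nothing.
            offer.executorIds.addAll(
                    slave.executors.getOrDefault(frameworkId, Collections.emptyList()));
            offersById.put(offer.id, offer);
            message.offers.add(offer);
            message.pids.add(slave.pid);          // exactly one PID per offer
        }
        // Master::offer() returns without sending when no offer was created.
        return message.offers.isEmpty() ? null : message;
    }
}
```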
On the scheduler side, this message is handled by SchedulerProcess's message handler, resourceOffers:
```cpp
void resourceOffers(const vector<Offer>& offers,
                    const vector<string>& pids)
{
  if (aborted) {
    VLOG(1) << "Ignoring resource offers message because "
            << "the driver is aborted!";
    return;
  }

  VLOG(1) << "Received " << offers.size() << " offers";

  CHECK(offers.size() == pids.size());

  // Save the pid associated with each slave (one per offer) so
  // later we can send framework messages directly.
  for (int i = 0; i < offers.size(); i++) {
    UPID pid(pids[i]);
    // Check if parse failed (e.g., due to DNS).
    if (pid != UPID()) {
      VLOG(2) << "Saving PID '" << pids[i] << "'";
      savedOffers[offers[i].id()][offers[i].slave_id()] = pid;
    } else {
      VLOG(2) << "Failed to parse PID '" << pids[i] << "'";
    }
  }

  scheduler->resourceOffers(driver, offers);
}
```
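The handler first checks whether the driver has been aborted. This reflects the control that MesosSchedulerDriver has over SchedulerProcess, discussed earlier: by setting such state flags, the driver controls the lifecycle of SchedulerProcess.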
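Next, CHECK(offers.size() == pids.size()) verifies that there is exactly one slave PID per offer. The scheduler then saves each slave's PID in savedOffers, keyed by offer ID and slave ID, so that it can later communicate with that slave directly (for example, to send framework messages). PIDs that fail to parse are logged and not saved.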
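The scheduler-side bookkeeping can be modeled in Java as well. In this sketch SchedulerSide and parsePid are hypothetical: parsePid merely checks for a name@host:port shape as a stand-in for constructing a UPID, and the CHECK is modeled as an exception, whereas a failed CHECK in the real C++ code aborts the process.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SchedulerSide {
    volatile boolean aborted = false;   // set by the driver, e.g. when it is aborted
    // offerId -> (slaveId -> slave PID), mirroring savedOffers in SchedulerProcess
    final Map<String, Map<String, String>> savedOffers = new HashMap<>();

    // Hypothetical PID check: accepts "name@host:port", returns null on "parse failure".
    static String parsePid(String s) {
        int at = s.indexOf('@');
        int colon = s.lastIndexOf(':');
        return (at > 0 && colon > at + 1 && colon < s.length() - 1) ? s : null;
    }

    // offerIds, slaveIds and pids are parallel lists; returns how many offers are passed on.
    int resourceOffers(List<String> offerIds, List<String> slaveIds, List<String> pids) {
        if (aborted) {
            return 0;                                    // ignore the whole message
        }
        if (offerIds.size() != pids.size() || slaveIds.size() != pids.size()) {
            throw new IllegalStateException("expected one PID per offer"); // like CHECK(...)
        }
        for (int i = 0; i < offerIds.size(); i++) {
            String pid = parsePid(pids.get(i));
            if (pid != null) {
                savedOffers.computeIfAbsent(offerIds.get(i), k -> new HashMap<>())
                           .put(slaveIds.get(i), pid);
            }
        }
        // The C++ handler forwards all offers here: scheduler->resourceOffers(driver, offers).
        return offerIds.size();
    }
}
```

Note that an offer whose PID fails to parse is still forwarded to the scheduler; only its PID is not remembered, matching the C++ handler above.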
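The key step is scheduler->resourceOffers(driver, offers);. It calls the resourceOffers method of the frameworkScheduler object that was used to instantiate SchedulerProcess, and at this point we enter the framework scheduler we wrote ourselves, hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java: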
```java
public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
  try {
    synchronized(jobTracker) {

      int numOffers = (int) offers.size();
      double[] cpus = new double[numOffers];
      double[] mem = new double[numOffers];

      // Count up the amount of free CPUs and memory on each node
      for (int i = 0; i < numOffers; i++) {
        Offer offer = offers.get(i);
        LOG.info("Got resource offer " + offer.getId());
        cpus[i] = getResource(offer, "cpus");
        mem[i] = getResource(offer, "mem");
      }

      // Assign tasks to the nodes in a round-robin manner, and stop when we
      // are unable to assign a task to any node.
      // We do this by keeping a linked list of indices of nodes for which
      // we are still considering assigning tasks. Whenever we can't find a
      // new task for a node, we remove it from the list. When the list is
      // empty, no further assignments can be made. This algorithm was chosen
      // because it minimizing the amount of scanning we need to do if we
      // get a large set of offered nodes.
      List<Integer> indices = new LinkedList<Integer>();
      List<List<TaskInfo>> replies =
          new ArrayList<List<TaskInfo>>(numOffers);
      for (int i = 0; i < numOffers; i++) {
        indices.add(i);
        replies.add(new ArrayList<TaskInfo>());
      }
      while (indices.size() > 0) {
        for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
          int i = it.next();
          Offer offer = offers.get(i);
          TaskInfo task = findTask(
              offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
          if (task != null) {
            cpus[i] -= getResource(task, "cpus");
            mem[i] -= getResource(task, "mem");
            replies.get(i).add(task);
          } else {
            it.remove();
          }
        }
      }

      for (int i = 0; i < numOffers; i++) {
        OfferID offerId = offers.get(i).getId();
        Status status = d.launchTasks(offerId, replies.get(i));
        if (status != Status.DRIVER_RUNNING) {
          LOG.warn("SchedulerDriver returned irregular status: " + status);
        }
      }
    }
  } catch(Exception e) {
    LOG.error("Error in resourceOffer", e);
  }
}
```
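The "allocation" discussed so far is only logical. The actual assignment of tasks to resources happens in the interaction between our FrameworkScheduler and the JobTracker: while holding the jobTracker lock, FrameworkScheduler repeatedly calls findTask() in round-robin fashion across the offered nodes until no node can take another task, and then passes each offer's task list to launchTasks(). The next chapter analyzes the task assignment process in detail.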
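The round-robin loop in FrameworkScheduler.resourceOffers() can be tried in isolation. In the sketch below, RoundRobinAssigner, Task and the findTask() stub are hypothetical: the stub simply hands out pending tasks in FIFO order when the head task fits in the node's remaining CPUs and memory, whereas the real findTask() consults Hadoop's job state. The loop itself follows the original: at most one task per node per pass, and a node is removed from the linked list as soon as no task can be found for it, so the loop ends when the list is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class RoundRobinAssigner {
    // Hypothetical pending task with only CPU and memory demands.
    static final class Task {
        final String name;
        final double cpus, mem;
        Task(String name, double cpus, double mem) { this.name = name; this.cpus = cpus; this.mem = mem; }
    }

    private final Deque<Task> pending;
    RoundRobinAssigner(List<Task> tasks) { pending = new ArrayDeque<>(tasks); }

    // Stub for findTask(): take the next pending task if it fits, else null.
    private Task findTask(double cpus, double mem) {
        Task t = pending.peekFirst();
        if (t != null && t.cpus <= cpus && t.mem <= mem) {
            return pending.pollFirst();
        }
        return null;
    }

    // Same loop shape as resourceOffers(); note that it decrements the caller's arrays.
    List<List<String>> assign(double[] cpus, double[] mem) {
        int n = cpus.length;
        List<Integer> indices = new LinkedList<>();
        List<List<String>> replies = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            indices.add(i);
            replies.add(new ArrayList<>());
        }
        while (!indices.isEmpty()) {
            for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
                int i = it.next();
                Task task = findTask(cpus[i], mem[i]);
                if (task != null) {
                    cpus[i] -= task.cpus;
                    mem[i] -= task.mem;
                    replies.get(i).add(task.name);
                } else {
                    it.remove();                 // nothing more fits on node i
                }
            }
        }
        return replies;
    }
}
```

With five 1-CPU tasks and two nodes offering 2 and 3 CPUs (and ample memory), the first node receives t1 and t3 and the second receives t2, t4 and t5; in FrameworkScheduler each such list would then be passed to launchTasks() for the corresponding offer.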
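Before that, it is worth summarizing how a framework interacts with Mesos. While reading the code, you may have wondered: how does a job actually obtain resources through Mesos?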
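(1) First, we implement a class for the Hadoop framework, FrameworkScheduler.java.
(2) We modify Hadoop's configuration file, setting the taskScheduler property to our own compiled Java class.
(3) When Hadoop reads the configuration, it instantiates our MesosScheduler class as the task scheduler.
(4) Inside MesosScheduler, a MesosSchedulerDriver object is created with the FrameworkScheduler and the Mesos master's address as parameters.
(5) Calling the driver's start() method creates the SchedulerProcess (with the FrameworkScheduler and the master URL as parameters) and a MasterDetector (with that process's PID and the master URL). The MasterDetector monitors changes in the master and promptly reports them to SchedulerProcess.
(6) The MasterDetector sends a NewMasterDetectedMessage to SchedulerProcess, which prompts the process to send a RegisterFrameworkMessage to the master.
(7) When the master receives the RegisterFrameworkMessage, it generates an ID for the framework, stores the framework's information (most importantly, the PID of its scheduler process, SchedulerProcess) in its hashmap frameworks, and tells the allocator to allocate resources to the framework.
(8) Note that at this point Mesos does not know how many resources the framework needs; it only uses the framework's own filters to exclude slaves that do not meet its requirements.
(9) The allocator then sends the FrameworkID, together with the SlaveIDs that satisfy the framework's requirements and the resources offered on each, to the master. This establishes the mapping between the framework and the resources.
(10) The master packs the information for those slaves (slave PID, hostname, the FrameworkID the offer is made to, and so on) into Offer structures, combines them into one ResourceOffersMessage, and sends it to the framework's SchedulerProcess.
(11) SchedulerProcess's resourceOffers method calls FrameworkScheduler's resourceOffers method. In this way, Mesos delivers information about the slaves that hold the resources, in the form of Offer objects, to the scheduler we implemented ourselves, FrameworkScheduler. Even at this point, however, we still do not know how many resources the framework needs in total or how much each task needs; all of this is covered in later chapters.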
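Steps (7) to (9) can be condensed into a small Java model. RegistrationFlow and its methods are hypothetical, and the framework's filter is modeled as a plain Predicate over slave IDs; real Mesos filters are richer than this. The sketch only illustrates the point made in step (8): the allocator selects slaves without knowing any per-task demand, and what it hands to the master is just a slaveId-to-resources map.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

class RegistrationFlow {
    private int nextFrameworkId = 0;
    final Map<String, String> frameworkPids = new HashMap<>();      // frameworkId -> scheduler PID
    final Map<String, Predicate<String>> filters = new HashMap<>(); // frameworkId -> slave filter

    // Step (7): assign an ID and remember where the framework's SchedulerProcess lives.
    String registerFramework(String schedulerPid, Predicate<String> filter) {
        String id = "framework-" + nextFrameworkId++;
        frameworkPids.put(id, schedulerPid);
        filters.put(id, filter);
        return id;
    }

    // Steps (8)-(9): no per-task demand is known, so the only selection is the
    // framework's filter; the result is the slaveId -> free CPUs map given to the master.
    Map<String, Double> allocate(String frameworkId, Map<String, Double> freeCpus) {
        Map<String, Double> offerable = new LinkedHashMap<>();
        Predicate<String> filter = filters.get(frameworkId);
        for (Map.Entry<String, Double> e : freeCpus.entrySet()) {
            if (filter.test(e.getKey())) {
                offerable.put(e.getKey(), e.getValue());
            }
        }
        return offerable;
    }
}
```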