分类: 架构设计与优化

2013-10-05




  1. public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
  2.     try {
  3.       synchronized(jobTracker) {

  4.         int numOffers = (int) offers.size();
  5.         double[] cpus = new double[numOffers];
  6.         double[] mem = new double[numOffers];

  7.         // Count up the amount of free CPUs and memory on each node
  8.         for (int i = 0; i < numOffers; i++) {
  9.           Offer offer = offers.get(i);
  10. "Got resource offer " + offer.getId());
  11.           cpus[i] = getResource(offer, "cpus");
  12.           mem[i] = getResource(offer, "mem");
  13.         }

  14.         // Assign tasks to the nodes in a round-robin manner, and stop when we
  15.         // are unable to assign a task to any node.
  16.         // We do this by keeping a linked list of indices of nodes for which
  17.         // we are still considering assigning tasks. Whenever we can't find a
  18.         // new task for a node, we remove it from the list. When the list is
  19.         // empty, no further assignments can be made. This algorithm was chosen
  20.         // because it minimizing the amount of scanning we need to do if we
  21.         // get a large set of offered nodes.
  22.         List<Integer> indices = new LinkedList<Integer>();
  23.         List<List<TaskInfo>> replies =
  24.             new ArrayList<List<TaskInfo>>(numOffers);
  25.         for (int i = 0; i < numOffers; i++) {
  26.           indices.add(i);
  27.           replies.add(new ArrayList<TaskInfo>());
  28.         }
  29.         while (indices.size() > 0) {
  30.           for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
  31.             int i =;
  32.             Offer offer = offers.get(i);
  33.             TaskInfo task = findTask(
  34.                 offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
  35.             if (task != null) {
  36.               cpus[i] -= getResource(task, "cpus");
  37.               mem[i] -= getResource(task, "mem");
  38.               replies.get(i).add(task);
  39.             } else {
  40.               it.remove();
  41.             }
  42.           }
  43.         }

  44.         for (int i = 0; i < numOffers; i++) {
  45.           OfferID offerId = offers.get(i).getId();
  46.           Status status = d.launchTasks(offerId, replies.get(i));
  47.           if (status != Status.DRIVER_RUNNING) {
  48.             LOG.warn("SchedulerDriver returned irregular status: " + status);
  49.           }
  50.         }
  51.       }
  52.     } catch(Exception e) {
  53.       LOG.error("Error in resourceOffer", e);
  54.     }
  55.   }
(一)创建了两个数组cpus和mem,它们的下标与offers list的下标一一对应,分别保存了list中对应下标的offer所提供的cpu和mem数量。
(二)为这个offers list创建了一个索引链表indices(LinkedList),其中保存的是offers list中的下标;同时创建了replies列表,为每个offer保存将要在其上启动的任务。
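(五)按照indices中的顺序轮流遍历整个offers list,对每个offer调用findTask(offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i])。若找到任务,就从cpus[i]、mem[i]中扣除该任务占用的资源,并把任务加入replies中对应的列表;若找不到,就把该下标从indices中移除。如此循环,直到indices为空为止。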


  1. // Find a single task for a given node. Assumes JobTracker is locked.
  2.   private TaskInfo findTask(
  3.       SlaveID slaveId, String host, double cpus, double mem) {
  4.     if (cpus < cpusPerTask || mem < memPerTask) {
  5.       return null; // Too few resources are left on the node
  6.     }

  7.     TaskTrackerInfo ttInfo = getTaskTrackerInfo(host, slaveId);

  8.     // Pick whether to launch a map or a reduce based on available tasks
  9.     String taskType = null;
  10.     boolean haveMaps = canLaunchMap(host);
  11.     boolean haveReduces = canLaunchReduce(host);
  12.     //"Looking at " + host + ": haveMaps=" + haveMaps +
  13.     // ", haveReduces=" + haveReduces);
  14.     if (!haveMaps && !haveReduces) {
  15.       return null;
  16.     } else if (haveMaps && !haveReduces) {
  17.       taskType = "map";
  18.     } else if (haveReduces && !haveMaps) {
  19.       taskType = "reduce";
  20.     } else {
  21.       float mapToReduceRatio = 1;
  22.       if (ttInfo.reduces.size() < ttInfo.maps.size() / mapToReduceRatio)
  23.         taskType = "reduce";
  24.       else
  25.         taskType = "map";
  26.     }
  27.     //"Task type chosen: " + taskType);

  28.     // Get a Mesos task ID for the new task
  29.     TaskID mesosId = newMesosTaskId();

  30.     // Remember that it is launched
  31.     boolean isMap = taskType.equals("map");
  32.     if (isMap) {
  33.       unassignedMaps++;
  34.     } else {
  35.       unassignedReduces++;
  36.     }
  37.     MesosTask nt = new MesosTask(isMap, mesosId, host);
  38.     mesosIdToMesosTask.put(mesosId, nt);
  39.     ttInfo.add(nt);

  40."Launching Mesos task " + mesosId.getValue() +
  41.              " as " + taskType + " on " + host);

  42.     // Create a task description to pass back to Mesos.
  43.     return TaskInfo.newBuilder()
  44.         .setTaskId(mesosId)
  45.         .setSlaveId(slaveId)
  46.         .setName("task " + mesosId.getValue() + " (" + taskType + ")")
  47.         .addResources(makeResource("cpus", cpusPerTask))
  48.         .addResources(makeResource("mem", memPerTask))
  49.         .setExecutor(getExecutorInfo())
  50.         .build();
  51.   }


  // Decides whether a map task may be launched on the given host.
  //
  // The decision proceeds as follows:
  //  - Throws RuntimeException if ttInfos has no TaskTrackerInfo for the host.
  //  - Returns false if that tracker already holds maxMaps map tasks.
  //  - Computes neededMaps as the sum of pendingMaps() over RUNNING jobs,
  //    plus 1 when any job exists (a stand-in for speculative tasks).
  //  - If unassignedMaps is below neededMaps, returns true when either of
  //    these holds:
  //      * a RUNNING job has a failed map that has not failed on this host;
  //      * availableMapLevel(job, host, Integer.MAX_VALUE) != -1 for some
  //        RUNNING job. In this case lastMapWasLocal is also set to whether
  //        that level was 0.
  //
  // NOTE(review): this listing is an excerpt. The method's final return
  // statement and closing brace are not shown here — presumably
  // `return false;` — confirm against the original source.
  1. private boolean canLaunchMap(String host) {
  2.     // Check whether the TT is saturated on maps
  3.     TaskTrackerInfo ttInfo = ttInfos.get(host);
  4.     if (ttInfo == null) {
  5.       throw new RuntimeException("Expecting TaskTrackerInfo for host " + host);
  6.     }

  7.     if (ttInfo.maps.size() >= ttInfo.maxMaps) {
  8.       return false;
  9.     }

  10.     // Compute the total demand for maps to make sure we don't exceed it
      // NOTE(review): the right-hand side of this assignment was lost in the
      // listing; it should be the job tracker's collection of jobs — confirm.
  11.     Collection<JobInProgress> jobs =;
  12.     int neededMaps = 0;
  13.     for (JobInProgress job : jobs) {
  14.       if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  15.         neededMaps += job.pendingMaps();
  16.       }
  17.     }
  18.     // TODO (!!!): Count speculatable tasks and add them to neededMaps
  19.     // For now, we just add 1
  20.     if (jobs.size() > 0)
  21.       neededMaps += 1;

  22.     if (unassignedMaps < neededMaps) {
  23.       // 0. check for a failed map task to place. These tasks are not included
  24.       // in the "normal" lists of tasks in the JobInProgress object.
  25.       for (JobInProgress job: jobs) {
  26.         int state = job.getStatus().getRunState();
  27.         if (job.failedMaps != null && state == JobStatus.RUNNING) {
  28.           for (TaskInProgress tip : job.failedMaps) {
  29.             if (!tip.hasFailedOnMachine(host)) {
  30.               return true;
  31.             }
  32.           }
  33.         }
  34.       }

  35.       int maxLevel = Integer.MAX_VALUE;
  36.       // Look for a map with the required level
  37.       for (JobInProgress job: jobs) {
  38.         int state = job.getStatus().getRunState();
  39.         if (state == JobStatus.RUNNING) {
  40.           int availLevel = availableMapLevel(job, host, maxLevel);
  41.           if (availLevel != -1) {
              // Record whether the chosen map was at level 0
              // (presumably node-local — confirm in availableMapLevel).
  42.             lastMapWasLocal = (availLevel == 0);
  43.             return true;
  44.           }
  45.         }
  46.       }
  47.     }
1、从FrameworkScheduler保存的表Map ttInfos中获取对应于该slave节点的TaskTrackerInfo。TaskTrackerInfo中记载了什么东西呢?


  // Per-host bookkeeping of the Mesos tasks this scheduler has placed on one
  // TaskTracker. Map and reduce tasks are kept in separate lists, and there is
  // one map-slot limit and one reduce-slot limit.
  // NOTE(review): this listing is truncated; the class's closing brace (and
  // possibly further members) are not shown.
  1. private static class TaskTrackerInfo {
      // Mesos ID of the slave this tracker runs on.
  2.     SlaveID mesosSlaveId;
      // Map and reduce tasks currently recorded for this tracker.
  3.     List<MesosTask> maps = new LinkedList<MesosTask>();
  4.     List<MesosTask> reduces = new LinkedList<MesosTask>();
      // Slot limits, both defaulting to 1. canLaunchMap refuses a new map
      // once maps.size() >= maxMaps.
  5.     int maxMaps = 1;
  6.     int maxReduces = 1;

  7.     public TaskTrackerInfo(SlaveID mesosSlaveId) {
  8.       this.mesosSlaveId = mesosSlaveId;
  9.     }

      // Records a task in the maps or reduces list according to nt.isMap.
  10.     void add(MesosTask nt) {
  11.       if (nt.isMap)
  12.         maps.add(nt);
  13.       else
  14.         reduces.add(nt);
  15.     }

      // Removes a task from the maps or reduces list according to nt.isMap.
  16.     public void remove(MesosTask nt) {
  17.       if (nt.isMap)
  18.         maps.remove(nt);
  19.       else
  20.         reduces.remove(nt);
  21.     }





(4)确定在该节点上应该启动map任务还是reduce任务之后,就为这个新任务生成一个新的TaskID,并更新unassignedMaps或unassignedReduces计数器。然后创建一个新的MesosTask对象:MesosTask nt = new MesosTask(isMap, mesosId, host);将其加入Map mesosIdToMesosTask这张表中以便查找,并把该MesosTask加入该slave节点对应的TaskTrackerInfo中。最后返回一个TaskInfo对象,其中记录了任务的mesosId、slaveId、executorInfo等信息。



(六)接下来遍历replies,启动这些任务:对每个offer调用SchedulerDriver.launchTasks()方法。它调用的是我们之前分析过的sched.cpp中的launchTasks()方法。该方法会生成一条LaunchTasksMessage消息,其中包含frameworkId、offerId以及要在该slave节点上启动的所有task的信息,一起发送给Mesos master。



  1. void Master::launchTasks(const FrameworkID& frameworkId,
  2.                          const OfferID& offerId,
  3.                          const vector<TaskInfo>& tasks,
  4.                          const Filters& filters)
  5. {
  6.   Framework* framework = getFramework(frameworkId);
  7.   if (framework != NULL) {
  8.     // TODO(benh): Support offer "hoarding" and allow multiple offers
  9.     // *from the same slave* to be used to launch tasks. This can be
  10.     // accomplished rather easily by collecting and merging all offers
  11.     // into a mega-offer and passing that offer to
  12.     // Master::processTasks.
  13.     Offer* offer = getOffer(offerId);
  14.     if (offer != NULL) {
  15.       CHECK(offer->framework_id() == frameworkId);
  16.       Slave* slave = getSlave(offer->slave_id());
  17.       CHECK(slave != NULL) << "An offer should not outlive a slave!";
  18.       processTasks(offer, framework, slave, tasks, filters);
  19.     } else {
  20.       // The offer is gone (possibly rescinded, lost slave, re-reply
  21.       // to same offer, etc). Report all tasks in it as failed.
  22.       // TODO: Consider adding a new task state TASK_INVALID for
  23.       // situations like these.
  24.       LOG(WARNING) << "Offer " << offerId << " is no longer valid";
  25.       foreach (const TaskInfo& task, tasks) {
  26.         StatusUpdateMessage message;
  27.         StatusUpdate* update = message.mutable_update();
  28.         update->mutable_framework_id()->MergeFrom(frameworkId);
  29.         TaskStatus* status = update->mutable_status();
  30.         status->mutable_task_id()->MergeFrom(task.task_id());
  31.         status->set_state(TASK_LOST);
  32.         status->set_message("Task launched with invalid offer");
  33.         update->set_timestamp(Clock::now());
  34.         update->set_uuid(UUID::random().toBytes());
  35.         send(framework->pid, message);
  36.       }
  37.     }
  38.   }
  39. }


