Chinaunix首页 | 论坛 | 博客
  • 博客访问: 44865
  • 博文数量: 6
  • 博客积分: 173
  • 博客等级: 入伍新兵
  • 技术积分: 85
  • 用 户 组: 普通用户
  • 注册时间: 2011-04-05 14:56
文章分类

全部博文(6)

文章存档

2012年(3)

2011年(3)

我的朋友

分类: 云计算

2011-12-23 17:01:00

通过前一篇文章,大家应该对Yarn运行App的机制有了一个大概的了解,那下面我们开始对每个过程进行具体而详细的分析.

yarn与客户端交互的接口 名为ClientRMProtocol,它里面有两个方法

  1. GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)
  2. throws YarnRemoteException

  3. SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
  4. throws YarnRemoteException
client每次在提交作业之前都需要调用getNewApplication()获取一个新的id,其过程如下
ClientRMProtocolPBClientImpl实现了ClientRMProtocol接口,其代码如下:
  1. GetNewApplicationRequestProto requestProto = ((GetNewApplicationRequestPBImpl)request).getProto();
  2. try {
  3. return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, requestProto));
  4. }
那proxy又是什么呢? hadoop内部实现了一个rpc机制,yarn继续沿用了这个机制
  1. proxy = (ClientRMProtocolService.BlockingInterface)RPC.getProxy(
  2. ClientRMProtocolService.BlockingInterface.class, clientVersion, addr, conf);
proxy是通过反射实现的, 是一个实现了BlockingInterface的对象:ClientRMProtocolPBServiceImpl,其getNewApplication()代码如下:
  1. GetNewApplicationResponse response = real.getNewApplication(request);
  2. return ((GetNewApplicationResponsePBImpl)response).getProto();
real是服务端即RM的rpc实现类ClientRMService, getNewApplication()的实现如下
  1. public GetNewApplicationResponse getNewApplication(
  2. GetNewApplicationRequest request) throws YarnRemoteException {
  3. GetNewApplicationResponse response = recordFactory
  4. .newRecordInstance(GetNewApplicationResponse.class);
  5. response.setApplicationId(getNewApplicationId());
  6. // Pick up min/max resource from scheduler...
  7. response.setMinimumResourceCapability(scheduler
  8. .getMinimumResourceCapability());
  9. response.setMaximumResourceCapability(scheduler
  10. .getMaximumResourceCapability());
  11. return response;
  12. }
getNewApplicationId():
  1. ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
  2. .newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
  3. applicationCounter.incrementAndGet());
可以看出一个新的application id 由RM的时间戳与当前application数组成,每来一个application,id
同时getNewApplication()会返回当前最大/最小的资源承载能力,
client端调用完getNewApplication()函数之后调用,就开始设置应用的ApplicationSubmissionContext,里面包含RM启动application所需的信息,包括:
  1.  应用信息: id 和名字
  2.  队列信息:属于哪个组,优先级等,所属用户
  3.  启动container信息:应用启动所需可执行文件,jar包,配置文件,环境配置脚本,以及启动令行
设置这些之后,就调用submitApplication(appRequest),提交该应用。submitApplication()通过如上的rpc过程最终调用ClientRMService::submitApplication(),即服务端的实现。
  1. public SubmitApplicationResponse submitApplication(
  2. SubmitApplicationRequest request) throws YarnRemoteException {
  3. .....
  4. rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System.currentTimeMillis()));
  5. .....
  6. }
这个函数最主要做的一件事就是向rmAppManager发送了RMAppManagerSubmitEvent,这个事件类型是RMAppManagerEventType.APP_SUBMIT。

发送这个事件就触发了提交的app在RM中的生命周期,为了更容易,更全面的理解这个过程,我们下面将开始分析ResourceManager类,该类就是RM进程对应的类,是YARN的核心

阅读(3940) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~