After the previous article you should have a rough picture of how YARN runs an application. Now let's go through each step of that process in detail.
The interface between YARN and the client
It is named ClientRMProtocol. The two methods involved in submitting a job are:
```java
GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)
    throws YarnRemoteException;

SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
    throws YarnRemoteException;
```
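Before every job submission the client must call getNewApplication() to obtain a new application ID. The process works as follows.
On the client side, ClientRMProtocolPBClientImpl implements the ClientRMProtocol interface. Its getNewApplication() reads: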
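```java
// Unwrap the protobuf message from the request record.
GetNewApplicationRequestProto requestProto =
    ((GetNewApplicationRequestPBImpl) request).getProto();
try {
  // Send it through the RPC proxy and wrap the protobuf reply in a response record.
  return new GetNewApplicationResponsePBImpl(
      proxy.getNewApplication(null, requestProto));
} // catch (ServiceException e) { ... } omitted in this excerpt
```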
So what is proxy? Hadoop has its own internal RPC mechanism, and YARN continues to use it. proxy is obtained like this:
```java
proxy = (ClientRMProtocolService.BlockingInterface) RPC.getProxy(
    ClientRMProtocolService.BlockingInterface.class, clientVersion, addr, conf);
```
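proxy is a dynamic proxy created through Java reflection that implements BlockingInterface. A call on it is shipped over RPC to the RM, where it is handled by ClientRMProtocolPBServiceImpl, the server-side implementation of BlockingInterface. Its getNewApplication() contains: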
```java
GetNewApplicationResponse response = real.getNewApplication(request);
return ((GetNewApplicationResponsePBImpl) response).getProto();
```
real is ClientRMService, the RPC implementation class on the server side (that is, in the RM). Its getNewApplication() is implemented as follows:
```java
public GetNewApplicationResponse getNewApplication(
    GetNewApplicationRequest request) throws YarnRemoteException {
  GetNewApplicationResponse response = recordFactory
      .newRecordInstance(GetNewApplicationResponse.class);
  response.setApplicationId(getNewApplicationId());
  // Pick up min/max resource from scheduler...
  response.setMinimumResourceCapability(scheduler
      .getMinimumResourceCapability());
  response.setMaximumResourceCapability(scheduler
      .getMaximumResourceCapability());
  return response;
}
```
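To make the chain client → proxy → ClientRMProtocolPBServiceImpl → ClientRMService concrete, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is not Hadoop's RPC engine: the hypothetical types AppIdService, FakePBServiceImpl and FakeClientRMService stand in for BlockingInterface, ClientRMProtocolPBServiceImpl and ClientRMService, and the network hop is replaced by a direct reflective call. A real engine would serialize the method name and arguments, send them to the RM, and decode the reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for ClientRMProtocolService.BlockingInterface.
interface AppIdService {
    long getNewApplication(String request);
}

// Hypothetical stand-in for ClientRMService ("real" on the RM side).
class FakeClientRMService {
    private long counter = 0;
    long getNewApplication() { return ++counter; }
}

// Hypothetical stand-in for ClientRMProtocolPBServiceImpl: adapts wire calls to "real".
class FakePBServiceImpl implements AppIdService {
    private final FakeClientRMService real;
    FakePBServiceImpl(FakeClientRMService real) { this.real = real; }

    @Override
    public long getNewApplication(String request) {
        return real.getNewApplication();
    }
}

class ProxyDemo {
    // Builds a client-side dynamic proxy. Every call on it lands in the
    // InvocationHandler, which here simply forwards it to the server object.
    static AppIdService getProxy(AppIdService server) {
        InvocationHandler handler = (Object p, Method method, Object[] callArgs) ->
            method.invoke(server, callArgs);
        return (AppIdService) Proxy.newProxyInstance(
            AppIdService.class.getClassLoader(),
            new Class<?>[] {AppIdService.class},
            handler);
    }

    public static void main(String[] args) {
        AppIdService proxy = getProxy(new FakePBServiceImpl(new FakeClientRMService()));
        System.out.println(proxy.getNewApplication("req")); // 1
        System.out.println(proxy.getNewApplication("req")); // 2
    }
}
```

The client code only ever sees the AppIdService interface; which object actually answers the call is decided by the handler, which is the same decoupling RPC.getProxy provides.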
getNewApplicationId():
```java
ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
    .newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
        applicationCounter.incrementAndGet());
```
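So a new application ID is made of the RM's timestamp (ResourceManager.clusterTimeStamp) and a count of applications: each new application increments applicationCounter by one, so the IDs issued by one RM are unique and increasing.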
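The ID scheme can be sketched in a few lines, assuming only what the excerpt shows: a fixed cluster timestamp plus an AtomicInteger counter. AppIdGenerator is a hypothetical class, and the string form application_<timestamp>_<4-digit sequence> mirrors how YARN application IDs are usually printed rather than anything in the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of RM-side application ID generation.
final class AppIdGenerator {
    private final long clusterTimeStamp;                 // fixed when the RM starts
    private final AtomicInteger applicationCounter = new AtomicInteger(0);

    AppIdGenerator(long clusterTimeStamp) {
        this.clusterTimeStamp = clusterTimeStamp;
    }

    // incrementAndGet() is atomic, so concurrent client requests
    // never receive the same sequence number.
    String newApplicationId() {
        int seq = applicationCounter.incrementAndGet();
        return String.format("application_%d_%04d", clusterTimeStamp, seq);
    }

    public static void main(String[] args) {
        AppIdGenerator gen = new AppIdGenerator(1340000000000L);
        System.out.println(gen.newApplicationId()); // application_1340000000000_0001
        System.out.println(gen.newApplicationId()); // application_1340000000000_0002
    }
}
```

Because the timestamp changes whenever the RM restarts, IDs from different RM runs do not collide even though the counter starts again from 1.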
getNewApplication() also returns the minimum and maximum resource capability, which it reads from the scheduler.
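As an illustration of what a client might do with these two values, the hypothetical helper below fits a requested memory size (in MB) into [min, max]. Rounding up to a multiple of the minimum is an assumed policy chosen for this sketch; how a particular scheduler actually normalizes requests should be checked against its source.

```java
// Hypothetical client-side helper: fit a requested memory size into the
// [min, max] capability range returned by getNewApplication().
final class ResourceClamp {
    static int normalizeMemory(int requestedMb, int minMb, int maxMb) {
        if (minMb <= 0 || maxMb < minMb) {
            throw new IllegalArgumentException("invalid capability range");
        }
        int atLeastMin = Math.max(requestedMb, minMb);
        // Assumed policy: round up to the next multiple of minMb.
        int rounded = ((atLeastMin + minMb - 1) / minMb) * minMb;
        // Never ask for more than the scheduler's maximum.
        return Math.min(rounded, maxMb);
    }

    public static void main(String[] args) {
        System.out.println(normalizeMemory(100, 1024, 10240));   // 1024
        System.out.println(normalizeMemory(1500, 1024, 10240));  // 2048
        System.out.println(normalizeMemory(20000, 1024, 10240)); // 10240
    }
}
```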
After getNewApplication() returns, the client sets up the application's ApplicationSubmissionContext, which holds the information the RM needs to launch the application, including:
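- Application info: ID and name
- Queue info: the queue it belongs to, its priority, and the owning user
- Container launch info: the executables, jars, configuration files and environment setup scripts the application needs, plus the launch command line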
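The three groups above can be pictured as a small nested data structure. The records below are hypothetical, simplified stand-ins, not the real YARN record classes, and all field values (queue "default", user "alice", class MyAppMaster) are made up for the example. Records need Java 16 or later.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of the information in ApplicationSubmissionContext.
final class SubmissionContextSketch {
    record AppInfo(String applicationId, String name) {}
    record QueueInfo(String queue, int priority, String user) {}
    record LaunchInfo(List<String> localResources,      // jars, config files, scripts
                      Map<String, String> environment,  // environment settings
                      List<String> commands) {}         // launch command line(s)
    record Context(AppInfo app, QueueInfo queue, LaunchInfo launch) {}

    static Context example() {
        return new Context(
            new AppInfo("application_1340000000000_0001", "wordcount"),
            new QueueInfo("default", 0, "alice"),
            new LaunchInfo(
                List.of("job.jar", "job.xml"),
                Map.of("CLASSPATH", "./*"),
                List.of("java -Xmx512m MyAppMaster 1>stdout 2>stderr")));
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```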
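Once these are set, the client calls submitApplication(appRequest) to submit the application. Through the same RPC path described above, submitApplication() ends up in ClientRMService::submitApplication(), the server-side implementation.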
```java
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnRemoteException {
  // .....
  rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext,
      System.currentTimeMillis()));
  // .....
}
```
The main thing this method does is send an RMAppManagerSubmitEvent to rmAppManager; the type of this event is RMAppManagerEventType.APP_SUBMIT.
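The submit path relies on a simple typed event/handler pattern: an event object carries its type, and the handler switches on it. The sketch below reproduces only that shape. Every class ending in Sketch is hypothetical, and APP_COMPLETED is an assumed second event type added just to give the switch more than one branch. Like the excerpt, it calls handle() directly rather than going through a dispatcher queue.

```java
import java.util.ArrayList;
import java.util.List;

final class EventDispatchSketch {
    enum RMAppManagerEventTypeSketch { APP_SUBMIT, APP_COMPLETED }

    interface EventHandler<E> { void handle(E event); }

    static class RMAppManagerEventSketch {
        final RMAppManagerEventTypeSketch type;
        final String appId;

        RMAppManagerEventSketch(RMAppManagerEventTypeSketch type, String appId) {
            this.type = type;
            this.appId = appId;
        }
    }

    // Like RMAppManagerSubmitEvent: the subclass fixes the type to APP_SUBMIT
    // and carries the submission time.
    static final class SubmitEventSketch extends RMAppManagerEventSketch {
        final long submitTime;

        SubmitEventSketch(String appId, long submitTime) {
            super(RMAppManagerEventTypeSketch.APP_SUBMIT, appId);
            this.submitTime = submitTime;
        }
    }

    static final class AppManagerSketch implements EventHandler<RMAppManagerEventSketch> {
        final List<String> log = new ArrayList<>();

        @Override
        public void handle(RMAppManagerEventSketch event) {
            switch (event.type) {
                case APP_SUBMIT:
                    log.add("submitted " + event.appId);   // start the app's lifecycle
                    break;
                case APP_COMPLETED:
                    log.add("completed " + event.appId);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        AppManagerSketch rmAppManager = new AppManagerSketch();
        rmAppManager.handle(new SubmitEventSketch("application_1340000000000_0001",
                System.currentTimeMillis()));
        System.out.println(rmAppManager.log); // [submitted application_1340000000000_0001]
    }
}
```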
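Sending this event starts the submitted app's lifecycle inside the RM. To understand that process more easily and more completely, we will next analyze the ResourceManager class, which corresponds to the RM process and is the core of YARN.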