基本的流程:
1. 跟RM做通讯,向RM注册自己,并取得自己的所在位置(机器,端口,tracking url);从这里可以看出,AM被AMS调度并起来后,刚开始对自己的情况是什么都不知道的,包括自己在那台机器上,目前的监听端口等等;这些都需要RM告诉他;
2. 和RM保持heartbeat的通讯
3. 向RM申请执行真正shell的container
4. 对于申请到的container,设置各种执行资源、环境变量、执行命令等
5. 和RM通讯,了解每一个container的执行情况
代码分析:
1. public boolean init(String[] args) throws ParseException, IOException
这个函数的主要作用是分析传递给其的args参数以及system.env(); system.env()里面的很多值,是client在递交时指定的;Container在启动时,会将这些值设置到system系统变量中去;
2. public boolean run() throws YarnRemoteException
// 和RM建立RPC通讯
resourceManager = new AMRMClientImpl(appAttemptID);
resourceManager.init(conf);
resourceManager.start();
// 向RM注册,传入的参数应该是一些hint信息
RegisterApplicationMasterResponse response = resourceManager
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
// 向RM申请资源是采用ContainerRequest结构,该结构来表明申请每个container的内存大小,优先级,总共需要申请多少个container等等
ContainerRequest request = new ContainerRequest(capability, null, null, pri, numContainers);
// 设置需要的资源信息
resourceManager.addContainerRequest(request);
// 向RM申请资源
AllocateResponse resp = resourceManager.allocate(progressIndicator);
// 可以采用递归的方式,从resp中取得每一个container的信息,
对于每一个container,其需要关注的参数有
allocatedContainer.getId() // 当前container的id
allocatedContainer.getNodeId().getHost() // container所处NodeManager的hostname
allocatedContainer.getNodeId().getPort() // container所处NodeManager的RPC的端口
allocatedContainer.getNodeHttpAddress() // container所处NodeManager的http端口,应该用于url tracking
// 当container从RM被分配后,就需要和NodeManager进行通讯,真正将shell 脚本运行起来
private void connectToCM() {
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
cmAddress, conf));
}
// 建立shell脚本的执行环境
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setContainerId(container.getId());
ctx.setResource(container.getResource());
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
ctx.setUser(jobUserName);
// 将脚本的env设置在这里了
ctx.setEnvironment(shellEnv);
// 再设置LocalResouce,即那个shell script需要设置
// 再设置脚本的启动命令及参数
// 启动这个container
StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
cm.startContainer(startReq);
阅读(2053) | 评论(0) | 转发(0) |