Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85538
  • 博文数量: 18
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 321
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-30 21:09
文章分类

全部博文(18)

文章存档

2015年(3)

2014年(9)

2013年(6)

我的朋友

分类: HADOOP

2013-08-11 14:48:11

基本的流程:
1.  跟RM做通讯,向RM注册自己,并取得自己的所在位置(机器,端口,tracking url);从这里可以看出,AM被AMS调度并起来后,刚开始对自己的情况是什么都不知道的,包括自己在那台机器上,目前的监听端口等等;这些都需要RM告诉他;
2.  和RM保持heartbeat的通讯
3.  向RM申请执行真正shell的container
4.  对于申请到的container,设置各种执行资源、环境变量、执行命令等
5. 和RM通讯,了解每一个container的执行情况

代码分析:
1.  public boolean init(String[] args) throws ParseException, IOException
这个函数的主要作用是分析传递给其的args参数以及system.env(); system.env()里面的很多值,是client在递交时指定的;Container在启动时,会将这些值设置到system系统变量中去;

2. public boolean run() throws YarnRemoteException

 // 和RM建立RPC通讯
 resourceManager = new AMRMClientImpl(appAttemptID);
 resourceManager.init(conf);
 resourceManager.start();

 // 向RM注册,传入的参数应该是一些hint信息     
RegisterApplicationMasterResponse response = resourceManager
          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
              appMasterTrackingUrl);
  
// 向RM申请资源是采用ContainerRequest结构,该结构来表明申请每个container的内存大小,优先级,总共需要申请多少个container等等
ContainerRequest request = new ContainerRequest(capability, null, null, pri, numContainers);

// 设置需要的资源信息
 resourceManager.addContainerRequest(request);
// 向RM申请资源
AllocateResponse resp = resourceManager.allocate(progressIndicator);

// 可以采用递归的方式,从resp中取得每一个container的信息,
对于每一个container,其需要关注的参数有
allocatedContainer.getId() // 当前container的id
allocatedContainer.getNodeId().getHost() // container所处NodeManager的hostname
allocatedContainer.getNodeId().getPort() // container所处NodeManager的RPC的端口
allocatedContainer.getNodeHttpAddress() // container所处NodeManager的http端口,应该用于url tracking

// 当container从RM被分配后,就需要和NodeManager进行通讯,真正将shell 脚本运行起来

    private void connectToCM() {
      String cmIpPortStr = container.getNodeId().getHost() + ":"
          + container.getNodeId().getPort();
      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
      this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
          cmAddress, conf));
    }

// 建立shell脚本的执行环境
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setContainerId(container.getId());
ctx.setResource(container.getResource());
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
ctx.setUser(jobUserName);
// 将脚本的env设置在这里了
ctx.setEnvironment(shellEnv);
// 再设置LocalResouce,即那个shell script需要设置
// 再设置脚本的启动命令及参数

// 启动这个container
StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
cm.startContainer(startReq);










阅读(2053) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~