一.NameNode的RPC架构如下图:
二.下面以DataNode向NameNode注册的实例来说明RPC调用的流程
Hadoop的RPC主要是通过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现,以在DataNode端创建与NameNode交互的RPC协议DatanodeProtocol为例,分析DataNode与NameNode交互的底层RPC协议:
1.【(DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr,conf)】:创建DatanodeProtocol类的动态代理对象,主要的细节有:
1.1)【NetUtils.getDefaultSocketFactory(conf)】在其中会调用此方法创建socket工厂SocketFactory对象;首先获取配置参数"hadoop.rpc.socket.factory.class.default",若此参数不为空,则通过反射生成此类,若此参数为空,则【SocketFactory.getDefault()】生成默认的Socket工厂类;
1.2)此动态代理中实现了InvocationHandler接口的类是RPC.Invoker,故在创建代理的时候为初始化RPC.Invoker类的invoker对象;
1.2.1)调用Client.ConnectionId.getConnectionId(....)方法生成RPC.Invoker.remoteId:Client.ConnectionId对象,参数nameNodeAddr用于生成ConnectionId;
1.2.2)【ClientCache.getClient(Configuration conf, SocketFactory factory)】检查RPC.CLIENTS:ClientCache中是否存在factory的Client对象,若不存在则调用【new Client(ObjectWritable.class, conf, factory)】创建一个Client对象,Client.socketFactory=factory,Client.valueClass=ObjectWritable.class(表示值的class类型);并以factory为key存入RPC.CLIENTS:ClientCache中;返回此Client对象,最后赋值给RPC.Invoker.client:Client变量;
1.3)【(VersionedProtocol)Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, invoker)】生存参数protocol指定的RPC协议代理对象,其中invoker为回调接口InvocationHandler的实现类(在1.2步中创建的invoker对象);
1.4)返回1.3步中创建的代理对象,并作为DataNode与NameNode交互的DatanodeProtocol协议的动态代理对象;
2.在第1步中DataNode已经创建了用于与NameNode交互的DatanodeProtocol协议的动态代理对象DataNode.namenode,以DataNode向NameNode注册为例即调用namenode.register(dnRegistration)方法,分析RPC协议处理逻辑,此时DataNode作为RPC的客户端:
2.1)触发RPC.Invoker类的invoke(Object proxy, Method method, Object[] args)方法:
2.1.1)以method和args作为参数初始化Invocation对象,Invocation类实现了wriable接口,其中成员变量:methodName = method.getName(),parameterClasses = method.getParameterTypes(),parameters = parameters;
2.1.2)【Client.call(Writable param, ConnectionId remoteId)】,其中形参param为2.1.1步中初始化的Invocation对象:
2.1.2.1)以param为参数初始化Client.Call对象;
2.1.2.2)【Client.getConnection(ConnectionId remoteId, Call call)】,主要工作:
1)初始化Connection对象,并将此对象添加到Client.connections:Hashtable集合中;
2)以Call.id为key,call为value添加到Client.Connection.calls:Hashtable中,并调用notify()方法唤醒因calls为空而阻塞的Connection线程的run()方法(作用:获取响应消息);
3)【Connection.setupIOstreams()】,主要工作如下:
a)调用【Connection.setupConnection()】测试能成功连接到NameNode;
b)在此socket上建立输入流Client.Connection.in:DataInputStream和输出流Client.Connection.out:DataOutputStream
c)【Connection.writeHeader()】写入消息头到输出流Client.Connection.out中;
d)开启Connection线程;
2.1.2.3)【connection.sendParam(call)】将参数值发送到NameNode端,其中参数call是由param(2.1.1步创建的Invocation对象)参数值封装的Call对象,NameNode端会有线程监听从其他地方到来的请求,并进行处理,具体逻辑在《RPC服务端处理逻辑》中讲解;
1)初始化数据输出缓冲区DataOutputBuffer对象,首先将call.id写入缓存区;
2)其次将方法名methodName,parameterClasses.length写入缓冲区;
3)最后将参数类型以及参数值写入缓冲区;
4)将这个缓冲区的字节数据全部写入Client.Connection.out输出流中;
2.1.2.4)等待RPC方法调用的返回结果:判断call.done是否为true(RPC方法调用的结果已经返回),若为true则返回call.value给上一层,即Invoker.invoke()方法,至此DataNode通过RPC协议调用远程NameNode的方法已经获取到了返回值,此次RPC交互结束;
备注:在Connection线程反复调用Connection.receiveResponse()方法时,首先从Client.Connection.in读取call.id并根据此id从Client.Connection.calls中获取Call对象,然后从Client.Connection.in中读取的状态码,若状态码为成功,则继续读取返回值并调用call.setValue(value)方法将结果值设置Call对象的value中,在设置成功后调用Call.callComplete()将call.done置为true;具体逻辑见《Client.Connection线程接收返回值》部分;
3.【RPC服务端处理逻辑】经过第2.1.2.3步的处理,将参数值发送到的NameNode端,此时NameNode端作为RPC的服务端,进行如下处理:
3.1)背景说明:在第2.1.2.2步中DataNode是在"dfs.namenode.servicerpc-address"地址上与NameNode建立的输入流Client.Connection.in:DataInputStream和输出流Client.Connection.out:DataOutputStream,而NameNode启动的时候在"dfs.namenode.servicerpc-address"地址上创建并启动了NameNode.serviceRpcServer:RPC.Server对象,具体过程如下:
3.1.1)启动Server.Responder线程;响应RPC请求类,请求处理完毕,由Responder发送结果给请求客户端。
3.1.2)启动Server.Listener线程;用于监听客户端发来的请求,同时Listener内部还有一个静态类Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。在初始化Server.Listener线程时:首先初始化Listener.readers:Reader[]数组变量,其中数组个数由配置参数"ipc.server.read.threadpool.size"决定,缺省为1,此值赋值给Server.readThreads变量;其次调用Executors.newFixedThreadPool(int nThreads)方法初始化Listener.readPool:ExecutorService线程池Listener.readPool,其中线程池大小nThreads也是由Server.readThreads变量决定;最后调用new Reader(Selector.open())初始化Server.readThreads个Reader对象,并放入线程池Listener.readPool中,并启动;
3.1.3)初始化N个Server.Handler线程,添加到Server.handlers:Handler[]变量中并启动,请求处理类,会循环阻塞读取callQueue中的call对象,并调用RPC.Server.call()方法,在此方法中采用反射机制调用目标方法; N的大小由参数"dfs.namenode.service.handler.count"决定,缺省为10;
3.2)在3.1步中的工作是在NameNode启动时完成的,下面继续RPC服务端的讲解:
3.2.1)首先是监听客户端的请求。由Server.Listener线程负责完成;在启动Listener线程时,会一直等待客户端的连接,有连接到来时,调用【Listener.doAccept(SelectionKey key)】方法,此方法采用java NIO技术完成请求接受工作,大致思路如下:
1)从Listener.readers数组中获取一个reader:Listener.Reader,并调用【reader.registerChannel(channel)】在此reader.readSelector上注册该通道,在Reader线程上一直在监听readSelector上是否有读操作准备就绪;
2)在注册之后会得到一个SelectionKey对象,将调用【new Connection(readKey, channel, System.currentTimeMillis())】初始化Connection对象,并作为附件添加到SelectionKey对象上;
3.2.2)然后读取客户端传来的参数。当Listener.Reader线程监听到readSelector选择器上有读操作就绪时,调用【Listener.doRead(SelectionKey key)】方法将读取到数据封装成call对象存入Server.callQueue队列中:
1)【key.attachment()】调用此方法从SelectionKey对象的附件中取出Connection对象(在上一步中放入此附件的);
2)【Connection.readAndProcess()】调用此方法完成客户端请求中参数的读取工作,参数存入字节缓冲区中;
3)【Connection.processOneRpc(byte[] buf)】将传来的参数封装成Server.Call对象;
3.a)首先读取call.id值;
3.b)【ReflectionUtils.newInstance(paramClass, conf)】通过反射创建Invocation对象,Server.paramClass变量的值是NameNode在创建NameNode.serviceRpcServer时指定为Invocation.class;
3.c)将参数封装成Invocation对象param;
3.d)调用【new Server.Call(int id, Writable param, Connection connection)】传来的参数最终封装成Server.Call对象;并添加到Server.callQueue队列中;
3.2.3)其次处理客户端的请求,由Server.Handler线程Server.callQueue队列中获取call对象并处理:
1)调用【RPC.Server.call(Class> protocol, Writable param, long receivedTime)】,以DataNode向NameNode注册为例,在此方法中通过反射调用NameNode的register()方法,并调用【new ObjectWritable(method.getReturnType(), value)】以方法的返回值类型和方法返回值为参数初始化ObjectWritable对象;
2)将call.id,状态码以及返回值添加到Server.Call.response:ByteBuffer中;
3)将此Server.Call对象添加到Server.Connection.responseQueue:LinkedList链表中;若链表就只有一个Server.Call对象元素,则直接调用【Responder.processResponse(LinkedList responseQueue, boolean inHandler=true)】方法将写入通道中,同时由于inHandler=true,则将此通道以写操作注册到Responder.writeSelector选择器上;
3.2.4)最后将方法调用的返回值发送给客户端,Server.Responder类中的doRespond()方法:
1)writeSelector.select(900000); 即在此通道上监听15分钟,15分钟到后无论有没有请求都返回;
2)若有请求达到,检查Responder.writeSelector选择器监听通道是否有写操作就绪的请求,由于第一个添加到Server.Connection.responseQueue链表时,就以inHandler=true调用了Responder.processResponse(LinkedList responseQueue, boolean inHandler)方法,故已经将写操作注册到此通道上,写操作已经就绪,则调用【Responder.doAsyncWrite(SelectionKey key)】方法,内部调用processResponse(call.connection.responseQueue, false)方法将返回值写入通道中;
3)上一步处理完毕后,若未超过15分钟,则继续上述1-2步;若超过了则将此通道上的连接关闭;
4.【Client.Connection线程接收返回值】Client.Connection线程负责接受从RPC服务端返回的方法调用结果值;
4.1)【Client.Connection.waitForWork()】检查是否所有的Call对象的结果值已经全部得到;若Client.Connection.calls不为空,Client.Connection.shouldCloseConnection=false,Client线程还在运行,则调用4.2步;若Client.Connection.calls为空,则将Connection.shouldCloseConnection置为true,则调用【Client.Connection.close()】若shouldCloseConnection=true,则调用【Client.Connection.close()】;
4.2)【Client.Connection.receiveResponse()】:
4.2.1)首先从Client.Connection.in读取call.id并根据此id从Client.Connection.calls中获取Call对象;
4.2.2)然后从Client.Connection.in中读取的状态码,若状态码为成功,则继续读取返回值并调用call.setValue(value)方法将结果值设置Call对象的value中,在设置成功后调用Call.callComplete()将call.done置为true;从而在2.1.2.4步中等待返回值的线程继续执行;
三.为了NameNode,DataNode,SecondaryNameNode,DFSClient之间能通过RPC进行远程调用,分别在启动时各自启动了Server服务,如下:
1.NameNode启动时在参数"dfs.namenode.servicerpc-address"上启动了serviceRpcServer:Server服务;用于处理DataNode和SecondaryNameNode的RPC调用请求;
2.NameNode启动时在参数"dfs.namenode.rpc-address"上启动了Server服务;用于处理DFSClient的RPC调用请求;
3.DataNode启动时在参数"dfs.datanode.ipc.address"上启动了Server服务;用于处理DFSClient的RPC调用请求;(将Server的端口号赋值给DataNode.dnRegistration.ipcPort,dnRegistration注册到NameNode)
阅读(1011) | 评论(0) | 转发(0) |