一.RPC协议
1.hadoop中所有RPC协议都继承了VersionedProtocol接口,其中只有一个方法getProtocolVersion();
2.hadoop中的RPC协议
(1)HDFS相关
ClientDatanodeProtocol :一个客户端和datanode之间的协议接口;
ClientProtocol :client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode与Namenode交互的接口;例如checkpoint;
(2)Mapreduce相关
InterDatanodeProtocol :Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol :TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol :JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol :Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的);
二.Client源码概述
有以下几个内部类:
Call :用于封装Invocation对象(Invocation类实现了wriable接口,Invocation类的成员变量包括目标方法名,参数Class[],参数值Object[]),作为VO(相当于数据传输层),写到服务端,同时也用于存储从服务端返回的数据(写入Call.value变量中);
Connection :用以处理远程连接对象,继承了Thread;此对象有两个成员变量out:DataOutputStream和in:DataInputStream;在Connection.sendParam(Call call)方法中发送请求参数,在run()方法中读取in输入流,接受响应消息,并将响应结果写入Call.value变量中;
ConnectionId :唯一确定一个连接;
(1)客户端和服务端的连接是怎样建立的?
Client类中的cal()方法中: getConnection(remoteId, call); 用于获取链接;并将call添加到Client.Connection.calls:Hashtable中;
(2)客户端是怎样给服务端发送数据的?
Client类中的cal()方法中:connection.sendParam(call);
(3)客户端是怎样获取服务端的返回数据的?
当连接建立时会启动一个线程用于处理服务端返回的数据;在线程的run中会不停地调用receiveResponse() 方法,从Client.Connection.in输入流中读取服务端传来的call对象,并将结果写入call.value中,将call对象读取完毕后,唤醒client处理线程。
三.RPC源码概述
Client与Server的交互通过RPC协议完成。Hadoop的RPC主要是通过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现,有以下几个主要类:
Client:RPC服务的客户端
RPC:实现了一个简单的RPC模型
Server:服务端的抽象类
RPC.Server:服务端的具体类
VersionedProtocol:所有的使用RPC服务的类都要实现该接口,在创建代理时,用来判断代理对象是否创建正确。
1.RPC调用的流程
(1)先根据那个接口动态代理生成一个代理对象,调用这个代理对象的时候;
(2)用户的调用请求被RPC捕捉到,然后包装成调用请求,序列化成数据流发送到服务端;
(3)服务端从数据流中解析出调用请求,然后根据用户所希望调用的接口,通过反射调用接口真正的实现对象,再把调用结果返回给客户端。
2.RPC中的主要静态方法:
(1)***Proxy,获取客户端代理对象
waitForProxy、getProxy、stopProxy均是与代理有关的方法,其中wait需要保证namenode启动正常且连接正常,主要由SecondayNode、Datanode、JobTracker使用。
(2)getServer,创建并返回一个Server实例,由TaskTracker、JobTracker、NameNode、DataNode使用。
(3)call,向一系列服务器发送一系列请求,在源码中没见到那个类使用该方法。但注释提到了:Expert,应该是给系统管理员使用的接口。
3.RPC静态类
RPC方法仅仅提到了方法的作用,但是具体实现没说,具体实现就涉及到了RPC的静态类了,RPC类中有5个静态内部类,分别为:
RPC.ClientCache:用来缓存Client对象;
RPC.Invocation:每次RPC调用传的参数实体类,其中Invocation包括了调用方法(Method)和配置文件;
RPC.Invoker:具体的调用类,采用Java的动态代理机制,继承自InvocationHandler,有remoteId和client成员,id用以标识异步请求对象,client用以调用实现代码;
RPC.Server:org.apache.hadoop.ipc.Server的具体类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,调用即可。用的是反射机制;
RPC.VersionMismatch:版本不匹配异常。
四.Server源码分析
有几个内部类:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并调用RPC.Server.call()方法,在此方法中采用反射机制调用目标方法;
1.Server处理请求
(1)建立连接
Server端采用java NIO技术建立连接;Listener负责监听客户端的连接;
启动Listener线程时,服务端会一直等待客户端的连接,有连接到来时,调用doAccept(),从readers池中获取一个reader:Listener.Reader,并在此reader上注册该连接,进行读取数据操作
(2)接收请求
由Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法进行读取操作,调用Connection.processOneRpc(byte[] buf)将读取到数据封装成call对象存入Server.callQueue队列中;
(3)处理call对象
Server.Handler线程负责从Server.callQueue队列中获取call对象并处理;由实现类RPC.Server的call方法处理,采用的时反射调用真正到目标方法;
(4)返回请求
Server.Responder类中的doRespond()方法
阅读(753) | 评论(0) | 转发(0) |