分布式RPC
分布式RPC(DRPC)的真正目的是使用storm实时并行计算极端功能。Storm拓扑需要一个输入流作为函数参数,以一个输出流的形式发射每个函数调用的结果。
DRPC没有多少storm特性,因为它是从storm的原始流,spouts,bolts,拓扑来表达一个模式。DRPC没有单独打包,但它如此有用,以至于和storm捆绑在一起。
概述
分布式RPC通过“DRPC server”协调。DRPC服务器协调接收一个RPC请求,发送请求到storm拓扑,从storm拓扑接收结果,发送结果回等待的客户端。从一个客户端的角度来看,一个分布式RPC调用就像是一个常规的RPC调用。例如,一个客户端如何为带“http://twitter.com”参数的“reach”功能计算结果。
- DRPCClient client = new DRPCClient("drpc-host", 3772);
- String result = client.execute("reach", "http://twitter.com");
客户端发送功能名称及功能所需参数到DRPC服务器去执行。图中的拓扑实现了此功能,它使用DRPCSpout从DRPC服务器接收功能调用流。每个功能调用通过DRPC服务器使用唯一ID标记,随后拓扑计算结果,在拓扑的最后,一个称之为“ReturnResults”的bolt连接到DRPC服务器,把结果交给这个功能调用(根据功能调用ID),DRPC服务器根据ID找到等待中的客户端,为等待中的客户端消除阻塞,并发送结果给客户端。
LinearDRPCTopologyBuilder
Storm有一个称之为LinearDRPCTopologyBuilder的拓扑Builder几乎自动完成DRPC所需的所有相关步骤。包括:
1.设置spout
2.返回结果给DRPC服务器
3.为bolt提供对一组元组的有限聚合功能
让我们看一个简单的例子。这是一个DRPC拓扑的实现,在输入参数的尾部追加“!”并返回:
- public static class ExclaimBolt implements IBasicBolt {
- public void prepare(Map conf, TopologyContext context) {
- }
-
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String input = tuple.getString(1);
- collector.emit(new Values(tuple.getValue(0), input + "!"));
- }
-
- public void cleanup() {
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "result"));
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
- builder.addBolt(new ExclaimBolt(), 3);
-
- }
如你所见,代码非常少。当创建时,你把这个拓扑的DRPC功能名称告诉storm。一个DRPC服务器可以协调许多功能,功能名称用于区别不同的功能,首先声明的bolt将接收一个输入的2-tuples,第一个字段是请求ID,第二个字段是请求参数。认为最后的bolt会发射一个输出流,该输出流包含[id, result]格式的2-tuples。最后,所有拓扑中间过程产生的元组(tuple)都包含请求id作为其第一个字段。
在这个例子中,ExclaimBolt只是简单地在元组的第二个字段尾部追加“!”字符。LinearDRPCTopologyBuilder处理其余的协调工作,包括连接DRPC服务器,发送最终结果。
本地模式DRPC
DRPC可以运行在本地模式。这是如何在本地模式运行上述例子:
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
-
- cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
- System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
-
- cluster.shutdown();
- drpc.shutdown();
首先你创建一个LocalDRPC对象。这个对象在进程内模拟一个DRPC服务器,就像在进程内模拟一个storm集群一样。然后你创建本地集群,在本地模式运行这个拓扑。创建本地拓扑和远程拓扑,LinearDRPCTopologyBuilder有不同的方法。在本地模式,LocalDRPC未绑定任何端口,拓扑也需要知道与哪个对象通讯,这是为什么createLocaclTopology方法需要接受LocalDRPC对象作为输入参数的原因。
载入拓扑后,你可以用LocalDRPC的execute方法执行DRPC调用。
远程模式DRPC
在实际的集群使用DRPC也很简单。有三个步骤:
1. 启动DRPC服务器
2. 配置DRPC服务器位置
3. 提交DRPC拓扑到storm集群
使用storm脚本启动DRPC服务器,和启动nimbus和ui一样:
- bin/storm drpc
接下来,配置你的storm集群,让集群知道DRPC服务器的位置,这样DRPCSpout就知道从哪里读取功能调用。可以通过修改storm.yaml配置文件或拓扑配置完成配置DRPC服务器位置。修改storm.yaml配置文件如下所示:
- drpc.servers:
- - "drpc1.foo.com"
- - "drpc2.foo.com"
最后,使用StormSubmitter启动DRPC拓扑,就像启动其它拓扑一样。在远程模式运行上述例子,代码如下所示:
- StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology方法用于在storm集群创建拓扑。
一个更完整的例子
这个exclaimation DRPC例子只是一个用来说明DRPC概念的玩具。让我们看一个更完整的例子,该例子是一个真正需要storm集群的并行计算的DRPC功能。我们将要看的例子是对twitter网站上的一个URL的接触用户进行统计。
一个URL的接触用户数是在twitter网站上接触一个URL的用户数,你需要以下4步:
1. 获取tweeted the URL的全部用户
2. 获取这些用户的全部追随者
3. 使追随者集合中的用户唯一
4. 统计唯一的用户数
一个单独的reach计算在计算期间涉及到数千数据库访问和数千万追随者记录。它是一个真正的耗时计算。正如你将要看到的,在storm上实现这个功能非常简单。在一台机器上,reach计算花费数分钟,在storm集群,最难计算reach的URL也只需数秒。
Storm-starter项目定义了一个reach样例,reach拓扑定义如下所示:
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
- builder.addBolt(new GetTweeters(), 3);
- builder.addBolt(new GetFollowers(), 12)
- .shuffleGrouping();
- builder.addBolt(new PartialUniquer(), 6)
- .fieldsGrouping(new Fields("id", "follower"));
- builder.addBolt(new CountAggregator(), 2)
- .fieldsGrouping(new Fields("id"));
这个拓扑以4个步骤的形式执行:
1. GetTweeters获取tweeted the URL的用户。它转换一个[id, url]形式的输入流到[id, tweeter]形式的输出流。每个url元组将映射到多个tweeter元组。
2. GetFollowers获取这些tweeter的追随者。它转换一个[id, tweeter]形式的输入流到[id, follower]形式的输出流。跨所有任务,当某人追随多个tweeter,这些tweeter又tweeted相同的URL时,这可能会得到重复的追随者。
按追随者ID对追随者数据流进行分组。同一的追随者去到同一的任务,因此每个PartialUniquer任务都接收到独立的相互独立的追随者集合。一旦收到请求ID用于它的所有追随者元组,它就发射追随者子集的唯一总数。
4. 最后,CountAggregator从每个PartialUniquer任务接收计数并对它们求和。
让我们来看看:
- public static class PartialUniquer implements IRichBolt, FinishedCallback {
- OutputCollector _collector;
- Map
-
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
-
- public void execute(Tuple tuple) {
- Object id = tuple.getValue(0);
- Set curr = _sets.get(id);
- if(curr==null) {
- curr = new HashSet();
- _sets.put(id, curr);
- }
- curr.add(tuple.getString(1));
- _collector.ack(tuple);
- }
-
- public void cleanup() {
- }
-
- public void finishedId(Object id) {
- Set curr = _sets.remove(id);
- int count;
- if(curr!=null) {
- count = curr.size();
- } else {
- count = 0;
- }
- _collector.emit(new Values(id, count));
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "partial-count"));
- }
- }
当PartialUniquer在exectue方法中接收一个follower元组时,它用一个内部HashMap添加它到与请求ID对应的集合。
PartialUniquer也实现了FinishedCallback接口,它告诉LinearDRPCTopologyBuilder,对于任意给定的请求ID,当它已收到所有指向它的元组时,请通知它。这个回调是finishedId方法。在这个回调中,PartialUniquer发射单一的元组,元组包含它的追随者子集的唯一总数。
在底层,用于检测一个bolt何时收到该请求ID的所有元组。CoordinatedBolt使用direct stream管理协调。
其它的拓扑应该是不言自明。如你所见,reach计算的每一单步都是并行执行的,而且定义一个DRPC拓扑也非常简单。
Non-Linear DRPC拓扑
LinearDRPCTopologyBuilder仅处理“线性的”DRPC拓扑,计算以一连串步骤的形式表达(像reach)。不难想象某些功能将需要更复杂的拓扑结构,这些拓扑带有带分支和合并bolt。目前,要做到这一点,你需要直接使用CoordinateBolt。务必在邮件列表中谈谈你的非线性DRPC拓扑用例,写下DRPC拓扑更普遍的抽象结构。
LinearDRPCTopologyBuilder如何工作?
DRPCSpout发射[args, ],return-info是DRPC服务器的主机和端口,还有DRPC服务器生成的ID。
拓扑组成部分:
- DRPCSpout
- PrepareRequest(生成一个请求ID,创建一个返回信息流,一个参数流)
- CoordinatedBolt包装器和直接分组
- JoinResult(同返回信息一起连接结果)
- ReturnResult(连接DRPC服务器并返回结果)
- LinearDRPCTopologyBuilder是一个构建在Storm原语之上的高层次抽象的好例子。
高级
- 同时编排处理多个请的KeyedFairBolt
- 如何直接使用
英文: