Chinaunix首页 | 论坛 | 博客
  • 博客访问: 958261
  • 博文数量: 253
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2609
  • 用 户 组: 普通用户
  • 注册时间: 2019-03-08 17:29
个人简介

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

文章分类

全部博文(253)

文章存档

2022年(60)

2021年(81)

2020年(83)

2019年(29)

我的朋友

分类: Java

2022-04-07 10:06:16

一、引言


RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。


RocketMQ主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr负责存储元数据,各组件的主要功能如下:


  • 消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。


  • 消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。


  • 代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。


  • 名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。


  • 生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。


  • 消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。


RocketMQ整体消息处理逻辑上以Topic维度进行生产消费、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。


本篇文章分析的核心在于介绍RocketMQ的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。


二、RocketMQ的整体架构


(图片来自于Apache RocketMQ)


RocketMQ架构上主要分为四部分,如上图所示:


  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。


  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。


  • NameServer:NameServer是一个非常简单的Topic路由注册中心,支持分布式集群方式部署,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。


  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。



RocketMQ的Topic的物理分布如上图所示:


Topic作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的Broker当中。


Broker中的Queue是Topic对应消息的物理存储单元。


在RocketMQ的整体设计理念当中,消息的生产消费以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创建对应的MessageQueue。


producer生产消息的过程本质上就是选择Topic在Broker的所有的MessageQueue并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。


consumer消费消息的过程本质上就是一个订阅同一个Topic的consumerGroup下的每个consumer按照一定的规则负责Topic下一部分MessageQueue进行消费。


在RocketMQ整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到Broker选择的负载均衡,消息的消费过程主要涉及多consumer和多Broker之间的负责均衡。


三、producer消息生产过程



producer消息生产过程:


  • producer首先访问namesvr获取路由信息,namesvr存储Topic维度的所有路由信息(包括每个topic在每个Broker的队列分布情况)。


  • producer解析路由信息生成本地的路由信息,解析Topic在Broker队列信息并转化为本地的消息生产的路由信息。


  • producer根据本地路由信息向Broker发送消息,选择本地路由中具体的Broker进行消息发送。


3.1 路由同步过程


  1. public class MQClientInstance {
  2.  
  3.     public boolean updateTopicRouteInfoFromNameServer(final String topic) {
  4.         return updateTopicRouteInfoFromNameServer(topic, false, null);
  5.     }
  6.  
  7.  
  8.     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
  9.         DefaultMQProducer defaultMQProducer) {
  10.         try {
  11.             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
  12.                 try {
  13.                     TopicRouteData topicRouteData;
  14.                     if (isDefault && defaultMQProducer != null) {
  15.                         // 省略对应的代码
  16.                     } else {
  17.                         // 1、负责查询指定的Topic对应的路由信息
  18.                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
  19.                     }
  20.  
  21.                     if (topicRouteData != null) {
  22.                         // 2、比较路由数据topicRouteData是否发生变更
  23.                         TopicRouteData old = this.topicRouteTable.get(topic);
  24.                         boolean changed = topicRouteDataIsChange(old, topicRouteData);
  25.                         if (!changed) {
  26.                             changed = this.isNeedUpdateTopicRouteInfo(topic);
  27.                         }
  28.                         // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息
  29.                         if (changed) {
  30.                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
  31.  
  32.                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
  33.                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
  34.                             }
  35.  
  36.                             // 生成生产者对应的Topic信息
  37.                             {
  38.                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
  39.                                 publishInfo.setHaveTopicRouterInfo(true);
  40.                                 Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
  41.                                 while (it.hasNext()) {
  42.                                     Entry<String, MQProducerInner> entry = it.next();
  43.                                     MQProducerInner impl = entry.getValue();
  44.                                     if (impl != null) {
  45.                                         impl.updateTopicPublishInfo(topic, publishInfo);
  46.                                     }
  47.                                 }
  48.                             }
  49.                             // 保存到本地生产者路由表当中
  50.                             this.topicRouteTable.put(topic, cloneTopicRouteData);
  51.                             return true;
  52.                         }
  53.                     }
  54.                 } finally {
  55.                     this.lockNamesrv.unlock();
  56.                 }
  57.             } else {
  58.             }
  59.         } catch (InterruptedException e) {
  60.         }
  61.  
  62.         return false;
  63.     }
  64. }

路由同步过程


  • 路由同步过程是消息生产者发送消息的前置条件,没有路由的同步就无法感知具体发往那个Broker节点。


  • 路由同步主要负责查询指定的Topic对应的路由信息,比较路由数据topicRouteData是否发生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。


  1. public class TopicRouteData extends RemotingSerializable {
  2.     private String orderTopicConf;
  3.     // 按照broker维度保存的Queue信息
  4.     private List<QueueData> queueDatas;
  5.     // 按照broker维度保存的broker信息
  6.     private List<BrokerData> brokerDatas;
  7.     private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  8. }
  9.  
  10.  
  11. public class QueueData implements Comparable<QueueData> {
  12.     // broker的名称
  13.     private String brokerName;
  14.     // 读队列大小
  15.     private int readQueueNums;
  16.     // 写队列大小
  17.     private int writeQueueNums;
  18.     // 读写权限
  19.     private int perm;
  20.     private int topicSynFlag;
  21. }
  22.  
  23.  
  24. public class BrokerData implements Comparable<BrokerData> {
  25.     // broker所属集群信息
  26.     private String cluster;
  27.     // broker的名称
  28.     private String brokerName;
  29.     // broker对应的ip地址信息
  30.     private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
  31.     private final Random random = new Random();
  32. }
  33.  
  34.  
  35. --------------------------------------------------------------------------------------------------
  36.  
  37.  
  38. public class TopicPublishInfo {
  39.     private boolean orderTopic = false;
  40.     private boolean haveTopicRouterInfo = false;
  41.     // 最细粒度的队列信息
  42.     private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
  43.     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
  44.     private TopicRouteData topicRouteData;
  45. }
  46.  
  47. public class MessageQueue implements Comparable<MessageQueue>, Serializable {
  48.     private static final long serialVersionUID = 6191200464116433425L;
  49.     // Topic信息
  50.     private String topic;
  51.     // 所属的brokerName信息
  52.     private String brokerName;
  53.     // Topic下的队列信息Id
  54.     private int queueId;
  55. }

路由解析过程:


  • TopicRouteData核心变量QueueData保存每个Broker的队列信息,BrokerData保存Broker的地址信息。


  • TopicPublishInfo核心变量MessageQueue保存最细粒度的队列信息。


  • producer负责将从namesvr获取的TopicRouteData转化为producer本地的TopicPublishInfo。

  1. public class MQClientInstance {
  2.  
  3.     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
  4.  
  5.         TopicPublishInfo info = new TopicPublishInfo();
  6.  
  7.         info.setTopicRouteData(route);
  8.         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
  9.           // 省略相关代码
  10.         } else {
  11.  
  12.             List<QueueData> qds = route.getQueueDatas();
  13.  
  14.             // 按照brokerName进行排序
  15.             Collections.sort(qds);
  16.  
  17.             // 遍历所有broker生成队列维度信息
  18.             for (QueueData qd : qds) {
  19.                 // 具备写能力的QueueData能够用于队列生成
  20.                 if (PermName.isWriteable(qd.getPerm())) {
  21.                     // 遍历获得指定brokerData进行异常条件过滤
  22.                     BrokerData brokerData = null;
  23.                     for (BrokerData bd : route.getBrokerDatas()) {
  24.                         if (bd.getBrokerName().equals(qd.getBrokerName())) {
  25.                             brokerData = bd;
  26.                             break;
  27.                         }
  28.                     }
  29.                     if (null == brokerData) {
  30.                         continue;
  31.                     }
  32.                     if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
  33.                         continue;
  34.                     }
  35.  
  36.                     // 遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfo
  37.                     for (int i = 0; i < qd.getWriteQueueNums(); i++) {
  38.                         MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
  39.                         info.getMessageQueueList().add(mq);
  40.                     }
  41.                 }
  42.             }
  43.  
  44.             info.setOrderTopic(false);
  45.         }
  46.  
  47.         return info;
  48.     }
  49. }

路由生成过程:


  • 路由生成过程主要是根据QueueData的BrokerName和writeQueueNums来生成MessageQueue 对象。


  • MessageQueue是消息发送过程中选择的最细粒度的可发送消息的队列。


  1. {
  2.     "TBW102": [{
  3.         "brokerName": "broker-a",
  4.         "perm": 7,
  5.         "readQueueNums": 8,
  6.         "topicSynFlag": 0,
  7.         "writeQueueNums": 8
  8.     }, {
  9.         "brokerName": "broker-b",
  10.         "perm": 7,
  11.         "readQueueNums": 8,
  12.         "topicSynFlag": 0,
  13.         "writeQueueNums": 8
  14.     }]
  15. }

路由解析举例:


  • topic(TBW102)在broker-a和broker-b上存在队列信息,其中读写队列个数都为8。


  • 先按照broker-a、broker-b的名字顺序针对broker信息进行排序。


  • 针对broker-a会生成8个topic为TBW102的MessageQueue对象,queueId分别是0-7。


  • 针对broker-b会生成8个topic为TBW102的MessageQueue对象,queueId分别是0-7。


  • topic(名为TBW102)的TopicPublishInfo整体包含16个MessageQueue对象,其中有8个broker-a的MessageQueue,有8个broker-b的MessageQueue。


  • 消息发送过程中的路由选择就是从这16个MessageQueue对象当中获取一个进行消息发送。


3.2 负载均衡过程


  1. public class DefaultMQProducerImpl implements MQProducerInner {
  2.  
  3.     private SendResult sendDefaultImpl(
  4.         Message msg,
  5.         final CommunicationMode communicationMode,
  6.         final SendCallback sendCallback,
  7.         final long timeout
  8.     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  9.         
  10.         // 1、查询消息发送的TopicPublishInfo信息
  11.         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  12.  
  13.         if (topicPublishInfo != null && topicPublishInfo.ok()) {
  14.              
  15.             String[] brokersSent = new String[timesTotal];
  16.             // 根据重试次数进行消息发送
  17.             for (; times < timesTotal; times++) {
  18.                 // 记录上次发送失败的brokerName
  19.                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
  20.                 // 2、从TopicPublishInfo获取发送消息的队列
  21.                 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
  22.                 if (mqSelected != null) {
  23.                     mq = mqSelected;
  24.                     brokersSent[times] = mq.getBrokerName();
  25.                     try {
  26.                         // 3、执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送
  27.                         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
  28.                         switch (communicationMode) {
  29.                             case SYNC:
  30.                                 if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  31.                                     if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
  32.                                         continue;
  33.                                     }
  34.                                 }
  35.  
  36.                                 return sendResult;
  37.                             default:
  38.                                 break;
  39.                         }
  40.                     } catch (MQBrokerException e) {
  41.                         // 省略相关代码
  42.                     } catch (InterruptedException e) {
  43.                         // 省略相关代码
  44.                     }
  45.                 } else {
  46.                     break;
  47.                 }
  48.             }
  49.  
  50.             if (sendResult != null) {
  51.                 return sendResult;
  52.             }
  53.         }
  54.     }
  55. }

消息发送过程:


  • 查询Topic对应的路由信息对象TopicPublishInfo。


  • 从TopicPublishInfo中通过selectOneMessageQueue获取发送消息的队列,该队列代表具体落到具体的Broker的queue队列当中。


  • 执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送,重新选择队列会避开上一次发送失败的Broker的队列。


  1. public class TopicPublishInfo {
  2.  
  3.     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  4.         if (lastBrokerName == null) {
  5.             return selectOneMessageQueue();
  6.         } else {
  7.             // 按照轮询进行选择发送的MessageQueue
  8.             for (int i = 0; i < this.messageQueueList.size(); i++) {
  9.                 int index = this.sendWhichQueue.getAndIncrement();
  10.                 int pos = Math.abs(index) % this.messageQueueList.size();
  11.                 if (pos < 0)
  12.                     pos = 0;
  13.                 MessageQueue mq = this.messageQueueList.get(pos);
  14.                 // 避开上一次上一次发送失败的MessageQueue
  15.                 if (!mq.getBrokerName().equals(lastBrokerName)) {
  16.                     return mq;
  17.                 }
  18.             }
  19.             return selectOneMessageQueue();
  20.         }
  21.     }
  22. }

路由选择过程:


  • MessageQueue的选择按照轮询进行选择,通过全局维护索引进行累加取模选择发送队列。


  • MessageQueue的选择过程中会避开上一次发送失败Broker对应的MessageQueue。



Producer消息发送示意图


  • 某Topic的队列分布为Broker_A_Queue1、Broker_A_Queue2、Broker_B_Queue1、Broker_B_Queue2、Broker_C_Queue1、Broker_C_Queue2,根据轮询策略依次进行选择。


  • 发送失败的场景下如Broker_A_Queue1发送失败那么就会跳过Broker_A选择Broker_B_Queue1进行发送。


四、consumer消息消费过程


consumer消息消费过程


  • consumer访问namesvr同步topic对应的路由信息。


  • consumer在本地解析远程路由信息并保存到本地。


  • consumer在本地进行Reblance负载均衡确定本节点负责消费的MessageQueue。


  • consumer访问Broker消费指定的MessageQueue的消息。


4.1 路由同步过程


  1. public class MQClientInstance {
  2.  
  3.     // 1、启动定时任务从namesvr定时同步路由信息
  4.     private void startScheduledTask() {
  5.         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  6.  
  7.             @Override
  8.             public void run() {
  9.                 try {
  10.                     MQClientInstance.this.updateTopicRouteInfoFromNameServer();
  11.                 } catch (Exception e) {
  12.                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
  13.                 }
  14.             }
  15.         }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
  16.     }
  17.  
  18.     public void updateTopicRouteInfoFromNameServer() {
  19.         Set<String> topicList = new HashSet<String>();
  20.  
  21.         // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息
  22.         {
  23.             Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  24.             while (it.hasNext()) {
  25.                 Entry<String, MQConsumerInner> entry = it.next();
  26.                 MQConsumerInner impl = entry.getValue();
  27.                 if (impl != null) {
  28.                     Set<SubscriptionData> subList = impl.subscriptions();
  29.                     if (subList != null) {
  30.                         for (SubscriptionData subData : subList) {
  31.                             topicList.add(subData.getTopic());
  32.                         }
  33.                     }
  34.                 }
  35.             }
  36.         }
  37.  
  38.         for (String topic : topicList) {
  39.             this.updateTopicRouteInfoFromNameServer(topic);
  40.         }
  41.     }
  42.  
  43.     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
  44.         DefaultMQProducer defaultMQProducer) {
  45.  
  46.         try {
  47.             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
  48.                 try {
  49.                     TopicRouteData topicRouteData;
  50.                     if (isDefault && defaultMQProducer != null) {
  51.                         // 省略代码
  52.                     } else {
  53.                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
  54.                     }
  55.  
  56.                     if (topicRouteData != null) {
  57.                         TopicRouteData old = this.topicRouteTable.get(topic);
  58.                         boolean changed = topicRouteDataIsChange(old, topicRouteData);
  59.                         if (!changed) {
  60.                             changed = this.isNeedUpdateTopicRouteInfo(topic);
  61.                         }
  62.  
  63.                         if (changed) {
  64.                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
  65.  
  66.                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
  67.                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
  68.                             }
  69.  
  70.                             // 构建consumer侧的路由信息
  71.                             {
  72.                                 Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  73.                                 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  74.                                 while (it.hasNext()) {
  75.                                     Entry<String, MQConsumerInner> entry = it.next();
  76.                                     MQConsumerInner impl = entry.getValue();
  77.                                     if (impl != null) {
  78.                                         impl.updateTopicSubscribeInfo(topic, subscribeInfo);
  79.                                     }
  80.                                 }
  81.                             }
  82.      
  83.                             this.topicRouteTable.put(topic, cloneTopicRouteData);
  84.                             return true;
  85.                         }
  86.                     }
  87.                 } finally {
  88.                     this.lockNamesrv.unlock();
  89.                 }
  90.             }
  91.         } catch (InterruptedException e) {
  92.         }
  93.  
  94.         return false;
  95.     }
  96. }

路由同步过程


  • 路由同步过程是消息消费者消费消息的前置条件,没有路由的同步就无法感知具体待消费的消息的Broker节点。


  • consumer节点通过定时任务定期从namesvr同步该消费节点订阅的topic的路由信息。


  • consumer通过updateTopicSubscribeInfo将同步的路由信息构建成本地的路由信息并用以后续的负责均衡。


4.2 负载均衡过程


  1. public class RebalanceService extends ServiceThread {
  2.  
  3.     private static long waitInterval =
  4.         Long.parseLong(System.getProperty(
  5.             "rocketmq.client.rebalance.waitInterval", "20000"));
  6.  
  7.     private final MQClientInstance mqClientFactory;
  8.  
  9.     public RebalanceService(MQClientInstance mqClientFactory) {
  10.         this.mqClientFactory = mqClientFactory;
  11.     }
  12.  
  13.     @Override
  14.     public void run() {
  15.  
  16.         while (!this.isStopped()) {
  17.             this.waitForRunning(waitInterval);
  18.             this.mqClientFactory.doRebalance();
  19.         }
  20.  
  21.     }
  22. }


负载均衡过程


  • consumer通过RebalanceService来定期进行重新负载均衡。


  • RebalanceService的核心在于完成MessageQueue和consumer的分配关系。


  1. public abstract class RebalanceImpl {
  2.  
  3.     private void rebalanceByTopic(final String topic, final boolean isOrder) {
  4.         switch (messageModel) {
  5.             case BROADCASTING: {
  6.                 // 省略相关代码
  7.                 break;
  8.             }
  9.             case CLUSTERING: { // 集群模式下的负载均衡
  10.                 // 1、获取topic下所有的MessageQueue
  11.                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  12.  
  13.                 // 2、获取topic下该consumerGroup下所有的consumer对象
  14.                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
  15.                  
  16.                 // 3、开始重新分配进行rebalance
  17.                 if (mqSet != null && cidAll != null) {
  18.                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  19.                     mqAll.addAll(mqSet);
  20.  
  21.                     Collections.sort(mqAll);
  22.                     Collections.sort(cidAll);
  23.  
  24.                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  25.  
  26.                     List<MessageQueue> allocateResult = null;
  27.                     try {
  28.                         // 4、通过分配策略重新进行分配
  29.                         allocateResult = strategy.allocate(
  30.                             this.consumerGroup,
  31.                             this.mQClientFactory.getClientId(),
  32.                             mqAll,
  33.                             cidAll);
  34.                     } catch (Throwable e) {
  35.  
  36.                         return;
  37.                     }
  38.  
  39.                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  40.                     if (allocateResult != null) {
  41.                         allocateResultSet.addAll(allocateResult);
  42.                     }
  43.                     // 5、根据分配结果执行真正的rebalance动作
  44.                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  45.                     if (changed) {
  46.                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
  47.                     }
  48.                 }
  49.                 break;
  50.             }
  51.             default:
  52.                 break;
  53.         }
  54.     }

重新分配流程


  • 获取topic下所有的MessageQueue。


  • 获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。


  • 针对mqAll和cidAll进行排序,mqAll排序顺序按照先BrokerName后BrokerId,cidAll排序按照字符串排序。


  • 通过分配策略


  • AllocateMessageQueueStrategy重新分配。


  • 根据分配结果执行真正的rebalance动作。


  1. public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
  2.     private final InternalLogger log = ClientLogger.getLog();
  3.  
  4.     @Override
  5.     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  6.         List<String> cidAll) {
  7.          
  8.         List<MessageQueue> result = new ArrayList<MessageQueue>();
  9.          
  10.         // 核心逻辑计算开始
  11.  
  12.         // 计算当前cid的下标
  13.         int index = cidAll.indexOf(currentCID);
  14.          
  15.         // 计算多余的模值
  16.         int mod = mqAll.size() % cidAll.size();
  17.  
  18.         // 计算平均大小
  19.         int averageSize =
  20.             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
  21.                 + 1 : mqAll.size() / cidAll.size());
  22.         // 计算起始下标
  23.         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
  24.         // 计算范围大小
  25.         int range = Math.min(averageSize, mqAll.size() - startIndex);
  26.         // 组装结果
  27.         for (int i = 0; i < range; i++) {
  28.             result.add(mqAll.get((startIndex + i) % mqAll.size()));
  29.         }
  30.         return result;
  31.     }
  32.     // 核心逻辑计算结束
  33.  
  34.     @Override
  35.     public String getName() {
  36.         return "AVG";
  37.     }
  38. }
  39.  
  40. ------------------------------------------------------------------------------------
  41.  
  42. rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。
  43.  
  44. rocketMq上存在名为topic_demo的topic,writeQueue写队列数量为3,分布在3个broker。
  45. 排序后的mqAll的大小为9,依次为
  46. [broker_a_0 broker_a_1 broker_a_2 broker_b_0 broker_b_1 broker_b_2 broker_c_0 broker_c_1 broker_c_2]
  47.  
  48. rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为
  49. [192.168.0.6@15956 192.168.0.7@15957 192.168.0.8@15958 192.168.0.9@15959]
  50.  
  51. 192.168.0.6@15956 的分配MessageQueue结算过程
  52. index:0
  53. mod:9%4=1
  54. averageSize:9 / 4 + 1 = 3
  55. startIndex:0
  56. range:3
  57. messageQueue:[broker_a_0、broker_a_1、broker_a_2]
  58.  
  59.  
  60. 192.168.0.6@15957 的分配MessageQueue结算过程
  61. index:1
  62. mod:9%4=1
  63. averageSize:9 / 4 = 2
  64. startIndex:3
  65. range:2
  66. messageQueue:[broker_b_0、broker_b_1]
  67.  
  68.  
  69. 192.168.0.6@15958 的分配MessageQueue结算过程
  70. index:2
  71. mod:9%4=1
  72. averageSize:9 / 4 = 2
  73. startIndex:5
  74. range:2
  75. messageQueue:[broker_b_2、broker_c_0]
  76.  
  77.  
  78. 192.168.0.6@15959 的分配MessageQueue结算过程
  79. index:3
  80. mod:9%4=1
  81. averageSize:9 / 4 = 2
  82. startIndex:7
  83. range:2
  84. messageQueue:[broker_c_1、broker_c_2]

分配策略分析:


  • 整体分配策略可以参考上图的具体例子,可以更好的理解分配的逻辑。


consumer的分配


  • 同一个consumerGroup下的consumer对象会分配到同一个Topic下不同的MessageQueue。


  • 每个MessageQueue最终会分配到具体的consumer当中。


五、RocketMQ指定机器消费设计思路


日常测试环境当中会存在多台consumer进行消费,但实际开发当中某台consumer新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候consumerGroup的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由那台consumer进行消费。当然我们可以通过介入consumer的负载均衡机制来实现指定机器消费。


  1. public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
  2.     private final InternalLogger log = ClientLogger.getLog();
  3.  
  4.     @Override
  5.     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
  6.         List<String> cidAll) {
  7.         
  8.  
  9.         List<MessageQueue> result = new ArrayList<MessageQueue>();
  10.         // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费
  11.         if (!cidAll.contains(currentCID)) {
  12.             return result;
  13.         }
  14.  
  15.         int index = cidAll.indexOf(currentCID);
  16.         int mod = mqAll.size() % cidAll.size();
  17.         int averageSize =
  18.             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
  19.                 + 1 : mqAll.size() / cidAll.size());
  20.         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
  21.         int range = Math.min(averageSize, mqAll.size() - startIndex);
  22.         for (int i = 0; i < range; i++) {
  23.             result.add(mqAll.get((startIndex + i) % mqAll.size()));
  24.         }
  25.         return result;
  26.     }
  27. }

consumer负载均衡策略改写


  • 通过改写负载均衡策略AllocateMessageQueueAveragely的allocate机制保证只有指定IP的机器能够进行消费。


  • 通过IP进行判断是基于RocketMQ的cid格式是192.168.0.6@15956,其中前面的IP地址就是对于的消费机器的ip地址,整个方案可行且可以实际落地。


六、小结


本文主要介绍了RocketMQ在生产和消费过程中的负载均衡机制,结合源码和实际案例力求给读者一个易于理解的技术普及,希望能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未涉及,也有很多技术细节未详细阐述,如有疑问欢迎继续交流。

作者:vivo互联网服务器团队-Wang Zhi

阅读(364) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~