> 文档中心 > rocketmq源码③-Producer的启动、发送消息、路由broker

rocketmq源码③-Producer的启动、发送消息、路由broker

添加了注释的源码
https://github.com/WangTingYeYe/rocketmq_source

前提介绍:

一定要先看前面的几篇文章,了解rocketmq的基本概念和架构设计之后再看本篇
在这里插入图片描述

Producer 分类

Producer有两种:一种是普通消息发送者 DefaultMQProducer,只需要构建一个 Netty 客户端。
另一种是事务消息发送者 TransactionMQProducer,除了构建 Netty 客户端之外,还需要构建 Netty 服务端(用于接收 broker 的事务回查)。

DefaultMQProducer 启动

官方提供的Producer例子

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;/ * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. */public class Producer {    public static void main(String[] args) throws MQClientException, InterruptedException { /*  * Instantiate with a producer group name.  */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); /*  * Specify name server addresses.  * 

* * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR *

  * {@code  * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");  * }  * 

*/ producer.setNamesrvAddr("127.0.0.1:9876"); /* * Launch the instance. */ producer.start(); for (int i = 0; i < 2; i++) { try { /* * Create a message instance, specifying topic, tag and message body. */ Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h msg.setDelayTimeLevel(2); /* * Call send message to deliver message to one of brokers. */ SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /* * Shut down once the producer instance is not longer in use. */ producer.shutdown(); }}

producer.start()

  • producer.start()
    • org.apache.rocketmq.client.producer.DefaultMQProducer#start
      • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
        • org.apache.rocketmq.client.impl.factory.MQClientInstance#start
 public void start() throws MQClientException { synchronized (this) {     switch (this.serviceState) {  case CREATE_JUST:      this.serviceState = ServiceState.START_FAILED;      // If not specified,looking address from name server      if (null == this.clientConfig.getNamesrvAddr()) {   this.mQClientAPIImpl.fetchNameServerAddr();      }      // Start request-response channel      // 启动了nettry客户端      this.mQClientAPIImpl.start();      // Start various schedule tasks      this.startScheduledTask();      // Start pull service 消费者才会用到      this.pullMessageService.start();      // Start rebalance service      //K2 客户端负载均衡 消费者才会用到      this.rebalanceService.start();      // Start push service 消费者才会用到   this.defaultMQProducer.getDefaultMQProducerImpl().start(false);      log.info("the client factory [{}] start OK", this.clientId);      this.serviceState = ServiceState.RUNNING;      break;  case START_FAILED:      throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);  default:      break;     } }    }

启动的定时任务this.startScheduledTask()

通过这些定时任务完成了很多核心的功能

private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) {     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  @Override  public void run() {      try {   // 更新nameServer地址。   MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();      } catch (Exception e) {   log.error("ScheduledTask fetchNameServerAddr exception", e);      }  }     }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {     @Override     public void run() {  try {      //从nameServer更新主题路由信息      MQClientInstance.this.updateTopicRouteInfoFromNameServer();  } catch (Exception e) {      log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);  }     } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {     @Override     public void run() {  try {      //清除下线的broker      MQClientInstance.this.cleanOfflineBroker();      //向所有broker发送心跳,并记录broker的版本      MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();  } catch (Exception e) {      log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);  }     } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {     @Override     public void run() {  try {      //将消费进度向Broker同步      MQClientInstance.this.persistAllConsumerOffset();  } catch (Exception e) {      log.error("ScheduledTask persistAllConsumerOffset exception", e);  }     } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {     @Override     public void run() {  try {      MQClientInstance.this.adjustThreadPool();  } catch (Exception e) {      log.error("ScheduledTask adjustThreadPool exception", e);  }     } }, 1, 1, TimeUnit.MINUTES);    }

producer.send(msg)

  • SendResult sendResult = producer.send(msg);
    • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
      • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
        • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo(查找topic信息)
        • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(选择topic下的一个队列)
        • org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl(真正去发送消息)

发送的核心
1、找到topic
2、找到对应的队列
3、通过netty发送消息

  // K1: the producer-side implementation of message sending.
  /**
   * Sends {@code msg}, retrying on other queues when a synchronous send fails.
   *
   * <p>Flow: look up the publish info of the topic, then loop up to {@code timesTotal} times
   * (1 + {@code getRetryTimesWhenSendFailed()} for SYNC, otherwise 1): pick a queue, check the
   * remaining time budget, call {@code sendKernelImpl} and report the outcome to
   * {@code updateFaultItem}.
   *
   * @param msg               message to send
   * @param communicationMode SYNC, ASYNC or ONEWAY
   * @param sendCallback      passed through to {@code sendKernelImpl}
   * @param timeout           total time budget in milliseconds across all attempts
   * @return the send result for SYNC; {@code null} for ASYNC and ONEWAY
   * @throws MQClientException    when no route info exists for the topic or all attempts failed
   * @throws RemotingException    {@code RemotingTooMuchRequestException} when the time budget ran out
   * @throws MQBrokerException    for broker response codes that are not in the retry list
   * @throws InterruptedException rethrown from {@code sendKernelImpl}
   */
  private SendResult sendDefaultImpl(
      Message msg,
      final CommunicationMode communicationMode,
      final SendCallback sendCallback,
      final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      this.makeSureStateOK();
      Validators.checkMessage(msg, this.defaultMQProducer);
      final long invokeID = random.nextLong();
      long beginTimestampFirst = System.currentTimeMillis();
      long beginTimestampPrev = beginTimestampFirst;
      long endTimestamp = beginTimestampFirst;
      // Fetch the publish info of the topic; pay attention to how a MessageQueue is chosen from it below.
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
          boolean callTimeout = false;
          MessageQueue mq = null;
          Exception exception = null;
          SendResult sendResult = null;
          // Only SYNC sends are retried by this loop.
          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
          int times = 0;
          String[] brokersSent = new String[timesTotal];
          for (; times < timesTotal; times++) {
              // Broker used by the previous attempt (null on the first attempt).
              String lastBrokerName = null == mq ? null : mq.getBrokerName();
              // K2: the producer works out which MessageQueue the message goes to.
              MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
              if (mqSelected != null) {
                  mq = mqSelected;
                  // Record the broker of the chosen queue for the failure message.
                  brokersSent[times] = mq.getBrokerName();
                  try {
                      beginTimestampPrev = System.currentTimeMillis();
                      if (times > 0) {
                          //Reset topic with namespace during resend.
                          msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                      }
                      // Time already spent on earlier attempts counts against the budget.
                      long costTime = beginTimestampPrev - beginTimestampFirst;
                      if (timeout < costTime) {
                          callTimeout = true;
                          break;
                      }

                      // The method that actually sends the message.
                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      switch (communicationMode) {
                          case ASYNC:
                              return null;
                          case ONEWAY:
                              return null;
                          case SYNC:
                              // A non-OK status is retried only when the producer is configured to do so.
                              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                  if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                      continue;
                                  }
                              }

                              return sendResult;
                          default:
                              break;
                      }
                  } catch (RemotingException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQClientException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      continue;
                  } catch (MQBrokerException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());
                      exception = e;
                      // Only the listed broker response codes are retried; any other code ends the loop.
                      switch (e.getResponseCode()) {
                          case ResponseCode.TOPIC_NOT_EXIST:
                          case ResponseCode.SERVICE_NOT_AVAILABLE:
                          case ResponseCode.SYSTEM_ERROR:
                          case ResponseCode.NO_PERMISSION:
                          case ResponseCode.NO_BUYER_ID:
                          case ResponseCode.NOT_IN_CURRENT_UNIT:
                              continue;
                          default:
                              if (sendResult != null) {
                                  return sendResult;
                              }

                              throw e;
                      }
                  } catch (InterruptedException e) {
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                      log.warn(msg.toString());

                      // NOTE(review): the exception and the message are logged a second time here.
                      log.warn("sendKernelImpl exception", e);
                      log.warn(msg.toString());
                      throw e;
                  }
              } else {
                  break;
              }
          }

          if (sendResult != null) {
              return sendResult;
          }

          String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
              times,
              System.currentTimeMillis() - beginTimestampFirst,
              msg.getTopic(),
              Arrays.toString(brokersSent));

          info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

          MQClientException mqClientException = new MQClientException(info, exception);
          // Running out of the time budget is reported with a dedicated exception type.
          if (callTimeout) {
              throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
          }

          // Map the last recorded failure onto a client-side response code.
          if (exception instanceof MQBrokerException) {
              mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
          } else if (exception instanceof RemotingConnectException) {
              mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
          } else if (exception instanceof RemotingTimeoutException) {
              mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
          } else if (exception instanceof MQClientException) {
              mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
          }

          throw mqClientException;
      }

      // Reached only when no usable publish info was found for the topic.
      validateNameServerSetting();

      throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
          null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
  }

找到 topic信息

核心就是:本地有缓存,先从缓存拿;如果拿不到,再去nameServer上拿。

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) {     this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());     //Producer向NameServer获取更新Topic的路由信息。     this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);     //还是从本地缓存中寻找Topic路由信息。     topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {     return topicPublishInfo; } else {     this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);     topicPublishInfo = this.topicPublishInfoTable.get(topic);     return topicPublishInfo; }    }

选择一个队列

默认就是递增取模轮询的去选择

#org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) {     return selectOneMessageQueue(); } else {     int index = this.sendWhichQueue.getAndIncrement();     for (int i = 0; i < this.messageQueueList.size(); i++) {  int pos = Math.abs(index++) % this.messageQueueList.size();  if (pos < 0)      pos = 0;  MessageQueue mq = this.messageQueueList.get(pos);  if (!mq.getBrokerName().equals(lastBrokerName)) {      return mq;  }     }     return selectOneMessageQueue(); }    }    //选择MessageQueue的方式: 递增取模    public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0)     pos = 0; return this.messageQueueList.get(pos);    }

发送消息

1、根据 queue 找到对应的broker地址,因为queue上保存了自己所在的是哪个broker
2、this.mQClientFactory.getMQClientAPIImpl().sendMessage() 网络请求发送消息数据

 /**
  * Sends one message to the broker that owns {@code mq}.
  *
  * <p>Steps: resolve the broker address (refreshing the topic's publish info once if it is
  * unknown), prepare the message (unique id, instance id, optional compression, transaction
  * flag), run the check-forbidden and send-message hooks, build the request header and hand
  * everything to {@code MQClientAPIImpl.sendMessage}. The {@code finally} block restores the
  * original body and strips the namespace from the topic.
  *
  * @param msg               message to send
  * @param mq                target queue; its broker name is used to look up the address
  * @param communicationMode SYNC, ASYNC or ONEWAY
  * @param sendCallback      callback passed to the ASYNC send
  * @param topicPublishInfo  publish info passed to the ASYNC send
  * @param timeout           time budget in milliseconds, measured from entry to this method
  * @return the result returned by {@code sendMessage}
  * @throws MQClientException    when no address is found for the broker of {@code mq}
  * @throws RemotingException    from the remoting layer, or {@code RemotingTooMuchRequestException}
  *                              when the budget is exhausted before the request is issued
  * @throws MQBrokerException    propagated from {@code sendMessage}
  * @throws InterruptedException propagated from {@code sendMessage}
  */
 private SendResult sendKernelImpl(final Message msg,
     final MessageQueue mq,
     final CommunicationMode communicationMode,
     final SendCallback sendCallback,
     final TopicPublishInfo topicPublishInfo,
     final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
     long beginStartTime = System.currentTimeMillis();
     // Look up the broker address; if it is unknown, refresh from the name server and retry once.
     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
     if (null == brokerAddr) {
         tryToFindTopicPublishInfo(mq.getTopic());
         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
     }

     // The long stretch of request-building code below does not need close attention.
     SendMessageContext context = null;
     if (brokerAddr != null) {
         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

         // Remember the body so it can be restored in the finally block.
         byte[] prevBody = msg.getBody();
         try {
             //for MessageBatch,ID has been set in the generating process
             if (!(msg instanceof MessageBatch)) {
                 MessageClientIDSetter.setUniqID(msg);
             }

             boolean topicWithNamespace = false;
             if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                 msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                 topicWithNamespace = true;
             }

             int sysFlag = 0;
             boolean msgBodyCompressed = false;
             if (this.tryToCompressMessage(msg)) {
                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                 msgBodyCompressed = true;
             }

             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
             }

             if (hasCheckForbiddenHook()) {
                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                 checkForbiddenContext.setCommunicationMode(communicationMode);
                 checkForbiddenContext.setBrokerAddr(brokerAddr);
                 checkForbiddenContext.setMessage(msg);
                 checkForbiddenContext.setMq(mq);
                 checkForbiddenContext.setUnitMode(this.isUnitMode());
                 this.executeCheckForbiddenHook(checkForbiddenContext);
             }

             // The context is only created when a send-message hook is registered.
             if (this.hasSendMessageHook()) {
                 context = new SendMessageContext();
                 context.setProducer(this);
                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                 context.setCommunicationMode(communicationMode);
                 context.setBornHost(this.defaultMQProducer.getClientIP());
                 context.setBrokerAddr(brokerAddr);
                 context.setMessage(msg);
                 context.setMq(mq);
                 context.setNamespace(this.defaultMQProducer.getNamespace());
                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                 if (isTrans != null && isTrans.equals("true")) {
                     context.setMsgType(MessageType.Trans_Msg_Half);
                 }

                 // Checked after the transaction test, so a delay property overrides Trans_Msg_Half.
                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                     context.setMsgType(MessageType.Delay_Msg);
                 }
                 this.executeSendMessageHookBefore(context);
             }

             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
             requestHeader.setTopic(msg.getTopic());
             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
             requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setSysFlag(sysFlag);
             requestHeader.setBornTimestamp(System.currentTimeMillis());
             requestHeader.setFlag(msg.getFlag());
             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
             requestHeader.setReconsumeTimes(0);
             requestHeader.setUnitMode(this.isUnitMode());
             requestHeader.setBatch(msg instanceof MessageBatch);
             // For topics with the retry-group prefix, move the reconsume counters from the
             // message properties into the request header.
             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                 if (reconsumeTimes != null) {
                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                 }

                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                 if (maxReconsumeTimes != null) {
                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                 }
             }

             SendResult sendResult = null;
             switch (communicationMode) {
                 case ASYNC:
                     Message tmpMessage = msg;
                     boolean messageCloned = false;
                     if (msgBodyCompressed) {
                         //If msg body was compressed, msgbody should be reset using prevBody.
                         //Clone new message using compressed message body and recover origin message.
                         //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                         tmpMessage = MessageAccessor.cloneMessage(msg);
                         messageCloned = true;
                         msg.setBody(prevBody);
                     }

                     if (topicWithNamespace) {
                         // Clone at most once; the clone is what gets sent, while the
                         // caller's message has the namespace stripped from its topic.
                         if (!messageCloned) {
                             tmpMessage = MessageAccessor.cloneMessage(msg);
                             messageCloned = true;
                         }
                         msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                     }

                     // Time spent preparing the request counts against the budget.
                     long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                     if (timeout < costTimeAsync) {
                         throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                     }
                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                         brokerAddr,
                         mq.getBrokerName(),
                         tmpMessage,
                         requestHeader,
                         timeout - costTimeAsync,
                         communicationMode,
                         sendCallback,
                         topicPublishInfo,
                         this.mQClientFactory,
                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                         context,
                         this);
                     break;
                 case ONEWAY:
                 case SYNC:
                     long costTimeSync = System.currentTimeMillis() - beginStartTime;
                     if (timeout < costTimeSync) {
                         throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                     }
                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                         brokerAddr,
                         mq.getBrokerName(),
                         msg,
                         requestHeader,
                         timeout - costTimeSync,
                         communicationMode,
                         context,
                         this);
                     break;
                 default:
                     assert false;
                     break;
             }

             // Run the after-send hook once the send call has returned.
             if (this.hasSendMessageHook()) {
                 context.setSendResult(sendResult);
                 this.executeSendMessageHookAfter(context);
             }

             return sendResult;
         } catch (RemotingException e) {
             // Each catch reports the failure to the after-send hook, then rethrows unchanged.
             if (this.hasSendMessageHook()) {
                 context.setException(e);
                 this.executeSendMessageHookAfter(context);
             }
             throw e;
         } catch (MQBrokerException e) {
             if (this.hasSendMessageHook()) {
                 context.setException(e);
                 this.executeSendMessageHookAfter(context);
             }
             throw e;
         } catch (InterruptedException e) {
             if (this.hasSendMessageHook()) {
                 context.setException(e);
                 this.executeSendMessageHookAfter(context);
             }
             throw e;
         } finally {
             // Undo the in-place changes made to the caller's message (body and topic).
             msg.setBody(prevBody);
             msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
         }
     }

     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
 }

怎么拉取topic信息

回到上面启动的时候,启动了好几个定时任务

 // Periodic task: refresh topic route info from the name server.
 // First run after 10 ms, then every getPollNameServerInterval() ms; failures are only logged.
 final Runnable routeRefreshTask = new Runnable() {
     @Override
     public void run() {
         try {
             MQClientInstance.this.updateTopicRouteInfoFromNameServer();
         } catch (Exception e) {
             log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
         }
     }
 };
 this.scheduledExecutorService.scheduleAtFixedRate(
     routeRefreshTask, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

定时从nameServer更新主题信息并缓存在本地

1、向nameServer 发送网络请求获取 producer关心的topic信息
2、缓存 topic信息 缓存broker网络地址

  • org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer()
    • org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
      • org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
  /**
   * Fetches the route data for a topic from the name server and, when it changed, pushes it
   * into the local caches ({@code brokerAddrTable}, every registered producer and consumer,
   * and {@code topicRouteTable}).
   *
   * <p>The whole update runs under {@code lockNamesrv}, acquired with a
   * {@code LOCK_TIMEOUT_MILLIS} timeout.
   *
   * @param topic             topic whose route should be refreshed
   * @param isDefault         when {@code true} and {@code defaultMQProducer} is non-null, the route
   *                          of the producer's {@code getCreateTopicKey()} topic is fetched instead
   *                          and its queue counts are capped at {@code getDefaultTopicQueueNums()}
   * @param defaultMQProducer producer supplying the create-topic key and default queue count; may be null
   * @return {@code true} only when new route data was stored; {@code false} when nothing changed,
   *         the name server returned nothing, the lock timed out or the call was interrupted
   * @throws IllegalStateException wrapping a {@code RemotingException} from the name server call
   */
  public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
      DefaultMQProducer defaultMQProducer) {
      try {
          if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
              try {
                  TopicRouteData topicRouteData;
                  if (isDefault && defaultMQProducer != null) {
                      // Network request to the name server for the route of the producer's create-topic key.
                      topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                          1000 * 3);
                      if (topicRouteData != null) {
                          // Cap read and write queue counts at the producer's default queue number.
                          for (QueueData data : topicRouteData.getQueueDatas()) {
                              int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                              data.setReadQueueNums(queueNums);
                              data.setWriteQueueNums(queueNums);
                          }
                      }
                  } else {
                      topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                  }
                  if (topicRouteData != null) {
                      // Compare against the locally cached route for this topic.
                      TopicRouteData old = this.topicRouteTable.get(topic);
                      boolean changed = topicRouteDataIsChange(old, topicRouteData);
                      if (!changed) {
                          changed = this.isNeedUpdateTopicRouteInfo(topic);
                      } else {
                          log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                      }

                      if (changed) {
                          TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                          // Refresh the local broker address cache.
                          for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                              this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                          }

                          // Update Pub info
                          {
                              TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                              publishInfo.setHaveTopicRouterInfo(true);
                              Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                              while (it.hasNext()) {
                                  Entry<String, MQProducerInner> entry = it.next();
                                  MQProducerInner impl = entry.getValue();
                                  if (impl != null) {
                                      impl.updateTopicPublishInfo(topic, publishInfo);
                                  }
                              }
                          }

                          // Update sub info
                          {
                              Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                              Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                              while (it.hasNext()) {
                                  Entry<String, MQConsumerInner> entry = it.next();
                                  MQConsumerInner impl = entry.getValue();
                                  if (impl != null) {
                                      impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                  }
                              }
                          }
                          // The clone taken above is what gets stored in the route table.
                          log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                          this.topicRouteTable.put(topic, cloneTopicRouteData);
                          return true;
                      }
                  } else {
                      log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                  }
              } catch (MQClientException e) {
                  // Client exceptions for retry-group topics are deliberately not logged.
                  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                      log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                  }
              } catch (RemotingException e) {
                  log.error("updateTopicRouteInfoFromNameServer Exception", e);
                  throw new IllegalStateException(e);
              } finally {
                  this.lockNamesrv.unlock();
              }
          } else {
              log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
          }
      } catch (InterruptedException e) {
          // NOTE(review): the interrupt is logged but the thread's interrupt flag is not restored.
          log.warn("updateTopicRouteInfoFromNameServer Exception", e);
      }

      return false;
  }

至此基本就可以串起来了

在这里插入图片描述