> 文档中心 > rocketmq源码①-NameServer是什么以及他的启动流程

rocketmq源码①-NameServer是什么以及他的启动流程

添加了注释的源码
https://github.com/WangTingYeYe/rocketmq_source

先来一张总体的源码流程:
请添加图片描述

前提

请大家先浏览我前面转载的官网的一些rocketmq的基本概念和架构设计之后再阅读本文。

红圈里面的就是NameServer

NameServer 是什么

  • 官网解释:
    NameServer 充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

  • 白话文:
    生产者和消费者怎么知道每个broker的地址以及broker上消息分别存放在集群中的那台机器上?
    NameServer就是为了解决这个,可以理解为就是一个数据库。
    NameServer集群之间的服务是没有任何的信息交互的

NameServer 启动流程

分析一个中间件的启动入口,第一步应该看他的启动脚本。

mqnamesrv

#!/bin/sh# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements.  See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License.  You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.if [ -z "$ROCKETMQ_HOME" ] ; then  ## resolve links - $0 may be a link to maven's home  PRG="$0"  # need this for relative symlinks  while [ -h "$PRG" ] ; do    ls=`ls -ld "$PRG"`    link=`expr "$ls" : '.*-> \(.*\)$'`    if expr "$link" : '/.*' > /dev/null; then      PRG="$link"    else      PRG="`dirname "$PRG"`/$link"    fi  done  saveddir=`pwd`  ROCKETMQ_HOME=`dirname "$PRG"`/..  # make it fully qualified  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`  cd "$saveddir"fiexport ROCKETMQ_HOMEsh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@

runserver.sh

#!/bin/sh# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements.  See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License.  You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#===========================================================================================# Java Environment Setting#===========================================================================================error_exit (){    echo "ERROR: $1 !!"    exit 1}[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"export JAVA_HOMEexport JAVA="$JAVA_HOME/bin/java"export BASE_DIR=$(dirname $0)/..export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}#===========================================================================================# JVM Configuration#===========================================================================================# The RAMDisk initializing size in MB on Darwin OS for gc-logDIR_SIZE_IN_MB=600choose_gc_log_directory(){    case "`uname`" in Darwin)     if [ ! -d "/Volumes/RAMDisk" ]; then  # create ram disk on Darwin systems as gc-log directory  DEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/null  diskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/null  echo "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."     fi     GC_LOG_DIR="/Volumes/RAMDisk" ;; *)     # check if /dev/shm exists on other systems     if [ -d "/dev/shm" ]; then  GC_LOG_DIR="/dev/shm"     else  GC_LOG_DIR=${BASE_DIR}     fi ;;    esac}choose_gc_options(){    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...    # '1' means releases befor Java 9    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps"      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"    else      JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"      JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"    fi}choose_gc_log_directorychoose_gc_optionsJAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"$JAVA ${JAVA_OPT} $@

可以看到 最后就是 执行了
java org.apache.rocketmq.namesrv.NamesrvStartup

启动类

org.apache.rocketmq.namesrv.NamesrvStartup#main

思考

基于NameServer 的功能思考一下我们自己写的话会需要哪些东西?

  • 要读取用户指定的一些配置
  • 开启网络io用于接受生产者、消费者的服务发现。broker的注册
  • 心跳监测剔除不活跃的服务
  • 处理消费者和生产者的服务发现逻辑
  • 处理broker的注册逻辑

要读取用户指定的一些配置

在这里插入图片描述

通过NamesrvConfig和NettyServerConfig两个对象来包装对应的NameServer的配置

这里很明显直接设置了服务端口为9876

下面的代码就是将命令行中指定的配置和配置文件的配置塞到这两个对象里面
在这里插入图片描述

最后将两个对象包装为一个NamesrvController对象,他就是用来代表NameServer的一切

在这里插入图片描述

开启网络io用于接受生产者、消费者的服务发现。broker的注册

在这里插入图片描述

此处初始化了一个NettyRemotingServer从名字上就可以看出这个就是用来与外界进行网络io的组件

在这里插入图片描述
在这里插入图片描述
这里注册了一个DefaultRequestProcessor 处理对象。后面用于处理请求

心跳监测剔除不活跃的服务

在这里插入图片描述

开启了一个线程单独去扫描所有的broker注册表(就是对比的broker发送的最后一次心跳时间)

broker注册、生产消费者的服务发现

前面remotingServer里面注册了一个DefaultRequestProcessor。
跟随代码发现。当nettry服务接收到请求后会交给DefaultRequestProcessor来处理

在这里插入图片描述

根据req.code 来分发请求处理逻辑。

broker注册的处理逻辑
在这里插入图片描述

总结:

在这里插入图片描述

从整个流程中我们可以提取出一些关键信息:

  • 通过RemotingServer (nettry服务)来接受请求
  • 接受到请求后会通过DefaultRequestProcessor根据请求code分发处理
  • RouteInfoMapper 管理了路由信息,保存了注册上来的broker信息
  • NamesrvConfig、NettyServerConfig 保存了网络和NettyServer的核心配置
  • NamesrvController 是整个NameServer的核心。整合了上面的一切