RocketMQ NameServer 核心源码解析
带着问题 往下看 (namesrv)
- 我们在写组件的时候 怎么管理version
- 如果现在让你 维护一个 各个jar包公用的属性
- system.exit(-1); 0 -1 -2 各种数都是干什么的,什么时候 用哪个
- 环境变量如果不想使用 rocketmq_home, 想变为 xxx 这怎么做,能做么?
- 我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改
- 大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?
- 如果我们想 自己设置使用的log 组件,怎么办
- 遍历 field[] 的时候 想跳过 static的属性 怎么写代码?
- 多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写
- kvconfigmanager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?
- kvconfigmanager 怎么保证的 持久化?
- 怎么在 并发操作的时候 保证数据的安全性?
- 方法的参数 使用final 有什么用?
- 怎么判断的broker 是不是master
- netty 怎么让nameserver 通知broker 信息的。
- nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改
- runtime.getruntime().addshutdownhook 有什么用,没有不行么?
- @importantfield 干什么的? 什么时候 使用
- 在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?
- broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?
请思考下 写写你的答案 再往下看
nameserver 启动的逻辑
nameserver 功能
- 管理broker 集群
- 属于注册中心 业务端 和nameserver 进行连接,获取broker地址
- 负责维护broker 连接/心跳/监控
nameserver 问题解答
我们在写组件的时候 怎么管理version
一方面是 在父类的 pom.xml 通过 进行 控制版本,然后 业务端通过
<dependencymanagement>
<dependencies>
<dependency>
<groupid>com.xxx</groupid>
<artifactid>xxx</artifactid>
<version>4.0.0-snapshot</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencymanagement>
这是第一个 ,第二个 是 rocketmq 这种 在common 包下面 新建一个 mqversion 管理版本
这里会有一个问题,那这个版本管理 我用在哪里啊,不用不行么?
- 为了方便测试,测试的时候 可能因为版本有差异 导致的问题。指定version 就没有这个问题了 2 broker 操作也是,(其实一句话 为了之后的版本兼容)比如
if (version < mqversion.version.v3_0_7_snapshot.ordinal()) {
result.setcode(responsecode.system_error);
result.setremark("the client does not support this feature. version="
+ mqversion.getversiondesc(version));
log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
remotinghelper.parsechannelremoteaddr(entry.getkey()), mqversion.getversiondesc(version));
return result;
} else if (utilall.isblank(originclientid) || originclientid.equals(clientid)) {
}
如果现在让你 维护一个 各个jar包公用的属性
1 在common包搞一个 公共的实体类 随时用随时取呗,大不了就一个map 然后就put get
2 system.setproperty 底层就是全局 map 进行put get
extends hashtable<object,object>
环境变量如果不想使用 rocketmq_home, 想变为 xxx 这怎么做,能做么?
设置 rocketmq.home.dir=xxx
我们启动broker 老是用 -n ip:9876 9876是什么,我们可以改变么?怎么改
nettyserverconfig.setlistenport(9876);
代码指定 的netty 监听端口,一般情况不改
大家如果想 把命令启动带着的 -c -p等参数放到 我们的属性中,怎么写代码?
mixall.properties2object(serverutil.commandline2properties(commandline), namesrvconfig);
这是先把commandline 转变为 properties 对象,然后调用 namesrvconfig 反射方法 赋值
如果我们想 自己设置使用的log 组件,怎么办
loggercontext lc = (loggercontext) loggerfactory.getiloggerfactory(); joranconfigurator configurator = new joranconfigurator(); configurator.setcontext(lc); lc.reset(); configurator.doconfigure(namesrvconfig.getrocketmqhome() + "/conf/logback_namesrv.xml");
遍历 field[]
遍历 field[]的时候 想跳过 static的属性 怎么写代码?
(field.getmodifiers() & 0x00000008) != 0 如果为true 就是 static false为 非static
多个对象的 属性需要进行聚合到一个对象中,要是你 怎么写
for (entry<object, object> next : from.entryset()) {
object fromobj = next.getvalue(), toobj = to.get(next.getkey());
if (toobj != null && !toobj.equals(fromobj)) {
log.info("replace, key: {}, value: {} -> {}", next.getkey(), toobj, fromobj);
}
to.put(next.getkey(), fromobj);
}
因为 可能同时操作这个对象 导致 数据不一致 ,所以要加上 读写锁的 写锁
kvconfigmanager 有什么作用
kvconfigmanager 有什么作用,怎么保证的 并发操作的数据正确性?你感觉有什么问题么?
是 kv 配置的管理器,主要是
hashmap<string/* namespace */, hashmap<string/* key */, string/* value */>> configtable
以后写map 也要像这种方式 写注释。
//读取的是 ./namesrv/kvconfig.json
kvconfigpath = system.getproperty("user.home") + file.separator + "namesrv" + file.separator + "kvconfig.json";
行吧 ,现在还不知道 这些kv的作用,先看看怎么存储的,到用的时候 我们接上,先知道 kv 存储在kvconfigmanager类 configtable 属性中
putkvconfig 使用的 reentrantreadwritelock 的写锁 保证数据一致性,如果map的key 存在了,不会进行覆盖,而是 跳过。
kvconfigmanager 持久化
kvconfigmanager 怎么保证的 持久化?
执行过 上面的 那些方法,执行 persist ,加读锁,如下图

怎么在 并发操作的时候 保证数据的安全性?
一方面 是 不可变类,其中返回属性的时候 要进行copy 简单来说 就是我通过get 方法出去的 对象 是 copy的对象,而不是 原来的对象,防止 外面通过引用 修改 属性值,把我们的对象 属性 进行修改。
方法的参数 使用final 有什么用?
- 确保,不会也不能对于参数进行修改,保证了调用发起方数据的安全;
- 避免在方法体中修改参数,引起不必要的错误
- 程序员工作不是一个人的工作,你设置为final,别人将来维护的时候一看就知道这个变量不能修改,而不需要去记忆这个是不能变化的值,是常量。这个是代码规范。
broker 是不是master判断
怎么判断的broker 是不是master
//0 == brokerid mixall.master_id == brokerid
这个其实可以 抽出来一个公共的方法, 方便之后的修改
netty 怎么让nameserver 通知broker 信息的。
netty 保存的 channel 到时候用了 直接从map 获取 然后发送消息
nameserver 是否存活的判断标准是什么? 能修改么? 怎么修改
broker_channel_expired_time = 1000 * 60 * 2
static final 写死的,如果 最后一次心跳时间 + 2分钟 都小于system.currenttimemillis() 执行删除操作
- 关闭 netty channel
- brokerlivetable 删除对应的实例
这是一个定时任务 从项目 启动5s之后 ,每10s执行一次,说明 对broker的感知 会有些许 延迟。(最大也就 20s,一般10s以内感知)
runtime.getruntime().addshutdownhook 有什么用,没有不行么?
当程序正常退出,系统调用 system.exit方法或虚拟机被关闭时才会执行添加的shutdownhook线程。其中shutdownhook是一个已初始化但并不有启动的线程,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addshutdownhook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以可通过这些钩子在jvm关闭的时候进行内存清理、资源回收等工作。
@importantfield
@importantfield 干什么的? 什么时候 使用
最后的true 代表 是否只打印关键属性,写@importantfield的 就一定会打,没有这个注解的就不打印了
mixall.printobjectproperties(console, brokerconfig, true);
在同一台计算机上部署多个代理时 想区分日志路径 用哪个参数,调成什么?
isolatelogenable 改为 true
if (brokerconfig.isisolatelogenable()) {
system.setproperty("brokerlogdir", brokerconfig.getbrokername() + "_" + brokerconfig.getbrokerid());
}
if (brokerconfig.isisolatelogenable() && messagestoreconfig.isenabledlegercommitlog()) {
system.setproperty("brokerlogdir", brokerconfig.getbrokername() + "_" + messagestoreconfig.getdlegerselfid());
}
broker 为什么 -p 和 -m 同时有的时候 -m的总是不生效呢?
无论是 -p 还是 -m 都是print 输出,本来就是希望打印日志,然后进程停止。
opt = new option("p", "printconfigitem", false, "print all config item");
opt = new option("m", "printimportantconfig", false, "print important config item");
总结
这些 只是 namestr 的namesrvcontroller 初始化,更多关于rocketmq nameserver 解析的资料请关注萬仟网其它相关文章!
看完文章,还可以扫描下面的二维码下载快手极速版领4元红包
除了扫码领红包之外,大家还可以在快手极速版做签到,看视频,做任务,参与抽奖,邀请好友赚钱)。
邀请两个好友奖最高196元,如下图所示:







