Java essay Java essay
首页
  • Java基础
  • Java进阶
  • 设计模式
  • Java你不知道的小事
  • Spring初识
  • Spring进阶
  • SpringBoot基础
  • SpringBoot进阶
  • 什么是微服务
  • SpringCloud全家桶
  • Dubbo
  • SpringCloud Alibaba
  • Zookeeper
  • Nginx
  • RabbitMQ
  • RocketMQ
  • Docker入门到精通
  • 性能分析工具
  • 数据库性能优化
  • 性能优化
  • Java基础面试必问
  • JavaWeb面试必问
  • Java框架面试必问
  • 数据库面试必问
  • 中间件面试必问
  • 分布式微服务面试必问
  • Linux面试必问
  • 计算机网络面试必问
  • 开放性问题面试问必问
  • 简介
  • 联系我

Mr.Fire

后端程序员一枚
首页
  • Java基础
  • Java进阶
  • 设计模式
  • Java你不知道的小事
  • Spring初识
  • Spring进阶
  • SpringBoot基础
  • SpringBoot进阶
  • 什么是微服务
  • SpringCloud全家桶
  • Dubbo
  • SpringCloud Alibaba
  • Zookeeper
  • Nginx
  • RabbitMQ
  • RocketMQ
  • Docker入门到精通
  • 性能分析工具
  • 数据库性能优化
  • 性能优化
  • Java基础面试必问
  • JavaWeb面试必问
  • Java框架面试必问
  • 数据库面试必问
  • 中间件面试必问
  • 分布式微服务面试必问
  • Linux面试必问
  • 计算机网络面试必问
  • 开放性问题面试问必问
  • 简介
  • 联系我
  • Zookeeper

  • RabbitMq

  • RocketMQ

    • RocketMQ单机和集群搭建
      • RocketMQ简介
      • RocketMQ安装
        • 下载地址
        • 系统要求
        • 准备工作
      • 单机模式安装
        • 启动nameserver
        • 启动broker
        • 测试
        • 关闭消息队列
        • 问题
        • 解决方案
      • 集群搭建
        • 启动NameServer
        • 启动broker
        • 修改配置文件
        • 启动四个broker
        • 问题
        • 解决方案
        • 测试
        • 生产者代码:
        • 消费者代码
        • 测试结果
      • 可视化管理平台
  • 中间件
  • RocketMQ
Mr.Fire
2022-12-09
目录

RocketMQ单机和集群搭建

# RocketMQ简介

RocketMQ是一个开源的分布式消息和流数据平台,由阿里研发目前属于apache顶级项目:https://rocketmq.apache.org/ (opens new window) RocketMQ消息队列主要功能功能:引用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容

# RocketMQ安装

# 下载地址

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip (opens new window)

# 系统要求

64位 Linux、unix或mac;
JDK版本1.8以上;
1
2

# 准备工作

由于下载的是zip压缩格式文件,因此在linux上安装unzip来进行解压

yum install -y unzip
1

使用unzip命令解压

unzip rocketmq-all-4.5.1-bin-release.zip
1

# 单机模式安装

# 启动nameserver

两种启动方式

  1. 进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入./mqnamesrv进行启动
[root@node1 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
1
2
3
4
  1. 命令行提示启动成功

进入解压目录:/rocketmq-all-4.5.1-bin-release/bin,输入指令nohup sh bin/mqnamesrv &

[root@node1 bin]# nohup sh ./mqnamesrv &
[1] 68705
1
2

命令行显示启动的进程号,查看启动日志

namesrv.log 
...
The Name Server boot success. serializeType=JSON
1
2
3

日志提示启动成功

# 启动broker

两种启动方式:

  1. 直接运行mqbroker
./mqbroker -n localhost:9876
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
1
2

或者

./mqbroker
The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON
1
2

提示启动成功

  1. 通过nohup指令启动
nohup sh ./mqbroker -n localhost:9876 &
[2] 69248
1
2

查看broker启动日志

tail -f ~/logs/rocketmqlogs/broker.log
...
INFO main - The broker[node1, 172.17.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
1
2
3

提示启动成功

# 测试

运行源文件中以写好的测试demo

export NAMESRV_ADDR=localhost:9876
# 生产者
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=AC110001104B2B193F2D6EA72CC003E7...
# 消费者
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=3, storeSize=179...
1
2
3
4
5
6
7

单点消息队列启动成功

# 关闭消息队列

通过mqshutdown命令关闭消息队列,依次关闭nameserver和broker

sh mqshutdown broker
The mqbroker(69255) is running...
Send shutdown request to mqbroker(69255) OK
1
2
3
sh mqshutdown namesrv
The mqnamesrv(68711) is running...
Send shutdown request to mqnamesrv(68711) OK
1
2
3

# 问题

broker内存不够

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /component/rocketmq-all-4.5.1-bin-release/bin/hs_err_pid68880.log
1
2
3
4
5
6

# 解决方案

报错原因是虚拟机启动内存不够,修改bin下的服务启动脚本 runserver.sh 、runbroker.sh 中对于内存的限制,

runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
1
2
3
4
5

改成如下示例:

runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"
1
2
3
4
5

# 集群搭建

通过两台虚拟机(192.168.108.128和192.168.108.129)模拟集群

# 启动NameServer

分别启动两台虚拟机的NameServer,启动方式与单机模式相同

# 启动broker

两台虚拟机的broker互为主备,即各自启动一个Master和一个Slave,在启动broker之前需要修改相关配置文件

# 修改配置文件

在目录conf/2m-2s-sync中(两主两从,同步更新),该目录下有四个配置文件,分别对应两个Master和两个Slave配置

├── broker-a.properties		#Master-a配置
├── broker-a-s.properties	#Slave-a配置
├── broker-b.properties		#Master-b配置
└── broker-b-s.properties	#Slave-b配置
1
2
3
4

在这里我们需要修改192.168.108.128的broker-a.properties和broker-b-s.properties,即128这台机器是名字为broker-a的broker的主节点,是名称为broker-b的broker的从节点;相对应的192.168.108.129是名字为broker-a的broker的从节点,是名称为broker-b的broker的主节点,因此修改129机器的broker-b.properties和broker-a-s.properties配置文件。修改如下:

修改机器1的配置文件(192.168.108.128)

master:broker-a.properties
namesrvAddr=192.168.108.128:9876;192.168.108.129:9876	#配置nameserver地址
listenPort=10911									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-a									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER								 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略
1
2
3
4
5
6
7
8
9
10
11

slave:broker-b-s.properties

namesrvAddr=192.168.108.128:9876;192.168.108.129:9876	#配置nameserver地址
listenPort=11011									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-b									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE									 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略
1
2
3
4
5
6
7
8
9
10

修改机器2的配置文件(192.168.108.129)

master:broker-b.properties
namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876	#配置nameserver地址	
listenPort=10911									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-b				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-b									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=0											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SYNC_MASTER								 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							 #刷盘策略
1
2
3
4
5
6
7
8
9
10
11

slave:broker-a-s.properties

namesrvAddr=192.168.108.128:9876; 192.168.108.129:9876	#配置nameserver地址	
listenPort=11011									 #broke监听端口号
storePathRootDir=/home/rocketmq/store-a				   #存储消息和配置信息路径
brokerClusterName=DefaultCluster					  #集群名称
brokerName=broker-a									 #broke名称,集群中master和slave通过相同的broke名来进行关联
brokerId=1											#0表示master,大于0为slave对应ID
deleteWhen=04										#删除消息时间,04表示凌晨4点
fileReservedTime=48									 #磁盘上保存消息的时长,单位小时
brokerRole=SLAVE									 #broke角色,有三种角色:SYNC_MASTER、ASYNC_MASTER、SLAVE
flushDiskType=ASYNC_FLUSH							  #刷盘策略
1
2
3
4
5
6
7
8
9
10

# 启动四个broker

nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b-s.properties & 

nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-b.properties &
nohup sh ./bin/mqbroker -c ./conf/2m-2s-sync/broker-a-s.properties & 
1
2
3
4
5
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave consumer offset from master, 192.168.108.128:10911
2019-07-25 14:00:19 INFO BrokerControllerScheduledThread1 - Update slave delay offset from master, 192.168.108.128:10911
2019-07-25 14:00:20 INFO brokerOutApi_thread_2 - register broker[0]to name server 192.168.108.129:9876 OK
2019-07-25 14:00:20 INFO brokerOutApi_thread_1 - register broker[0]to name server 192.168.108.128:9876 OK

1
2
3
4
5

名两行出现broker注册到nameserver成功,则集群搭建成功

# 问题

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.2:10911> failed
1

# 解决方案

出现该问题是因为虚拟机中搭建了docker产生了虚拟网卡过多,RocketMQ在访问docker网络时不通产生问题。 首先关闭docker服务

systemctl stop docker
systemctl is-enabled  docker         #查询是否自启动
systemctl disable  docker            #禁止自启动
1
2
3

然后重启虚拟机即可解决

# 测试

通过java代码对集群进行测试

# 生产者代码:

public class producer {
    public static void main(String[] args) throws Exception {
    	//创建生产者,并设置group
        DefaultMQProducer producer = new DefaultMQProducer("group");
        //设置nameserver
        producer.setNamesrvAddr("192.168.108.128:9876");
        //启动生产者
        producer.start();
        //创建发送的消息,发送100条数据
        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest", "TagA",("MQ Test"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送
            SendResult result = producer.send(msg);
            System.out.println(result);
        }
		//关闭生产者
        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 消费者代码

public class consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
        consumer.setNamesrvAddr("192.168.108.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(Thread.currentThread().getName() + " Receive Message: " + list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        System.out.println("consumer start...");
        consumer.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 测试结果

生产者

消费者

从控制台可以看到,在消费者端显示消息发送成功的数据,sendStatus均为ok,消费者端显示读取出来对应的消息,集群搭建成功

# 可视化管理平台

RocketMQ拥有一个可视化的管理平台,可以通过图形界面的方式查看集群情况、topic、生产者、消费者等具体信息地址: https://github.com/apache/rocketmq-externals (opens new window),项目拉下来后进入目录rocketmq-console 项目使用springboot搭建,在配置文件application.properties中修改配置rocketmq.config.namesrvAddr

rocketmq.config.namesrvAddr=192.168.108.128:9876;192.168.108.129:9876
1

启动项目,运行App.java。访问localhost:8080即可进入管理平台

最后更新时间: 2022/12/25, 22:09:58
高级消费队列RabbitMq入门

← 高级消费队列RabbitMq入门

最近更新
01
分布式系统核心理论CAP和BASE理论
03-05
02
分布式锁的几种实现方式
03-05
03
一文详解ThreadLocal是什么
03-01
更多文章>
Theme by Vdoing | Copyright © 2021-2023 Mr.Fire | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式