1. RocketMQ整体架构
在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了解一下RocketMQ的组件,然后基于当前主流的Docker安装RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故障、保障高可用,生产环境建议安装RocketMQ集群。
-
NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。
-
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
-
Producer消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的 Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
-
Consumer消息消费的角色,支持分布式集群方式部署。支持以push、pull两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
参考地址:https://github.com/apache/rocketmq/tree/master/docs/cn
2. 下载RocketMQ镜像
2.1. 选择镜像
~]# docker search rocketmq
2.2. 拉取镜像
~]# docker pull apache/rocketmq
2.3. 查看镜像
~]# docker images
3. 创建namesrv服务
3.1. 创建 namesrv 挂载目录
在宿主机上创建挂载目录用于挂载容器内部数据、配置文件、以及日志。(即RocketMQ运行时需要持久化到磁盘的数据)
mkdir -p /usr/local/rocketmq/data/namesrv/{logs,store}
3.2. 构建namesrv容器
~]# docker run -d \
--restart=always \
--name rmqnamesrv \
--privileged=true \
-p 9876:9876 \
-v /usr/local/rocketmq/data/namesrv/logs:/root/logs \
-v /usr/local/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
apache/rocketmq \
sh mqnamesrv
注意:通过 docker 的 -v 参数使用 volume 功能,把你本地的目录映射到容器内的目录上。否则所有数据都默认保存在容器运行时的内存中,重启之后就又回到最初的起点。(下同)
4. 创建broker节点
4.1 创建broker挂载目录
在宿主机上创建挂载目录用于挂载容器内部数据、配置文件、以及日志。
mkdir -p /usr/local/rocketmq/{data,conf}
mkdir -p /usr/local/rocketmq/data/broker/{logs,store}
4.2. 创建broker配置文件
{
echo "brokerClusterName = DefaultCluster"
echo "brokerName = broker-a"
echo "brokerId = 0"
echo "deleteWhen = 04"
echo "fileReservedTime = 48"
echo "brokerRole = ASYNC_MASTER"
echo "flushDiskType = ASYNC_FLUSH"
echo "brokerIP1 = xxx.xxx.xxx.xxx"
echo "diskMaxUsedSpaceRatio=95"
} > /usr/local/rocketmq/conf/broker.conf
备注:broken配置参数见附录C说明.
注意:brokerIP1是当前broker监听的IP,设置宿主机IP,不要使用docker 内部IP。
4.3. 构建broker容器
~]# docker run -d \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
--privileged=true \
-p 10911:10911 \
-p 10912:10912 \
-p 10909:10909 \
-v /usr/local/rocketmq/data/broker/logs:/root/logs \
-v /usr/local/rocketmq/data/broker/store:/root/store \
-v /usr/local/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-e "MAX_POSSIBLE_HEAP=200000000" \
apache/rocketmq \
sh mqbroker -c /opt/rocketmq/conf/broker.conf
5. 创建rockermq-console服务(rocketmq 控制台)
5.1. 拉取rockermq-console镜像
~]# docker pull styletang/rocketmq-console-ng
5.2. 构建rockermq-console容器
重点:JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx 中的addr是宿主机的IP地址
~]# docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8081:8080 \
--ulimit nofile=1024 \
styletang/rocketmq-console-ng:latest
6. 启动mqnamesrv 和 mqbroker
6.1 启动mqnamesrv
~]# docker start rmqnamesrv
6.2 启动mqbroker
~]# docker start rmqbroker
7. 查看控制台信息
访问宿主机IP+端口号(rockermq-console中映射的端口号)
8. 常见问题
附录
附录A. 相关联的文章
附录B. 参考
附录C. broker配置参数
-
通用配置参数
brokerClusterName=default-cluster brokerName=broker-b brokerId=0 #- ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE brokerRole=SYNC_MASTER namesrvAddr=100.100.100.100:9876;100.100.100.101:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=75 #存储路径 storePathRootDir=/data/mq/rocketmq4.3/store #commitLog 存储路径 storePathCommitLog=/data/mq/rocketmq4.3/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/data/mq/rocketmq4.3/store/consumequeue #消息索引存储路径 storePathIndex=/data/mq/rocketmq4.3/store/index #checkpoint 文件存储路径 storeCheckpoint=/data/mq/rocketmq4.3/store/checkpoint #abort 文件存储路径 abortFile=/data/mq/rocketmq4.3/store/abort #限制的消息大小 maxMessageSize=65536 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true
-
更多配置参数
#是否允许 Broker 自动创建订 阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup true brokerName broker-a #默认为listenPort+1 haListenPort 10912 clientManagerThreadPoolQueueCapacity 1000000 flushCommitLogThoroughInterval 10000 flushCommitLogLeastPages 4 #客户端处理网络请求线程数 clientCallbackExecutorThreads Runtime.getRuntime().availableProcessors() notifyConsumerIdsChangedEnable true expectConsumerNumUseFilter 32 cleanResourceInterval 10000 channelNotActiveInterval 60000 #检查物理文件磁盘空间 diskMaxUsedSpaceRatio 75 debugLockEnable false #延迟等级,可扩展 messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h clusterTopicEnable true messageIndexEnable true serverPooledByteBufAllocatorEnable true shortPollingTimeMills 1000 commercialEnable true redeleteHangedFileInterval 120000 flushConsumerOffsetInterval 5000 #false实时刷盘,true定时刷盘 flushCommitLogTimed false #允许发送的最大消息体大小 maxMessageSize 4194304 brokerId 0 syncFlushTimeout 5000 flushConsumeQueueThoroughInterval 60000 #无读写客户端存活时间 clientChannelMaxIdleTimeSeconds 120 flushDelayOffsetInterval 10000 serverSocketRcvBufSize 131072 #刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 flushDiskType ASYNC_FLUSH #单次 Pull 消息(内存)传输的 最大字节数 maxTransferBytesOnMessageInMemory 262144 clientManageThreadPoolNums 32 serverChannelMaxIdleTimeSeconds 120 serverCallbackExecutorThreads 0 enablePropertyFilter false transientStorePoolSize 5 enableConsumeQueueExt false #单次 Pull 消息(磁盘)传输的 最大字节数 maxTransferBytesOnMessageInDisk 65536 #推送消息线程数 pullMessageThreadPoolNums 16 + Runtime.getRuntime().availableProcessors() * 2 clientCloseSocketIfTimeout false #是否从 web 服务器获取 Name Server 地址 fetchNamesrvAddrByAddressServer false sendThreadPoolQueueCapacity 10000 diskFallRecorded true transientStorePoolEnable false offsetCheckInSlave false disableConsumeIfConsumerReadSlowly false commitCommitLogThoroughInterval 200 consumerManagerThreadPoolQueueCapacity 1000000 flushIntervalConsumeQueue 1000 clientOnewaySemaphoreValue 65535 warmMapedFileEnable false slaveReadEnable false transferMsgByHeap true consumerFallbehindThreshold 17179869184 serverAsyncSemaphoreValue 64 startAcceptSendRequestTimeStamp 0 flushConsumerOffsetHistoryInterval 60000 brokerIP2 192.168.59.96 filterDataCleanTimeSpan 86400000 #单次 Pull 消息(磁盘)传输的 最大条数 maxTransferCountOnMessageInDisk 8 #本机 IP 地址 brokerIP1 192.168.59.96 deleteCommitLogFilesInterval 100 adminBrokerThreadPoolNums 16 #commitLog 存储路径 storePathCommitLog /data/xxx/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue /data/xxx/store/consumequeue #消息索引存储路径 storePathIndex /data/xxx/store/index #checkpoint 文件存储路径 storeCheckpoint /data/xxx/store/checkpoint #abort 文件存储路径 abortFile=/data/xxx/store/abort filterServerNums 0 deleteConsumeQueueFilesInterval 100 checkCRCOnRecover true serverOnewaySemaphoreValue 256 filterSupportRetry false defaultQueryMaxNum 32 clientSocketRcvBufSize 131072 clientWorkerThreads 4 maxDelayTime 40 connectTimeoutMillis 3000 commercialTimerCount 1 clientPooledByteBufAllocatorEnable false serverSocketSndBufSize 131072 regionId DefaultRegion duplicationEnable false #磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用;FALSE 标记服务不可用,文件 不删除 cleanFileForciblyEnable true fastFailIfNoBufferInStorePool false #是否拒绝事务消息接入 rejectTransactionMessage false serverSelectorThreads 3 consumerManageThreadPoolNums 32 haSendHeartbeatInterval 5000 mapedFileSizeConsumeQueue 6000000 commitCommitLogLeastPages 4 longPollingEnable true flushConsumeQueueLeastPages 2 storePathRootDir C:UsersAdministratorstore #默认topic队列数 defaultTopicQueueNums 8 highSpeedMode false commercialBaseCount 1 maxErrorRateOfBloomFilter 20 accessMessageInMemoryMaxRatio 40 #是否允许 Broker 自动创建 Topic,建议线下开启,线上 关闭 autoCreateTopicEnable true commitIntervalCommitLog 200 brokerTopicEnable true namesrvAddr 127.0.0.1:9876 clientAsyncSemaphoreValue 65535 maxMsgsNumBatch 64 #文件保留时间,默认 48 小时 fileReservedTime 48 #删除文件时间点,默认凌晨 4点 deleteWhen 04 waitTimeMillsInSendQueue 200 commercialTransCount 1 osPageCacheBusyTimeOutMills 1000 maxIndexNum 20000000 registerBrokerTimeoutMills 6000 #是否提供安全的消息索引机 制,索引保证不丢 messageIndexSafe false putMsgIndexHightWater 600000 #Broker 对外服务的监听端口 listenPort 10911 serverWorkerThreads 8 clientSocketSndBufSize 131072 traceOn true maxHashSlotNum 5000000 bitMapLengthConsumeQueueExt 112 #- ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE brokerRole ASYNC_MASTER rocketmqHome D:SoftwareDevJavaDevapache-rocketmq #默认使用自旋锁,true使用重入锁 useReentrantLockWhenPutMessage false haHousekeepingInterval 20000 brokerPermission 6 #单次 Pull 消息(内存)传输的 最大条数 maxTransferCountOnMessageInMemory 32 useEpollNativeSelector false haSlaveFallbehindMax 268435456 haTransferBatchSize 32768 messageStorePlugIn pullThreadPoolQueueCapacity 100000 brokerClusterName DefaultCluster enableCalcFilterBitMap false destroyMapedFileIntervalForcibly 120000 mapedFileSizeCommitLog 1073741824 commercialBigCount 1 flushLeastPagesWhenWarmMapedFile 4096 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums 1 #commitlog刷新到磁盘间隔 flushIntervalCommitLog 500 mappedFileSizeConsumeQueueExt 50331648