Menu Close

Docker安装RocketMQ

1. RocketMQ整体架构

在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了解一下RocketMQ的组件,然后基于当前主流的Docker安装RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故障、保障高可用,生产环境建议安装RocketMQ集群。

file

  • NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。

  • Broker主要负责消息的存储、投递和查询以及服务高可用保证。

  • Producer消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的 Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer消息消费的角色,支持分布式集群方式部署。支持以push、pull两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

参考地址:https://github.com/apache/rocketmq/tree/master/docs/cn

2. 下载RocketMQ镜像

2.1. 选择镜像

~]# docker search rocketmq

file

2.2. 拉取镜像

~]# docker pull apache/rocketmq

file

file

2.3. 查看镜像

~]# docker images

file

3. 创建namesrv服务

3.1. 创建 namesrv 挂载目录

在宿主机上创建挂载目录用于挂载容器内部数据、配置文件、以及日志。(即RocketMQ运行时需要持久化到磁盘的数据)

mkdir -p /usr/local/rocketmq/data/namesrv/{logs,store}

file

3.2. 构建namesrv容器

~]# docker run -d \
--restart=always \
--name rmqnamesrv \
--privileged=true \
-p 9876:9876 \
-v /usr/local/rocketmq/data/namesrv/logs:/root/logs \
-v /usr/local/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
apache/rocketmq \
sh mqnamesrv

注意:通过 docker 的 -v 参数使用 volume 功能,把你本地的目录映射到容器内的目录上。否则所有数据都默认保存在容器运行时的内存中,重启之后就又回到最初的起点。(下同)

file

4. 创建broker节点

4.1 创建broker挂载目录

在宿主机上创建挂载目录用于挂载容器内部数据、配置文件、以及日志。

mkdir -p /usr/local/rocketmq/{data,conf}
mkdir -p /usr/local/rocketmq/data/broker/{logs,store}

file

4.2. 创建broker配置文件

{
echo "brokerClusterName = DefaultCluster"
echo "brokerName = broker-a"
echo "brokerId = 0"
echo "deleteWhen = 04"
echo "fileReservedTime = 48"
echo "brokerRole = ASYNC_MASTER"
echo "flushDiskType = ASYNC_FLUSH"
echo "brokerIP1 = xxx.xxx.xxx.xxx"
echo "diskMaxUsedSpaceRatio=95"
} > /usr/local/rocketmq/conf/broker.conf

备注:broken配置参数见附录C说明.

注意:brokerIP1是当前broker监听的IP,设置宿主机IP,不要使用docker 内部IP。

file

4.3. 构建broker容器

~]# docker run -d \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
--privileged=true \
-p 10911:10911 \
-p 10912:10912 \
-p 10909:10909 \
-v /usr/local/rocketmq/data/broker/logs:/root/logs \
-v /usr/local/rocketmq/data/broker/store:/root/store \
-v /usr/local/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-e "MAX_POSSIBLE_HEAP=200000000" \
apache/rocketmq \
sh mqbroker -c /opt/rocketmq/conf/broker.conf

file

5. 创建rockermq-console服务(rocketmq 控制台)

5.1. 拉取rockermq-console镜像

~]# docker pull styletang/rocketmq-console-ng

file

file

5.2. 构建rockermq-console容器

重点:JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx 中的addr是宿主机的IP地址

~]# docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8081:8080 \
--ulimit nofile=1024 \
styletang/rocketmq-console-ng:latest

file

6. 启动mqnamesrv 和 mqbroker

6.1 启动mqnamesrv

~]# docker start rmqnamesrv

file

6.2 启动mqbroker

~]# docker start rmqbroker

file

7. 查看控制台信息

访问宿主机IP+端口号(rockermq-console中映射的端口号)

file

8. 常见问题

附录

附录A. 相关联的文章

附录B. 参考

附录C. broker配置参数

  • 通用配置参数

    brokerClusterName=default-cluster
    brokerName=broker-b
    brokerId=0
    #- ASYNC_MASTER 异步复制Master   - SYNC_MASTER 同步双写Master   - SLAVE
    brokerRole=SYNC_MASTER
    namesrvAddr=100.100.100.100:9876;100.100.100.101:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=75
    #存储路径
    storePathRootDir=/data/mq/rocketmq4.3/store
    #commitLog 存储路径
    storePathCommitLog=/data/mq/rocketmq4.3/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/data/mq/rocketmq4.3/store/consumequeue
    #消息索引存储路径
    storePathIndex=/data/mq/rocketmq4.3/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/data/mq/rocketmq4.3/store/checkpoint
    #abort 文件存储路径
    abortFile=/data/mq/rocketmq4.3/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #并发send线程数,多线程来发送消息可能会出现broker busy
    sendMessageThreadPoolNums=128
    useReentrantLockWhenPutMessage=true
  • 更多配置参数

    #是否允许 Broker 自动创建订 阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup true
    brokerName broker-a
    #默认为listenPort+1
    haListenPort 10912
    clientManagerThreadPoolQueueCapacity 1000000
    flushCommitLogThoroughInterval 10000
    flushCommitLogLeastPages 4
    #客户端处理网络请求线程数
    clientCallbackExecutorThreads  Runtime.getRuntime().availableProcessors()
    notifyConsumerIdsChangedEnable true
    expectConsumerNumUseFilter 32
    cleanResourceInterval 10000
    channelNotActiveInterval 60000
    #检查物理文件磁盘空间
    diskMaxUsedSpaceRatio 75
    debugLockEnable false
    #延迟等级,可扩展
    messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    clusterTopicEnable true
    messageIndexEnable true
    serverPooledByteBufAllocatorEnable true
    shortPollingTimeMills 1000
    commercialEnable true
    redeleteHangedFileInterval 120000
    flushConsumerOffsetInterval 5000
    #false实时刷盘,true定时刷盘
    flushCommitLogTimed false
    #允许发送的最大消息体大小
    maxMessageSize  4194304
    brokerId 0
    syncFlushTimeout 5000
    flushConsumeQueueThoroughInterval 60000
    #无读写客户端存活时间
    clientChannelMaxIdleTimeSeconds 120
    flushDelayOffsetInterval 10000
    serverSocketRcvBufSize 131072
    #刷盘方式  - ASYNC_FLUSH 异步刷盘     - SYNC_FLUSH 同步刷盘 
    flushDiskType ASYNC_FLUSH
    #单次 Pull 消息(内存)传输的 最大字节数
    maxTransferBytesOnMessageInMemory 262144
    clientManageThreadPoolNums 32
    serverChannelMaxIdleTimeSeconds 120
    serverCallbackExecutorThreads 0
    enablePropertyFilter false
    transientStorePoolSize 5
    enableConsumeQueueExt false
    #单次 Pull 消息(磁盘)传输的 最大字节数
    maxTransferBytesOnMessageInDisk 65536
    #推送消息线程数
    pullMessageThreadPoolNums  16 + Runtime.getRuntime().availableProcessors() * 2
    clientCloseSocketIfTimeout false
    #是否从 web 服务器获取 Name Server 地址 
    fetchNamesrvAddrByAddressServer false
    sendThreadPoolQueueCapacity 10000
    diskFallRecorded true
    transientStorePoolEnable false
    offsetCheckInSlave false
    disableConsumeIfConsumerReadSlowly false
    commitCommitLogThoroughInterval 200
    consumerManagerThreadPoolQueueCapacity 1000000
    flushIntervalConsumeQueue 1000
    clientOnewaySemaphoreValue 65535
    warmMapedFileEnable false
    slaveReadEnable false
    transferMsgByHeap true
    consumerFallbehindThreshold 17179869184
    serverAsyncSemaphoreValue 64
    startAcceptSendRequestTimeStamp 0
    flushConsumerOffsetHistoryInterval 60000
    brokerIP2 192.168.59.96
    filterDataCleanTimeSpan 86400000
    #单次 Pull 消息(磁盘)传输的 最大条数
    maxTransferCountOnMessageInDisk 8
    #本机 IP 地址
    brokerIP1 192.168.59.96
    deleteCommitLogFilesInterval 100
    adminBrokerThreadPoolNums 16
    #commitLog 存储路径
    storePathCommitLog    /data/xxx/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue    /data/xxx/store/consumequeue
    #消息索引存储路径
    storePathIndex    /data/xxx/store/index
    #checkpoint 文件存储路径
    storeCheckpoint    /data/xxx/store/checkpoint
    #abort 文件存储路径
    abortFile=/data/xxx/store/abort
    filterServerNums 0
    deleteConsumeQueueFilesInterval 100
    checkCRCOnRecover true
    serverOnewaySemaphoreValue 256
    filterSupportRetry false
    defaultQueryMaxNum 32
    clientSocketRcvBufSize 131072
    clientWorkerThreads 4
    maxDelayTime 40
    connectTimeoutMillis 3000
    commercialTimerCount 1
    clientPooledByteBufAllocatorEnable false
    serverSocketSndBufSize 131072
    regionId DefaultRegion
    duplicationEnable false
    #磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用;FALSE 标记服务不可用,文件 不删除 
    cleanFileForciblyEnable true
    fastFailIfNoBufferInStorePool false
    #是否拒绝事务消息接入 
    rejectTransactionMessage false
    serverSelectorThreads 3
    consumerManageThreadPoolNums 32
    haSendHeartbeatInterval 5000
    mapedFileSizeConsumeQueue 6000000
    commitCommitLogLeastPages 4
    longPollingEnable true
    flushConsumeQueueLeastPages 2
    storePathRootDir C:UsersAdministratorstore
    #默认topic队列数
    defaultTopicQueueNums 8
    highSpeedMode false
    commercialBaseCount 1
    maxErrorRateOfBloomFilter 20
    accessMessageInMemoryMaxRatio 40
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上 关闭
    autoCreateTopicEnable true
    commitIntervalCommitLog 200
    brokerTopicEnable true
    namesrvAddr 127.0.0.1:9876
    clientAsyncSemaphoreValue 65535
    maxMsgsNumBatch 64
    #文件保留时间,默认 48 小时
    fileReservedTime 48
    #删除文件时间点,默认凌晨 4点
    deleteWhen 04
    waitTimeMillsInSendQueue 200
    commercialTransCount 1
    osPageCacheBusyTimeOutMills 1000
    maxIndexNum 20000000
    registerBrokerTimeoutMills 6000
    #是否提供安全的消息索引机 制,索引保证不丢 
    messageIndexSafe false
    putMsgIndexHightWater 600000
    #Broker 对外服务的监听端口
    listenPort 10911
    serverWorkerThreads 8
    clientSocketSndBufSize 131072
    traceOn true
    maxHashSlotNum 5000000
    bitMapLengthConsumeQueueExt 112
    #- ASYNC_MASTER 异步复制Master   - SYNC_MASTER 同步双写Master   - SLAVE 
    brokerRole ASYNC_MASTER
    rocketmqHome D:SoftwareDevJavaDevapache-rocketmq
    #默认使用自旋锁,true使用重入锁
    useReentrantLockWhenPutMessage false
    haHousekeepingInterval 20000
    brokerPermission 6
    #单次 Pull 消息(内存)传输的 最大条数
    maxTransferCountOnMessageInMemory 32
    useEpollNativeSelector false
    haSlaveFallbehindMax 268435456
    haTransferBatchSize 32768
    messageStorePlugIn 
    pullThreadPoolQueueCapacity 100000
    brokerClusterName DefaultCluster
    enableCalcFilterBitMap false
    destroyMapedFileIntervalForcibly 120000
    mapedFileSizeCommitLog 1073741824
    commercialBigCount 1
    flushLeastPagesWhenWarmMapedFile 4096
    #并发send线程数,多线程来发送消息可能会出现broker busy
    sendMessageThreadPoolNums 1
    #commitlog刷新到磁盘间隔
    flushIntervalCommitLog 500
    mappedFileSizeConsumeQueueExt 50331648