ActiveMQ高级
ActiveMQ通信协议
ActiveMQ支持很多协议,包括
- TCP
- NIO
- UDP
- SSL
- Http
- VM
实际上NIO协议会使用的很多
查看配置文件activemq.xml
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
TCP协议
这是默认的Broker 的配置
- 在网络传输数据之前必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流的默认情况下ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据的快速交互
- 这个协议还可以使用
?
添加一些参数调节性能
具体参考:
NIO协议
NIO协议和TCP协议类似但NIO更侧重底层的访问操作。它允许开发人员对统一资源可以有更多的客户端访问和更多的负载
适合使用NIO的changjing
- 有大量客户端连接到Broker上,大量客户端连接Broker是被操作系统的线程所限制的,因此,NIO的实现比TCP需要更少的线程去运行
- 可能对Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能
配置方式
<broker>
...
<transportConnectors>
<transportConnector name="nio+ssl" uri="nio+ssl://0.0.0.0:61616"/>
</<transportConnectors>
...
</broker>
配置NIO
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
代码编写十分简单,更改传输协议就行了
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("nio://192.168.189.104:61618");
多协议支持
- 使用Auto关键字
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61618?trace=true"/>
持久化
根据官网所述
activemq支持的持久化方式有两个
- LevelDB
- KahaDB
- JDBC
- AMQ
在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件,内存数据库或者远程数据库等再试图将消息发送给接受者,成功则将消息丛存储中删除,失败则继续发送
消息中心启动后首先检查指定的存储位置,如果有未成功发送的消息,则需要把消息发送出去
AMQ
AMQ是基于文件的存储方式,已经过时
KahaDB
基于日志进行消息的存储,是ActiveMQ5.4
开始的默认持久化方式
打开配置文件可以发现这是默认的持久化方式
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
消息存储使用一个事务日志和仅仅使用一个索引文件来存储它所有的地址
数据会被追加到data.log
中,如果不需要了就会被丢弃
其目录下会有四类文件和一个lock
- db-1.log:这是日志文件,会有一个极限的大小,如32M,如果超出了会新建一个文件
- db.data:包含了持久化的B-树的索引,索引指向
log
中的信息 - db.free:记录空闲页的索引
- db.redo:索引的恢复和备份文件
- lock:文件锁,表示当前获得kahadb读写权限的broker
LevelDB
LevelDB
是从ActiveMQ5.8
之后引进的,和KahaDB十分相似,也是基于文件的本地数据库存储形式,但是提供比KahaDB更快的持久性
它不使用B-树进行索引,而是另一种基于LevelDB
的索引
JDBC
驱动问题
添加mysql的jar包到lib
文件夹下
配置配置文件
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
dataSource 指定将要引用的持久化的数据库的bean名称
是否在启动时创建表,createTablesOnStartup
首次使用true
,后面使用false
数据库连接池的配置
和spring的数据源一样
注意这里的id:
mysql-ds
和上面的id是对应的
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?serverTimezone=UTC"/>
<property name="username" value="root"/>
<property name="password" value="njjames200927"/>
<property name="initialSize" value="5"/>
<property name="maxActive" value="10"/>
<property name="maxWait" value="3000"/>
</bean>
如果消息是持久化的,保存在内存当中
使用了这个数据库持久化策略之后,添加消息,我们发现ACTIVEMQ_MSGS
多了这么多数据
下面介绍一下会自动生成哪些表
- ACTIVEMQ_MSGS:记录队列中或者其他持久化的消息内容
- ACTIVEMQ_LOCK:记录锁的信息,同一时间只允许一个activemq进行写操作
- ACTIVEMQ_ACKS:记录订阅的信息
如果使用非持久化,将不会记录到数据库中,会只存在内存中,如果服务器宕机数据就会丢失
小总结
如果是队列
如果没有消费者消费的情况下会将消息保存到activemq_msgs
表中,只要有任意一个消费者消费过了, 消息会被立即删除
如果是主题
一般先启动订阅再产生消息的情况下会被存放到activemq_acks
中
Journal
在JDBC持久化和activemq之间可以再使用Journal,进行高速缓存,提升性能,如果消费者消费得足够快的话,可以使用Journal免去大量的读写操作
配置方法
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data" />
</persistenceFactory>
一定程度上说,这个东西实验了一些读写分离
高级特性
异步投递
ActiveMQ支持同步,异步两个发送的模式将消息发送到broker,模式的选择对发送的延迟有巨大的影响。
ActiveMQ默认使用异步的发送模式,除非明确指定使用同步发送方式,或者在未使用事务的前提下发送持久化消息。
如果没有使用事务且发送的是一个持久化消息,每一次发送都是同步的,并且会阻塞producer知道broker返回一个确认,表示消息已经被持久化到磁盘
异步投递不能保证不难保证消息的发送成功
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61618");
activeMQConnectionFactory.setUseAsyncSend(true);
如果使用异步投递,保证消息不丢失的方式可以使用回调方法,使用方法如下
//创建消息生产者
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(queue);
//创建消息
for (int i=0;i<50;i++)
{
// 创建消息对象
TextMessage textMessage = session.createTextMessage("textMessage"+i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"message1");
String jmsMessageID = textMessage.getJMSMessageID();
producer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
//发送成功调用
System.out.println(jmsMessageID+"发送成功");
}
@Override
public void onException(JMSException exception) {
//发送失败调用
System.out.println(jmsMessageID+"发送失败");
}
});
}
注意这里的Producer
是ActiveMQMessageProducer
延迟投递&定时投递
在xml配置文件中开启schedulerSupport
属性为true
在Java代码中使用
//发5次
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,5);
//消息延迟5s
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,5000);
//消息间隔5s
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,5000);
消息重试机制
哪些情况会引发消息的重发
- session调用了
rollback
- 没有
commit()
- Client使用了
CLIENT_ACKNOWLEDGE
的传递模式下,在 session中使用了recover()
消息的重发间隔和重发次数
间隔(重发延迟):1s
次数(最大重试次数):6次
有毒消息Poison ACK
当超过默认重发次数,消费端会给MQ发送一个poison ack
,MQ会把这个消息放到死信队列
死信队列
Dead Letter Queue
开发人员可以在这个Queue中查看出错的消息,进行人工干预
主要是用于处理失败的消息
共享死信队列
默认策略。所有出错的消息都使用共同的队列
独占死信队列
对一个业务的消息使用单独的死信队列,方便进行集中处理
死信队列的配置方式
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
// “>”表示对所有队列生效,如果需要设置指定队列,则直接写队列名称
<policyEntry queue=">">
<deadLetterStrategy>
//queuePrefix:设置死信队列前缀
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false"/>
//是否丢弃过期消息
<!--<sharedDeadLetterStrategy processExpired="false" />-->
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
消息幂等性(防止重复消费)
- 对数据库进行插入,存放到数据库中,把id作为主键
- 使用redis,给消息分配全局id,把id放进redis
分发策略
分发策略有很多,比如说对于队列:
- 轮询
- 固定顺序
队列默认轮询,要使用严格顺序的分发策略,使用如下配置
<policyEntry queue=">" strictOrderDispatch="false" />