MQ 概述 & Active MQ HelloWorld
MQ 是我们经常所说的消息中间件
MQ的引入
一个例子
学生给老师提问,都得排队,如果排队的人越来越多(高并发)
-
老师的负担加重
-
等待时间变长
-
老师可能垮掉
如果使用一种方式,能缓解老师的压力,比如说让同学们把自己的问题以一种规定的格式(老师能看懂)提交给班长,班长再提交给老师,班长不需要处理问题,他只负责收集问题
这样的话。可以有一些优点
- 老师不再直接接收同学的提问,转而接收班长收集到的信息(解决耦合调用的问题)
- 老师的压力不会激增(消除峰时,高并发)
- 提问完你也可以做别的事情,老师通过发信息给你反馈(异步)
消息中间件
在分布式系统中,调用的耦合非常严重
分布式架构中,存在以下问题:
- 系统间接口耦合比较严重
- 面对大流量并发时,容易被冲垮
- 等待同步存在的性能问题
我们期望的目标:
- 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小
- 设置流量缓冲池,不要让业务系统被冲垮,削峰
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐登录,异步
消息中间件的定义
MOM的定义
面向消息的中间件使用消息传送提供者来协调消息传送操作。MOM 系统的基本元素是客户端、消息和 MOM 提供者,后者包括 API 和管理工具。MOM 提供者使用不同的体系结构路由和传送消息:它可以使用集中式消息服务器,也可以将路由和传送功能分布在每个客户端上。某些 MOM 产品结合了这两个方法。
使用 MOM 系统,客户端可以进行 API 调用,以便将消息发送到由提供者管理的目的地。该调用会调用提供者服务以路由和传送消息。在发送消息之后,客户端会继续执行其他工作,并确信在接收方客户端检索该消息之前,提供者一直保留该消息。基于消息的模型与提供者的协调耦合在一起,使得创建松散耦合的组件系统成为可能。这样的系统可以继续可靠地工作,即使在有个别组件或连接失败时也不会停机。
由消息传送提供者协调客户端之间的消息传送的另一个优点是:通过添加管理界面,可以监视和调整性能。这样,客户端应用程序便不必关心发送、接收和处理消息之外的任何问题。对于互操作性、可靠性、安全性、可伸缩性和性能之类的问题,应当由管理员通过编码实现 MOM 系统来解决。
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接收者,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系
安装Active MQ
下载ActiveMQ
下载地址: http://activemq.apache.org/
Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server.
ActiveMQ 是非常流行的开源的,多协议的,基于Java 的消息服务
linux 中安装 Active MQ
解压缩
tar -zxvf 压缩包
新建文件夹
mkdir /ActiveMQ
cp -r 解压出来的文件 /ActiveMQ/activemq
启动active mq
cd bin
./activemq start
带日志的启动方式
./activemq start > /myactiveMQ/run_activemq.log
检查启动
ps -ef|grep activemq|grep -v grep
netstat -anp|grep 61616
lsof -i:61616
开启后,activemq会占用61616端口
使用Java操控ActiveMQ
导入maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.16</version>
</dependency>
生产者编码
//创建消息工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话
//第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
//参数的目的地名称
Queue queue = session.createQueue("queue1");
//创建消息生产者
MessageProducer producer = session.createProducer(queue);
//创建消息,由生产者发送
for (int i=0;i<3;i++)
{
// 创建消息对象
TextMessage textMessage = session.createTextMessage("textMessage"+i);
// 发送消息
producer.send(textMessage);
}
//释放资源
producer.close();
session.close();
connection.close();
执行完代码控制台的状态
Number Of Pending Message | 等待消费的消息 | 当前未出队的数量 |
---|---|---|
Number Of Consumers | 消费者数量 | 消费者端的消费者数量 |
Messages Enqueued | 进队消息数 | 进入队列的总数量,包括出队列的 |
Messages Dequeue | 出队消息数 | 可以理解为是消费者消费掉的数量 |
消费者编码
// 创建工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话
//第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
//参数的目的地名称
Queue queue = session.createQueue("queue1");
//创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//创建接受的消息
TextMessage message = null;
//循环接受消息,这个一定要进行类型转换,发出消息和接受的消息要一一对应
while ((message=(TextMessage)consumer.receive())!=null)
{
//打印消息
System.out.println(message.getText());
}
consumer.close();
session.close();
connection.close();
其中consumer.receive()
可以使用构造方法的timeout
属性来指出超时时间
如果超时,消费者将不再阻塞,直接结束consumer
,直接释放资源
消费者监听的方式接受消息
// 创建工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话
//第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
//参数的目的地名称
Queue queue = session.createQueue("queue1");
//创建消费者
MessageConsumer consumer = session.createConsumer(queue);
// 添加消息监听器(匿名内部类)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//如果消息不为空且是TextMessage
if (null != message && message instanceof TextMessage)
{
try {
//打印消息
System.out.println(((TextMessage) message).getText());
}catch (Exception e) {
e.printStackTrace();
}
}
}
});
//消费需要时间,需要给监听器时间进行消费
System.in.read();
consumer.close();
session.close();
connection.close();
多消费者模式
如果有两个消费者同时进行消费(先启动),此时再发布消息,平均分配,轮询机制
Topic 方式
- 生产者将消息发布到topic当中,每个消息可以有多个消费者,属于1:N关系
- 生产者和消费者之间有时间上的相关性,订阅某一个主题的消费者智能消费自它订阅之后发布的消息
- 生产者生产时,topic不保存消息,topic是无状态的,发布了就不管了,没人订阅就发布就是一条废消息,一般先订阅再发布
消费者代码:
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener((message)->{
if (message!=null && message instanceof TextMessage)
{
try {
System.out.println(((TextMessage) message).getText());
}catch (Exception e) {
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
生产者代码:
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic");
MessageProducer producer = session.createProducer(topic);
for (int i=0;i<5;i++)
{
producer.send(session.createTextMessage("topic Message"+i));
}
producer.close();
session.close();
connection.close();
记得先启动消费者再启动生产者
Topic 和 Queue对比
Topic 模式 | Queue 模式 | |
---|---|---|
工作模式 | 订阅-发布模式,如果没有订阅者就没人收得到消息,如果有多个订阅者,所有人都会收到消息 | 负载均衡模式,消息会被保存,直到有人来消费掉它,一条消息只会发送给一个消费者,多个消费者的情况会负载均衡进行消费 |
状态 | 无状态 | 会进行存储 |
是否丢弃 | 如果没有订阅者,发布完就丢弃 | 消息不会被丢弃 |
效率 | 订阅者越多,效率越低 | 效率不会明显降低 |
JMS简介
实现产品:
- activeMQ
- rocketMQ
- Kafka
- rabbitMQ
JMS组成
消息头
- JMSDestination 目的地
- JMSDeliveryMode 持久和非持久模式
- 持久性的消息:应该被传送一次,如果JMS提供者出现故障,该消息不会丢失,会在服务恢复之后再次传递
- 最多传递一次,如果服务器故障,该消息永远丢失
- JMSExpiration 消息过期时间
- 如果设置为0,永不过期
- 如果设置为非零,在消息过期时间之后消息还没被发送到目的地,则该消息被清楚
- JMSPriority 消息优先级
- 0-9 十个级别,0-4是普通消息,5-9是加急消息
- 保证加急消息比普通消息更快到达
- 默认级别4
- JMSMessageID:唯一编号
消息属性
如果需要除了消息头意外的属性,我们使用消息属性
使用属性名:属性值对的方式,对消息进一步去重,加以区分
有各种类型的属性
textMessage.setStringProperty(name, value);
textMessage.setDoubleProperty(name, value);
textMessage.setBooleanProperty(name, value);
textMessage.setIntegerProperty(name, value);
消息体
封装具体消息的数据
消息类型:
- TextMessage:一个普通字符串消息,包含一个String
- MapMessage:一个Map类型的消息,key是string,值是Java基本类型
- BytesMessage:二进制数组消息,包含byte[]
- StreamMessage:Java数据流消息
- ObjectMessage:对象消息,包含一个可序列化Java对象s
消息的可靠性
持久化
设置消息持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
如果服务器宕机,消息还是会存在
默认是持久化的
持久化Topic
消费者部分
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
Connection connection = activeMQConnectionFactory.createConnection();
//这个步骤不能少,需要指定id
connection.setClientID("subscriber 1");
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic");
//创建订阅者
TopicSubscriber consumer = session.createDurableSubscriber(topic, "remark..备注");
connection.start();
//接受消息
Message message = null;
while((message= consumer.receive(4000L))!=null)
{
String text = ((TextMessage) message).getText();
System.out.println(text);
}
consumer.close();
session.close();
connection.close();
持久化主题可以总结为
- 创建工厂
- 开启连接
- 设置clientID
- 创建session
- 创建主题
- 创建持久化订阅者
持久化主题一定要先开启持久化订阅
生产者程序
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.189.104:61616");
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i=0;i<5;i++)
{
producer.send(session.createTextMessage("topic Message"+i));
}
producer.close();
session.close();
connection.close();
注意producer.setDeliveryMode(DeliveryMode.PERSISTENT);
代码
流程总结为
- 创建连接工厂
- 创建连接
- 获取session
- 创建主题
- 创建生产者
- 设置生产者的DeliveryMode
事务
提供者提交事务
如果事务开启了,需要commit来进行执行
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
// 使用commit进行提交
session.commit;
使用以下结构使用事务
try {
session.commit();
}catch (Exception e) {
e.printStackTrace();
//出错
session.rollback();
}finally {
//释放资源
consumer.close();
session.close();
connection.close();
}
消费者事务
如果一样
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
如果不使用session.commit()
那么mq还是会认为事务没有被提交
此处格外注意:如果这里设置不好,极易产生消息重复消息的情况
签收
自动签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
手动签收
Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
如果选这个必须使用以下语句
message.acknowledge();
否则会一直不确认
保持在队列中
事务和签收下的消费者
事务和手动签收同时开始
如果是以事务来提交,就认为是自动签收了
- 非事务的会话中,消息何时被确认取决于创建会话时的应答模式
- 事务性会话中,一个事务被自动提交则消息被自动签收
事务和签收的总结
- 事务是自动签收的
- 签收了就相当于先拿着,如果不签收会产生重复消费的问题
- 如果是事务,在多线程中即使不提交也不会产生线程重复消费的问题,是线程安全的
点对点传输
是使用队列进行消息的控制,是自带(默认)持久化的,可以实现异步传输
广播传输
会把产生的消息发给所有订阅者,如果是默认情况是不带有持久化的,只有是持久化订阅者才会收到不在线时广播的消息