一. 初步安装使用

ActiveMQ 的官网 : http://activemq.apache.org

ActiveMQ

API 接受发送
MQ 的高可用MQ 的集群容错配置
MQ 的持久化
延时发送/定时投递
签收机制
Spring/SpringBoot 整合

1.为什么要使用 MQ ?

微服务架构后
链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例,

系统之间接口耦合比较严重

每新增一个下游功能,都要对上游的相关接口进行改造;
举个例子:如果系统A要发送数据给系统B和系统C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送;
当代码上线后又新增了一个需求:
把数据也发送给D,新上了一个D系统也要接受A系统的数据,此时就需要修改A系统,让他感知到D系统的存在,同时把数据处理好再给D。在这个过程你会看到,每接入一个下游系统,都要对系统A进行代码改造,开发联调的效率很低。面对大流量并发时,容易被冲垮

面对大流量并发时,容易被冲垮

每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时,容易被冲垮。
举个例子秒杀业务:
上游系统发起下单购买操作,我就是下单一个操作
下游系统完成秒杀业务逻辑
(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)

等待同步存在性能问题

RPC接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。
比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。

2.是什么

1,要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
2,设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能削峰
3,强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

面向消息的中间件(message-oriented middleware)MOM能够很好的解决以上问题,是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。
大致的过程是这样的:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系;
尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上;
消息接收者则订阅或者监听该爱通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。

应用系统之间解耦合

  • 发送者和接受者不必了解对方,只需要确认消息
  • 发送者和接受者不必同时在线
image-20210224154503073

实现高可用,高性能,可伸缩,易用和安全的企业级面向消息服务的系统

  • 异步消息的消费和处理
  • 控制消息的消费顺序
  • 可以和Spring或者SpringBoot整合简化代码
  • 配置集群容错的MQ集群

3.安装

1.下载 http://activemq.apache.org

2.上传到/opt下

3.解压缩apache-activemq-5.16.1-bin.tar.gz

1
tar -zxvf  apache-activemq-5.16.1-bin.tar.gz

4.拷贝到根目录myactiveMQ

1
cp -r apache-activemq-5.16.1 /myactiveMq/

5.启动

1
2
cd bin
./activemq start

6.activemq默认端口是61616

查看端口号

1
2
ps -ef | grep activemq | grep -v grep  //屏蔽grep进程
netstat -anp|grep 61616 查看端口是否被占用

7.关闭

1
./activemq stop

8.日志

1
./activemq start > /myactiveMQ/run_active.log

4.ActiveMQ控制台

http://127.0.0.1:8161/admin

默认的用户名和密码是admin/admin

访问不了

1
2
systemctl stop firewalld.service //关闭防火墙
systemctl status firewalld.service //查看状态

服务器安装activemq,在本地浏览器无法访问8161端口,这是因为activemq默认是本地回环监听127.0.0.1,修改conf/jetty.xml文件,把127.0.0.1修改成0.0.0.0就可以了

vim ./conf/activemq.xml storeUsage 1gb 500mb

ActiveMQ采用61616端口提供JMS服务

ActiveMQ采用8161端口提供管理控制台服务

二、Java编码实现ActiveMQ通讯

1.依赖

创建maven项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lq.activemq</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<!--activemq锁需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
<!--通用配置-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>

2.JMS

JMS : Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。

image-20210224154503074

在点对点的消息传递时,目的地称为 队列 queue

在发布订阅消息传递中,目的地称为 主题 topic

JDBC连接数据库

1
2
3
4
5
6
第一步:注册驱动(仅仅只做一次)   			Class.forName("com.mysql.jdbc.com");
第二步:建立连接(Connection) DriverManager.getConnection(url,user,password);
第三步:创建运行SQL语句(Statement) connection.createStatement();
第四步:运行语句 rs.executeQuery(sql);
第五步:处理结果集(ResultSet)
第六步:释放资源

3.点对点

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class JMSProduce {

public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME); //接口
//5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
//6.通过消息生产者生成3天消息发送MQ队列
for(int i = 0; i < 3; i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg----" + i);//字符串
//8. 通过消息生产者发送mq
producer.send(textMessage);
}

//9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("********消息发布到MQ成功!");
}
}

Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers=消费者数量,消费者端的消费者数量。
Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class JMSConsumer {

public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME); //接口

//5.创建消息的生产者
MessageConsumer consumer = session.createConsumer(queue);
//6.同步阻塞方式(receive)
//订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
while (true){
//TextMessage textMessage = (TextMessage)consumer.receive();
TextMessage textMessage = (TextMessage)consumer.receive(4000L);//有时间限制的
if ( null != textMessage){
System.out.println("****消费者接收到消息:"+textMessage.getText());
}else {
break;
}
}

//8.关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("********接收到MQ成功!");
}
}

第二种方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class JMSConsumer {

public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME); //接口

//5.创建消息的生产者
MessageConsumer consumer = session.createConsumer(queue);

//通过监听的方式消费
//异步非阻塞方式(监听器onMessage())
//订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,
//当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if ( null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消费者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//保证控制台不灭
System.in.read();
consumer.close();
session.close();
connection.close();
}
}

这里的一点经验: activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)

但是当有多个主题(Topic)订阅者时,发布者发布的消息,每个订阅者都会接收所有的消息。topic 更像是被广播的消息,但是缺点是不能接受已经发送过的消息。

JMS开发的基本步骤

1:创建一个connection factory
2:通过connection factory来创建JMS connection
3:启动JMS connection
4:通过JMS connection创建JMS session
5:创建JMS destination(目的地 队列/主题)
6:创建JMS producer或者创建JMS consume并设置destination
7:创建JMS consumer或者注册一个JMS message listener
8:发送(send)或者接收(receive)JMS message
9:关闭所有JMS资源 JMS开发的基本步骤

4.一对多

发布/订阅消息传递域的特点如下:
(1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
(2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
(3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class JMSProduce_Topic {

public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME); //接口
//5.创建消息的生产者
MessageProducer producer = session.createProducer(topic);
//6.通过消息生产者生成3天消息发送MQ队列
for(int i = 0; i < 3; i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg----" + i);//字符串
//8. 通过消息生产者发送mq
producer.send(textMessage);
}

//9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("********TOPIC_NAME消息发布到MQ成功!");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class JMSConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME); //接口

//5.创建消息的生产者
MessageConsumer consumer = session.createConsumer(topic);

//通过监听的方式消费
consumer.setMessageListener((message) -> {
if ( null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消费者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

//保证控制台不灭
System.in.read();
consumer.close();
session.close();
connection.close();
}
}

先启动订阅者再启动生产者,不然发送的消息是废消息

5. 两个模式比较

比较项目目 Topic模式队列 Queue模式队列
工作模式 “订阅-发布”模式,如果当前没有订阅者,消息将会被去弃。如果有多个订阅者,那么这些订阅者都会收到消息 “负载均衡”模式,如果当前没有消费者。消息也不会丢弃;如果有多个消费者,那么—条消息也只会发送给其中一个消费者,并且要求消费者ack信息。
有无状态 无状态 Queue数掘默认会在mq服务器上以文件形式保存,比如Active MQ—般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
传递完整性 如果没有订阅者,消息会被丢弃 消息不会丢弃
处理效郭 由于消息要按服订阅者的数量进行复制,所以处理性能会随着订阅吉的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者。所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

三、JMS

1. 是什么

JavaEE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计,开发。装配及部署企业应用程序。

1,JDBC(Java Databease)数据库连接
2,JNDI(Java Naming and Directory Interfaces)Java的命令和目录接口
3,EJB(Enterprise JavaBean)
4,RMI(Remote Method Invoke)远程方法调用
5,Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口定义语言/共用对象请求代理程序体系结构
6,JSP(Java Server Page)
7,Servlet
8,XML(Extensible Markup Language)可标记白标记语言
9,JMS(Java Message Service)Java消息服务
10,JTA(Java Transaction API)Java事务API
11,JTS(Java Transaction Service)Java事务服务
12,JavaMail
13,JAF(JavaBean Activation Framework)

什么是Java消息服务?

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

2.四大消息中间件的对比

特性 ActiveMQ RabbitMQ Kafka RocketMQ
PRODUCER-CUMSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持,Java优先 语言无关 支持,Java优先 支持
单机吞吐量 万级 万级 十万级 单机万级
消息延迟 微秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式)
消息丢失 极低 理论上不会
消息重复 可控制 理论上会有重复
文档的完备性
提供快速入门
首次部署难度

3.JMS的组成结构和特点

JMS 部件 JMS provider JMS producer JMS consumer JMS message
含义 实现JMS 的消息中间件,也就是MQ服务器 消息生产者,创建和发送消息的客户端 消息消费者,接收和处理消息的客户端 JMS 消息,分为消息头、消息属性、消息体

5 个主要的消息头

消息头 JMSDestination JMSDeliveryMode JMSExpiration JMSPriority JMSMessageId
含义 消息发送的目的地,主要是指Queue和Topic 是持久还是非持久 过期时间,默认永久 优先级,默认是4有0~9 ,5-9 是紧急的,0-4 是普通的 唯一标识每个消息的标识由MQ产生。

一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。

一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。

5 种消息体格式:

5种消息体 TextMessage Mapmessage BytesMessage StreamMessage ObjectMessage
含义 普通字符串消息,包含一个String Map 类型的消息, k-> String v -> Java 基本类型 二进制数组消息,包含一个byte[] Java 数据流消息,用标准流操作来顺序的填充读取 对象消息,包含一个可序列化的Java 对象

消息属性:识别、去重、重点标注

4.JMS的可靠性

JMS 可靠性:Persistent 持久性 、 事务 、 Acknowledge 签收

PERSISTENT:持久性

队列queue

1
2
3
4
5
// 在队列为目的地的时候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 队列为目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息仍就会被消费。

但是非持久化的消息,服务器宕机后消息永远丢失。 而当你没有注明是否是持久化还是非持久化时,默认是持久化的消息。

对于目的地为主题(topic)来说,默认就是非持久化的,让主题的订阅支持化的意义在于:对于订阅了公众号的人来说,当用户手机关机,在开机后任就可以接受到关注公众号之前发送的消息。

代码实现:持久化topic 的消费者

主题topic

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
……    
connection.setClientID("z4");
// 前面代码相同,不复制了
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

// 5 发布订阅
connection.start();

Message message = topicSubscriber.receive();// 一直等
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive(3000L); // 等1秒后meesage 为空,跳出循环,控制台关闭
}

发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
MessageProducer messageProducer = session.createProducer(topic);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中

// 设置持久化topic 在启动
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i < 4 ; i++) {
// 7 创建字消息
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 8 通过messageProducer发布消息
messageProducer.send(textMessage);

MapMessage mapMessage = session.createMapMessage();
// mapMessage.setString("k1","v1");
// messageProducer.send(mapMessage);
}
// 9 关闭资源
……

先注册,离线后再启动,消息被消费。

Transaction:事务

createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息

1
2
3
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//提交
session.commit();

事务偏生产者/签收偏消费者

事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性

对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。举个栗子:

如果消息消费的 createSession 设置为 ture ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。

Acknowledge:签收

非事务

1
2
3
4
5
Session.AUTO_ACKNOWLEDGE      自动签收,默认

Session.CLIENT_ACKNOWLEDGE 手动签收
//手动签收需要acknowledge ,不签收,就会被重复消费
textMessage.acknowledge();

而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。

开启事务,但不commit,就算签收也没有用

1
2
3
4
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE); // 也是自动签收
……
session.commit();

5.总结

点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。

1:如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收

2:队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势

Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送

非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
当持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。

当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅

四、Broker

相当于一个ActiveMQ服务器实例

说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,
在用的时候再去启动这样能节省了资源,也保证了可用性。

类似Redis,不同config配置文件来模拟不同的实例

1
./activemq start xbean:file:/myactivemq/conf/activemq02.xml

嵌入式Broker

用ActiveMQ Broker作为独立的消息服务器来构建Java应用。
ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<dependencies>
<!--activemq锁需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.1</version>
</dependency>
<!--嵌入式broker-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>

<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.15</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
public class EmbedBroker {

    public static void main(String[] args) throws Exception {
        //ActiveMQ也支持在vm中通信基于嵌入的broker
        BrokerService brokerService = new BrokerService();
        brokerService.setPopulateJMSXUserID(true);
        brokerService.addConnector("tcp://127.0.0.1:61616");
        brokerService.start();
    }
}

五、SpringBoot整合MQ

创建父工程pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.lq.activemq</groupId>
<artifactId>activemq_demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>boot_mq_produce</module>
</modules>
<packaging>pom</packaging>

<!--统一管理jar包版本-->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<lombok.version>1.16.18</lombok.version>
<log4j.version>1.2.17</log4j.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mybatis.spring.boot.version>2.1.1</mybatis.spring.boot.version>
</properties>

<!--子模块继承之后,提供作用:锁定版本+子module不用谢groupId和version-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</dependency>
<!--spring boot 2.2.2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud Hoxton.SR1-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud 阿里巴巴-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>runtime</scope>
</dependency>
<!-- druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!--log4j-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
</project>

队列

队列生产者pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>activemq_demo</artifactId>
<groupId>com.lq.activemq</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>boot_mq_produce</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 7777

spring:
activemq:
broker-url: tcp://192.168.5.128:61616 #my服务器地址
user: admin
password: admin
jms:
pub-sub-domain: false #false = Queue true = Topic


#自己定义队列名称
myqueue: boot-activemq-queue

注入bean

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableJms //重点,开启jms适配注解
public class ConfigBean {

@Value("${myqueue}")
private String myQueue;

@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}

业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class Queue_Produce {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Queue queue;

public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue,"*****:"+ UUID.randomUUID().toString().substring(0,6));
}

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest(classes = Main_Produce.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMq {

@Resource
private Queue_Produce queue_produce;

@Test
public void testSend() throws Exception{
queue_produce.produceMsg();
}
}

定时投放

业务类加上

1
2
3
4
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
jmsMessagingTemplate.convertAndSend(queue,"*****:Scheduled"+ UUID.randomUUID().toString().substring(0,6));
}

主启类加

1
@EnableScheduling

消费者

建maven工程

导入依赖

yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 8888

spring:
activemq:
broker-url: tcp://192.168.5.128:61616 #my服务器地址
user: admin
password: admin
jms:
pub-sub-domain: false #false = Queue true = Topic


#自己定义队列名称
myqueue: boot-activemq-queue

业务类

1
2
3
4
5
6
7
8
@Component
public class Queue_Consumer {

@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException{
System.out.println("*********消费者收到消息"+textMessage.getText());
}
}

主题

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 7777

spring:
activemq:
broker-url: tcp://192.168.5.128:61616 #my服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true #false = Queue true = Topic


#自己定义队列名称
myqueue: boot-activemq-queue
myTopic: boot-activemq-topic

配置类

1
2
3
4
5
6
7
8
9
10
11
@Component
@EnableJms
public class ConfigBean {

@Value("${myTopic}")
private String topicName;

public Topic topic(){
return new ActiveMQTopic(topicName);
}
}

业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class Topic_Produce {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired
private Topic topic;

@Scheduled(fixedDelay = 3000)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic,"主题消息:"+ UUID.randomUUID().toString().substring(0,6));
}
}

消费者

yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 8888

spring:
activemq:
broker-url: tcp://192.168.5.128:61616 #my服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true #false = Queue true = Topic


#自己定义队列名称
myqueue: boot-activemq-queue
myTopic: boot-activemq-topic

业务类

1
2
3
4
5
6
7
8
@Component
public class Topic_Consumer {

@JmsListener(destination = "${myTopic}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("*********消费者收到消息"+textMessage.getText());
}
}

六、activeMQ的传输协议

61616如何改端口?

你生产上的连接协议如何配置的?使用tcp吗?

http://activemq.apache.org/configuring-version-5-transports.html

ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。
其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。

1
2
3
4
5
6
7
8
9
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

在上文给出的配置信息中,
URI描述信息的头部都是采用协议名称:例如
描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;
描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;
唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire

传输协议有哪些

1.Transmission Control Protocol(TCP)默认

1.这是默认的Broker配置,TCP的Client监听端口61616
2.在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。
3.TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
4.TCP传输的的优点:
       (4.1)TCP协议传输可靠性高,稳定性强
       (4.2)高效率:字节流方式传递,效率很高
       (4.3)有效性、可用性:应用广泛,支持任何平台
5.关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html

2.New I/O API Protocol(NIO)

1
2
3
4
5
6
1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
2.适合使用NIO协议的场景:
(2.1)可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
(2.2)可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
3.NIO连接的URI形式:nio://hostname:port?key=value&key=value
4.关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html

nio案例演示

http://activemq.apache.org/configuring-version-5-transports.html

修改activemq.xml

1
2
3
4
5
6
<transportConnectors>
      <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
</transportConnectors>
如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型
所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。
如下所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。

修改普通代码

1
2
public static final String ACTIVEMQ_URL = "nio://192.168.5.128:61618";
public static final String QUEUE_NAME = "transport";

nio增强

URI格式以”nio”开头,代表这个端口使用TCP协议为基础的NIO网络模型。
但是这样的设置方式,只能使这个端口支持Openwire协议。

我们怎么能够让这个端口既支持NIO网络模型,又让他支持多个协议呢?

http://activemq.apache.org/auto

使用”+”符号来为端口设置多种特性

1
<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>

七、ActiveMQ的消息存储和持久化

http://activemq.apache.org/persistence

1.是什么

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。

消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

2.有哪些

AMQ Mesage Store(了解)

基于文件的存储方式,是以前的默认消息存储,现在不用了

KahaDB消息存储(默认)

基于日志文件,从ActiveMQ5.4开始默认的持久化插件

http://activemq.aache.org/kahadb

KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
KahaDB在消息保存的目录中有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比,这就非常简洁了。

1,db-number.log
KahaDB存储消息到预定大小的数据纪录文件中,文件名为db-number.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如没32M一个文件,文件名按照数字进行编号,如db-1.log,db-2.log······。当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。

2,db.data
该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-number。log里面存储消息。

3,db.free
当问当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID

4,db.redo
用来进行消息恢复,如果KahaDB消息存储再强制退出后启动,用于恢复BTree索引。

5,lock
文件锁,表示当前kahadb独写权限的broker。

JDBC消息存储(重点)

LevelDB消息存储(了解)

这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

默认配置如下:

1
2
3
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>

JDBC Message Store with ActiveMQ Journal

3.JDBC存储消息

MQ+MySQL

1.添加mysql数据库的驱动包到lib文件夹

https://downloads.mysql.com/archives/c-j/

2.jdbcPersistenceAdapter配置

1
2
3
4
5
6
7
8
9
10
修改activemq.xml配置文件
修改前的KahaDB修改后的jdbcPersisteceAdapter
<persistenceAdapter>        
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>          
<persistenceAdapter>              
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>          
</persistenceAdapter>
dataSource是指定将要引用的持久化数据库的bean名称。
createTableOnStartup是否在启动的时候创建数据库表,默认是true,这样每次启动都会去创建表了,一般是第一次启动的时候设置为true,然后再去改成false。

3.数据库连接池配置

1
2
3
4
5
6
7
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.239:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean><

4.建库SQL和创表说明

建一个名为activemq的数据库

三张表的说明

ACTIVEMQ_MSGS

ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高

ACTIVEMQ_ACKS

用于存储订阅关系,如果持久化Topic,订阅者和服务器的订阅关系在这个表保存

ACTIVEMQ_LOCK

表ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker

如果新建数据库ok,上述配置ok,代码运行ok,3张表会自动生成

如果表没生成,可能需要自己创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment '消息的Destination',
    SUB_DEST      varchar(250)     null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
    CLIENT_ID     varchar(250)     not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
    SUB_NAME      varchar(250)     not null comment '订阅者名称',
    SELECTOR      varchar(250)     null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
    LAST_ACKED_ID bigint           null comment '记录消费过消息的ID',
    PRIORITY      bigint default 5 not null comment '优先级,默认5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
    comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';

create index ACTIVEMQ_ACKS_XIDX
    on ACTIVEMQ_ACKS (XID);

-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
        primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);

-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
        primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
    on ACTIVEMQ_MSGS (CONTAINER);

create index ACTIVEMQ_MSGS_EIDX
    on ACTIVEMQ_MSGS (EXPIRATION);

create index ACTIVEMQ_MSGS_MIDX
    on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);

create index ACTIVEMQ_MSGS_PIDX
    on ACTIVEMQ_MSGS (PRIORITY);

create index ACTIVEMQ_MSGS_XIDX
    on ACTIVEMQ_MSGS (XID);

https://blog.csdn.net/jia970426/article/details/104516383

数据库授权

1
2
3
4
5
mysql > grant all privileges on db_name.* to usr_name@'%' identified by 'pwd';

其中,db_name 是数据库名, usr_name 用户名, pwd 密码。'%' 为通配符。

mysql > flush privileges ;

5.代码运行验证

一定要开启持久化

1
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

6.数据库情况

在点对点类型中

当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

而且点对点类型中消息一旦被Consumer消费,就从数据中删除

消费前的消息,会被存放到数据库

上面的消息被消费后被MQ自动删除

设置了持久订阅数据库里面会保存订阅者的信息

ACTIVEMQ_ACKS表中的LAST_ACKED_ID记录了CLIENT_ID最后签收的一条消息

而LAST_ACKED_ID和ACTIVEMQ_MSGS的ID字段是外键关联关系,这样就可以实现,Topic的消息保存到ACTIVEMQ_MSGS表内,还能根据ACTIVEMQ_ACKS表中的持久订阅者查到该订阅者上次收到的最后一条消息是什么

值得注意的是,Topic内的消息是不会被删除的,而Queue的消息在被删除后,会在数据库中被删除,如果需要保存Queue,应该使用其他方案解决

7.总结

如果是queue
在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息

如果是topic,
一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,
在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。

8.开发

在配置关系型数据库作为ActiveMQ的持久化存储方案时,有坑

数据库jar包
注意把对应版本的数据库jar或者你自己使用的非自带的数据库连接池jar包

createTablesOnStartup属性
默认为true,每次启动activemq都会自动创建表,在第一次启动后,应改为false,避免不必要的损失。

java.lang.IllegalStateException: LifecycleProcessor not initialized
确认计算机主机名名称没有下划线

4.JDBC Message store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。

当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

1
2
3
4
5
6
7
8
修改配置前
<persistenceAdapter>         
<jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
</persistenceAdapter>
<persistenceFactory>                      <journalPersistenceAdapterFactory                                    journalLogFiles="5"                                    journalLogFileSize="32768"                                    useJournal="true"                                    useQuickJournal="true"                                   
dataSource="#mysql-ds"
dataDirectory="../activemq-data" /> 
</persistenceFactory>

以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

5.持久化总结

持久化消息主要指的是:
MQ所在服务器宕机了消息不会丢试的机制。

持久化机制演变的过程:
从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。

ActiveMQ消息持久化机制有:
AMQ 基于日志文件
KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用
JDBC 基于第三方数据库
Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。

七.ActiveMQ的多节点集群

引入消息中间件后如何保证其高可用?

基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。

zookeeper+replicated-leveldb-store的主从集群

三种集群

基于shareFileSystem共享文件系统(KahaDB)

基于JDBC

基于可复制的LevelDB

http://activemq.apache.org/replicated-leveldb-store

集群原理

http://activemq.apache.org/replicated-leveldb-store

使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。
如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Maste的Slaves。
如果Master宕机得到了最新更新的Slave会变成Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。

所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。
所以,如给你配置了replicas=3,name法定大小是(3/2)+1 = 2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success,至于为什么是2-1,zookeeper讲解过自行复习。
有一个ode要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定mode在线以能够找到拥有最新状态的ode,这个ode才可以成为新的Master。

因此,推荐运行至少3个replica nodes以防止一个node失败后服务中断。

配置集群步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1,创建我们自己的文件夹存放Zookeeper
mkdir /my_zookeeper
 
2,下载Zookeeper
wget -P /my_zookeeper/ https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
 
3,解压
tar -zxvf apache-zookeeper-3.5.6-bin.tar.g
 
4,修改配置文件
文件名必须叫这个zoo.cfg
cp zoo_sample.cfg zoo.cfg
设置一下自定义的数据文件夹(注意要手动创建这个目录,后面会用到),,注意最后一定要有/结尾,没有/会报错这是坑
dataDir=/my_zookeeper/apache-zookeeper-3.5.6-bin/data/
在zoo.cfg件最后面追加集群服务器
server.1=192.168.10.130:2888:3888
server.2=192.168.10.132:2888:3888
server.3=192.168.10.133:2888:3888
 
server.1=leantaot-zk-01:2888:3888
1是一个数字,标识这个是第几号服务器
leantaot-zk-01是这个服务器的IP地址(或者是与IP地址做了映射的主机名)
2888第一个端口用来集群成员的信息交换,标识这个服务器与集群中的leader服务器交换信息的端口
3888是在leader挂掉时专门用来进行选举leader所用的端口
 
6.把刚刚配置好的Zookeeper整个文件夹远程推送到其他服务器的/my_zookeeper文件夹内
scp -r /my_zookeeper/ root@192.168.10.132:/
scp -r /my_zookeeper/ root@192.168.10.133:/ 
 
7.创建myid文件, id 与 zoo.cfg 中的序号对应
在192.168.10.130机器上执行
echo 1 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid
echo 1 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid
在192.168.10.132机器上执行
echo 2 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid
echo 2 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid
在192.168.10.133机器上执行
echo 3 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid
echo 3 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid
 
8.bin目录下启动Zookeeper做测试
 ./zkServer.sh start
输出
ZooKeeper JMX enabled by default
Using config: /my_zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 1.
 

5.创建3台集群目录

就是一台电脑复制三份ActiveMQ

6.修改管理控制台端口

就是ActiveMQ后台管理页面的访问端口

node1不改

jetty.xml port

7.hostname名字映射(如果不映射只需要吧mq配置文件的hostname改成当前主机ip)

8.ActiveMQ集群配置

配置文件里面的BrokerName要全部一致

持久化配置(必须)

1
2
3
4
5
6
7
8
9
10
<persistenceAdapter>
   <replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62621"
zkAddress="192.168.10.130:2181,192.168.10.132:2181,192.168.10.133:2181"
hostname="192.168.10.130"
zkPath="/activemq/leveldb-stores"
                        />
  </persistenceAdapter>

9.修改各个节点的消息端口

61616/61617/61618

10.按顺序启动3个ActiveMQ节点,到这步前提是zk集群已经成功启动运行

使用zkCli.sh连接一台Zookeeper
/my_zookeeper/apache-zookeeper-3.5.6-bin/bin/zkCli.sh -server 192.168.10.130:2181

八、消息中间件高级特性

1.异步投递

http://activemq.apache.org/async-sends

对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送

是什么

ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。

异步发送
它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加
此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。

开启

1
connection.setUseAsyncSend(true);

异步消息如何确定发送成功?

异步发送丢失消息的场景是:生产者设置userAsyncSend=true,使用producer.send(msg)持续发送消息。
如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。

所以,正确的异步发送方法是需要接收回调的。

同步发送和异步发送的区别就在此,
同步发送等send不阻塞了就表示一定发送成功了,
异步发送需要客户端回执并由客户端再判断一次是否发送成功

JmsProduce_AsyncSend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.lq.activemq.async;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import javax.jms.*;
import java.util.UUID;

public class JmsProduce_AsyncSend {

public static final String ACTIVEMQ_URL = "tcp://192.168.5.128:61616";
public static final String QUEUE_NAME = "async01";

public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true); //异步发送
//2.通过连接工厂,获得连接Connection +并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

//3. 创建会话Session
//两个参数,第一个参数事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME); //接口
//5.创建消息的生产者
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);

//6.通过消息生产者生成3天消息发送MQ队列
for(int i = 0; i < 3; i++){
//7.创建消息
TextMessage textMessage = session.createTextMessage("async msg----" + i);//字符串
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"---orderLQ");
String msgID = textMessage.getJMSMessageID();
//8. 通过消息生产者发送mq,异步回调
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(msgID+"has been ok send");
}

@Override
public void onException(JMSException e) {
System.out.println(msgID+"fail to send mq");
}
});
}

//9.关闭资源
activeMQMessageProducer.close();
session.close();
connection.close();
System.out.println("********消息发布到MQ成功!");
}

}

2.延迟投递和定时投递

http://activemq.apache.org/delay-and-schedule-message-delivery.html

属性 type description
AMQ_SCHEDULED_DELAY long 延迟投递时间
AMQ_SCHEDULED_PERIOD long 重复投递时间
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON string Cron表达式

要在activemq.xml中配置schedulerSupport属性为true

Java代码里面封装的辅助消息类型:ScheduledMessage

1
2
3
4
5
6
7
8
9
10
11
12
long delay = 3 * 1000;      //延迟投递的时间
long period = 4 * 1000;     //每次投递的时间间隔
int repeat = 5;             //投递的次数

for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("message-延时投递" + i);
//给消息设置属性以便MQ服务器读取到这些信息,好做对应的处理
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
messageProducer.send(textMessage);
}

3.ActiveMQ消息重试机制

具体哪些情况会引发消息重发?

1:Client用了transactions且再session中调用了rollback
2:Client用了transactions且再调用commit之前关闭或者没有commit
3:Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

请说说消息重发时间间隔和重发次数?

间隔:1
次数:6
每秒发6次

有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。

http://activemq.apache.org/redelivery-policy

4.死信队列

ActiveMQ中引入了“死倍队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。

一般生产环境中在使用MQ的时候设计两个队列:*一个是核心业务队列,一个是死信队列。**
核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。
一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中**。然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,

5.防止重复调用

网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。