01消息队列


1.概述

作用:发消息。
常用技术:activeMq、rabbitMq、Kafka

主要功能:

  1. 解耦
    应用场景:例如分布式项目中,卖家中心发布了一个商品,需要将该商品同步到solr索引库当中,然而,导入索引库的功能在门户模块,如何通知门户进行导入索引库操作,就用到了消息队列。

    image-20210829155604702

  2. 削峰
    应用场景:淘宝秒杀活动中,如果秒杀接口被多个请求同时访问会导致卡死宕机,这时就需要在秒杀接口之前加上一个消息队列,设置好请求数之后,存入队列出队方式调用秒杀接口,超过预设的请求数后,丢弃其余秒杀请求。

image-20210829155618738

2.常用技术

2.1activeMq

相关概念:
生产者:发送消息的一方
消费者:接收消息的一方
消息形式:

  1. 点对点 queue:一个生产者对应一个消费者,消息队列会暂存消息,消费者接收到消息后,消息从队列中删除。
  2. 发布/订阅 topic:一个生产者对应多个消费者,消息队列不会暂存消息,如果当时消费者不处于接收信息的状态,消息无法接受。

消息的正文格式
StreamMessage 数据流
MapMessage 一组键值对
TextMessage 字符串
ObJectMessage 一个序列化的java对象
BytesMessage 字节的数据流

2.2docker安装activeMq

https://blog.csdn.net/qq_35981283/article/details/69666706

1
2
3
4
5
6
1.拉取activemq镜像
docker pull docker.io/webcenter/activemq
2.运行容器
docker run -d --name activemq -p 61616:61616 -p 8161:8161 docker.io/webcenter/activemq
3.查看容器是否运行正常
docker ps

61616:接收请求端口
8161:activemq客户端端口
客户端默认账号密码:admin admin

2.3 测试代码

2.3.1 点对点

直到消费者消费了提供者发布的消息,消息才消失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    @Test
public void producer(){
// jms支持消息
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.66.66:61616");
try {
Connection connection;
connection = connectionFactory.createConnection();
connection.start();
// 是否开启事务:true,否,则第二个参数没有意义;false时第二个参数有意义 消息的应答模式:自动应答、手动应答
Session session;//自动应答
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列 点对点
Queue queue=session.createQueue("test_queue");
//创建消息生产者
MessageProducer producer = session.createProducer(queue);
//创建消息
TextMessage message=session.createTextMessage("你好这是第一个消息队列");
// 发送消息
producer.send(message);
producer.close();
session.close();
connection.close();
//
} catch (JMSException e) {
e.printStackTrace();
}

}
@Test
public void consumer ()throws Exception{
// jms支持消息
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.66.66:61616");
Connection connection;
connection = connectionFactory.createConnection();
connection.start();
// 是否开启事务:true,否,则第二个参数没有意义;false时第二个参数有意义 消息的应答模式:自动应答、手动应答
Session session;//自动应答
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列 点对点
Queue queue=session.createQueue("test_queue");
//创建消息生产者
MessageConsumer consumer = session.createConsumer(queue);

//消息监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
String text= null;
try {
// 打印消息
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}

}
});
System.in.read(); //相当于把这个方法抱在了while(true)里面
consumer.close();
session.close();
connection.close();

//


}

2.3.2 发布订阅

直到消费者如果没有在生产者发布消息时及时消费那么消费者将消费不到信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    @Test
public void producer ()throws Exception{
// 收发消息的端口号61616
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.66.66:61616");
Connection connection=connectionFactory.createConnection();
connection.start();
// 是否开启事务
// true代标开启事务第二个参数就忽略了
// false 第二参数才有意义 消息的应答模式 :自动应答或手动应答
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
// 点对点
// Queue queue=session.createQueue("test_queue");
// 发布订阅
Topic topic=session.createTopic("test_topic");
// 创建一个生产者
MessageProducer messageProducer=session.createProducer(topic);
TextMessage message=session.createTextMessage("hello i am hayabusa");
messageProducer.send(message);
messageProducer.close();
session.close();
connection.close();
}
@Test
//创建消费者
public void consumer()throws Exception{
// 收发消息的端口号61616
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.66.66:61616");
Connection connection=connectionFactory.createConnection();
connection.start();
// 是否开启事务
// true代标开启事务第二个参数就忽略了
// false 第二参数才有意义 消息的应答模式 :自动应答或手动应答
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建队列
// 点对点
// Queue queue=session.createQueue("test_queue");
//发布订阅
Topic topic=session.createTopic("test_topic");
MessageConsumer messageConsumer=session.createConsumer(topic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage= (TextMessage) message;
try {
String text=textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}

2.4 整合到springboot

2.4.1生产者消费者项目的application.properties分别写上

1
2
3
4
5
6
7
8
9
10
#收发信息的地址
server.port=8081
spring.activemq.broker-url=tcp://192.168.66.66:61616

spring.activemq.user=admin
spring.activemq.password=admin

#默认使用点对点 ,如果想使用发布订阅改为true
spring.jms.pub-sub-domain=false

2.4.2 启动类分别加上

1
@EnableJms

2.4.3 生产者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Controller
public class ProducerController {
// 注入消息模板
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("sendMsg")
@ResponseBody
public String sendMsg()throws Exception{
Queue queue=new ActiveMQQueue("test-queue"); //队列名称
Topic topic=new ActiveMQTopic("test-topic"); //队列名称
jmsMessagingTemplate.convertAndSend(queue,"test-message");
jmsMessagingTemplate.convertAndSend(topic,"test-message");
return "success";


}
}

2.4.4 消费者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class Consumer {
@JmsListener(destination = "test-queue")
public void getMessage(String text){
//对象匹配进行绑定
System.out.println("接收到的test_queue的消息"+text);

}
@JmsListener(destination = "test-topic")
public void getMessage2(String text){
//对象匹配进行绑定
System.out.println("接收到的test_topic的消息"+text);

}
@JmsListener(destination = "test-topic")
public void getMessage3(String text){
//对象匹配进行绑定
System.out.println("接收到的test_topic的消息"+text);

}
}



文章作者: 哈雅布撒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 哈雅布撒 !
  目录