什么是SpringAMQP
AMQP(高级消息队列协议)
是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求
Spirng AMQP
基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
利用SpringAMQP实现基础消息队列功能
发送消息
- 在父工程中引入spring-amqp依赖
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
接受消息
- 引入amqp的starter依赖
- 配置RabbitMQ地址
- 定义类,添加@Component注解
- 类中声明方法,添加@RabbitListener注解,方法参数就是消息
Work Queue 工作队列
与基础消息队列区别:多个消费者,可以提高消息处理速度,避免队列消息堆积
消费预取限制
配置文件设置preFetch值,可以控制预取消息上限
1 | spring: |
发布、订阅模型(Publish&Subscribe)
允许将同一消息发送给多个消费者,实现方式是加入了交换机(exchange)
实现方式:通过交换机发送给多个队列
常见交换机类型:
- Fanout:广播
- Direct:路由
- Topic:话题
注意:exchange负责消息路由,只负责转发消息,路由失败则消息丢失
发布订阅-FanoutExchange
会将接收到的消息路由到每一个与其绑定的queue
实现思路如下:
- 在cunsumer服务中,声明队列、交换机,并将两者绑定
- 在consumer服务中,编写消费者方法,分别监听各个队列
- 在publisher中编写测试方法,发送消息
发布订阅-DirectExchange
会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式
- 每一个Queue都与Exchange设置一个BindingKey,可以绑定多个
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey宇RoutingKey一致的队列
1 | (bindings = ( |
发布订阅-TopicExchange
与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符:
1 | (bindings = ( |
消息转换器
消息发送Java对象时,SpringAMQP会帮我们序列化为字节后发送,基于JDK的ObjectOutputStream完成序列化
如果要修改只需要定义一个MessageConverter的Bean即可。推荐使用JSON方式序列化,步骤如下:
发送消息
- 在publisher服务引入依赖
1 | <dependency> |
- 在publisher服务声明MessageConverter
1 |
|
- 发送消息即可
接收消息
- 在consumer服务引入Jackson依赖
1 | <dependency> |
- 在consumer服务定义MessageConverter:
1 |
|
- 接收消息即可