什么是SpringAMQP
AMQP(高级消息队列协议)
是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求
Spirng AMQP
基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
利用SpringAMQP实现基础消息队列功能
发送消息
- 在父工程中引入spring-amqp依赖
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
接受消息
- 引入amqp的starter依赖
- 配置RabbitMQ地址
- 定义类,添加@Component注解
- 类中声明方法,添加@RabbitListener注解,方法参数就是消息
Work Queue 工作队列
与基础消息队列区别:多个消费者,可以提高消息处理速度,避免队列消息堆积
消费预取限制
配置文件设置preFetch值,可以控制预取消息上限
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
发布、订阅模型(Publish&Subscribe)
允许将同一消息发送给多个消费者,实现方式是加入了交换机(exchange)
实现方式:通过交换机发送给多个队列
常见交换机类型:
- Fanout:广播
- Direct:路由
- Topic:话题
注意:exchange负责消息路由,只负责转发消息,路由失败则消息丢失
发布订阅-FanoutExchange
会将接收到的消息路由到每一个与其绑定的queue
实现思路如下:
- 在cunsumer服务中,声明队列、交换机,并将两者绑定
- 在consumer服务中,编写消费者方法,分别监听各个队列
- 在publisher中编写测试方法,发送消息
发布订阅-DirectExchange
会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式
- 每一个Queue都与Exchange设置一个BindingKey,可以绑定多个
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey宇RoutingKey一致的队列
1 2 3 4 5
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} ))
|
发布订阅-TopicExchange
与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
Queue与Exchange指定BindingKey时可以使用通配符:
1 2 3 4 5
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" ))
|
消息转换器
消息发送Java对象时,SpringAMQP会帮我们序列化为字节后发送,基于JDK的ObjectOutputStream完成序列化
如果要修改只需要定义一个MessageConverter的Bean即可。推荐使用JSON方式序列化,步骤如下:
发送消息
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
- 在publisher服务声明MessageConverter
1 2 3 4
| @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }
|
接收消息
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
- 在consumer服务定义MessageConverter:
1 2 3 4
| @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }
|