0%

SpringAMQP

什么是SpringAMQP

AMQP(高级消息队列协议)

是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求

Spirng AMQP

基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

利用SpringAMQP实现基础消息队列功能

发送消息
  • 在父工程中引入spring-amqp依赖
  • 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
接受消息
  • 引入amqp的starter依赖
  • 配置RabbitMQ地址
  • 定义类,添加@Component注解
  • 类中声明方法,添加@RabbitListener注解,方法参数就是消息

Work Queue 工作队列

与基础消息队列区别:多个消费者,可以提高消息处理速度,避免队列消息堆积

消费预取限制

配置文件设置preFetch值,可以控制预取消息上限

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条,默认无限

发布、订阅模型(Publish&Subscribe)

允许将同一消息发送给多个消费者,实现方式是加入了交换机(exchange)

实现方式:通过交换机发送给多个队列

常见交换机类型:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

注意:exchange负责消息路由,只负责转发消息,路由失败则消息丢失

发布订阅-FanoutExchange

会将接收到的消息路由到每一个与其绑定的queue

实现思路如下:

  • 在cunsumer服务中,声明队列、交换机,并将两者绑定
  • 在consumer服务中,编写消费者方法,分别监听各个队列
  • 在publisher中编写测试方法,发送消息
发布订阅-DirectExchange

会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式

  • 每一个Queue都与Exchange设置一个BindingKey,可以绑定多个
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey宇RoutingKey一致的队列
1
2
3
4
5
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
发布订阅-TopicExchange

与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

Queue与Exchange指定BindingKey时可以使用通配符:

  • :代指0个或多个单词

  • *:代指一个单词
1
2
3
4
5
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))

消息转换器

消息发送Java对象时,SpringAMQP会帮我们序列化为字节后发送,基于JDK的ObjectOutputStream完成序列化

如果要修改只需要定义一个MessageConverter的Bean即可。推荐使用JSON方式序列化,步骤如下:

发送消息
  • 在publisher服务引入依赖
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
  • 在publisher服务声明MessageConverter
1
2
3
4
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
  • 发送消息即可
接收消息
  • 在consumer服务引入Jackson依赖
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
  • 在consumer服务定义MessageConverter:
1
2
3
4
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
  • 接收消息即可
------ THEEND ------

欢迎关注我的其它发布渠道