屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型 官网: https://spring.io/projects/spring-cloud-stream#overview https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/ 中文: https://m.wang1314.com/doc/webapp/topic/20971999.html 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
标准MQ 配置 POM <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency> YML server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址 示例 消息发送 接口 public interface IMessageProvider{public String send() ;} 实现类 @EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义public class MessageProviderImpl implements IMessageProvider{@Resourceprivate MessageChannel output; // 消息的发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息System.out.println("***serial: "+serial);return serial;}} Controller @RestControllerpublic class SendMessageController{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}} 配置RabbitMQ可视化插件 rabbitmq-plugins enable rabbitmq_management http://localhost:15672/ 消息消费者 POM <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--基础配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency> Controller @Component@EnableBinding(Sink.class)public class ReceiveMessageListenerController{@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);}} 遇到的问题 有重复消费问题消息持久化问题 复现比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息, 那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。 这时我们就可以使用Stream中的消息分组来解决 注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。 不同组是可以全面消费的(重复消费), 同一组内会发生竞争关系,只有其中一个可以消费。
解决:修改YML添加group
spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置group: atguiguA同一个消费组的多个微服务实例,每次只会有一个拿到
注意没有分到消费组中,不会持久化,会丢失未曾消费消息