提出问题

目前市面上常用的四种消息中间件:ActiveMQ、RabbitMQ、RocketMQ、Kafka。由于每个项目需求的不同,在消息中间件的选型上也就会不同。

在项目开发中:多部门配合,MQ差异化带来的联调问题。A部门使用 RabbitMQ 进行消息发送,大数据部门却用 Kafka, MQ 选型的不同,MQ 切换、维护、开发等困难随之而来。

那有没有一种技术,可以让我们不再关注 MQ 的细节,只需要用一种适配绑定的方式,就可以帮助我们自动的在各种 MQ 之间切换呢?Spring Cloud Stream 消息驱动应运而生

Spring Cloud Stream 消息驱动,它可以屏蔽底层 MQ 之间的细节差异。我们只需要操作Spring Cloud Stream 就可以操作底层多种多样的MQ。从而解在 MQ 切换维护开发 方面的难度问题。

SpringCloud Stream

官方定义:Spring Cloud Stream 是一个 构建消息驱动微服务的框架

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的 binder 对象交互。通过我们的配置来进行 binding(绑定), 然后 Spring Cloud Stream 通过 binder 对象与消息中间件交互。我们只需要搞清楚如何与 Spring Cloud Stream 交互,就可以方便使用消息驱动的方式。

Spring Cloud Stream 通过使用 Spring Integration 来连接消息代理中间件,以实现消息时间驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动配置发现,引用了 发布-订阅消费组分区 三个核心概念。

目前仅支持 RabbitMQ、Kafka。

也就是说: Spring Cloud Stream 屏蔽了底层消息中间件的差异,降低 MQ 切换成本,统一消息的编程模型。开发中使用的就是各种 xxxBinder

设计思想

标准的MQ

生产者和消费者之间靠消息媒介传递信息内容

消息必须走特定的通道:MessageChannel

消息通道里的消息如何被消费:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

Spring Cloud Stream

假如我们用到了 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同。像 RabbitMQexchangeKafkaTopicPartions 分区的概念。

这些中间件的差异性,给我们实际项目的开发造成了一定的困扰。我们如果用了两个消息队列中的其中一个,后面的业务需求如果向往另外一种消息队列进行迁移,这需求简直是灾难性的。因为它们之间的耦合性过高,导致一大堆东西都要重新推到来做,这时候 Spring Cloud Stream 无疑是一个好的选择,它为我们提供了一种解耦合的方式。

Spring Cloud Stream如何统一底层差异

在没有绑定器这个概念的情况下,我们的 Spring Boot 应用直接与消息中间件进行信息交互时,由于个消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

通过定义绑定器(Binder)作为中间层,就可以完美的实现应用程序与消息中间件细节的隔离。

通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要在考虑各种不同的消息中间件的实现。

Spring CLoud Stream标准流程

  1. Source/Sink:Source 输入消息,Sink 输出消息
  2. Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel 对队列进行配置;
  3. Binder:很方便的 连接中间件,屏蔽 MQ 之间的差异

常用的注解

注解说明
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding通道Channel和exchange绑定在一起

测试开发

生产者

消息生产者模块,命名为:cloud-stream-rabbitmq-provider8801

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <!--stream-rabbitmq-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.yml,详细看注解

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此处配置绑定的rabbitmq的服务信息
        defaultRabbit: #表示定义的名称,用于与binding整合
          type: rabbit #消息组件类型
          environment: #设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 192.168.56.10
                port: 5672
                username: guest
                password: guest
      bindings: #服务的整合处理
        output: #通道名称
          destination: studyExchange #使用Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本可设置为text/plain
        binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
  client: #客户端进行eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2  #设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  #在信息列表时显示主机名称
    prefer-ip-address: true     #访问的路径变为IP地址

发送接口的实现

public interface IMessageProvider {
    String send();
}

实现

@EnableBinding(Source.class) //定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // 消息发送通道

    @Override
    public String send() {
        String s = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(s).build());
        System.out.println("******s: " + s);
        return null;
    }
}

控制层

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

启动类

@EnableEurekaClient
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

启动RabbitMQ、Eureka模块和Stream模块,连续点击 http://localhost:8801/sendMessage 进行消息发送,可以看到后台有显示发送消息,进入 RabbitMQ 可视化界面,可以看到有发送消息波峰出现。

消费者

消息消费者模块,命名为:cloud-stream-rabbitmq-provider8802

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <!--stream-rabbitmq-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

applicaiton.yml 配置

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合(可以自定义名称)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.56.10
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置(需与自定义名称一致)(飘红:Settings->Editor->Inspections->Spring->Spring Boot->Spring Boot application.yml 对勾去掉)
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

业务层,加个@Component,让这个类注入为bean一个组件,你要写成别的注解也行。

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("1号消费者,接收:"+message.getPayload()+"\t port:"+serverPort);
    }
}

启动类

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

依旧连续点击 http://localhost:8801/sendMessage

8802成功接收到。

消息重复消费

上述情况,只有一个生产者、一个消费者,并不会发现有问题存在。此时如果来两个消费者(8802、8803集群同时存在),就会出现重复消费的情况,这也是rabbitmq一种非常常见的情况。你可以搭建一个8003测试,做法跟上面8002一样。

当集群方式进行消息消费时,就会存在 消息的重复消费问题比如订单库存相关消息,购物完成库存 -1,消息重复消费就会导致库存不准确问题出现,这显然是不能接受的。

这是因为没有进行分组的原因,不同组就会出现重复消费;同一组内会发生竞争关系,只有一个可以消费。 如果我们不指定(8802、8803)集群分组信息,它会默认将其当做两个分组来对待。这个时候,如果发送一条消息到 MQ,不同的组就都会收到消息,就会造成消息的重复消费。

解决方式很简单,只需要用到 Stream 当中 group 属性对消息进行分组即可。将8802、8803分到一个组即可。

只要是一个组的消费者,就处于竞争关系,一次只能有一个去消费,这就可以解决重复消费的问题了。(项目中,是否分组就视业务情况而定)

值得一提的是:分组(group)还解决了持久化的问题噢。


Last modification:August 19, 2020
如果觉得我的文章对你有用,请随意赞赏