一个业务服务会被拆分成多个微服务,各个服务之间相互通信完成整体的功能,通常有:

  • Http/RPC通信
    优点:实时通信
    缺点:服务之间耦合性高
  • 消息通信
    优点:降低耦合,提高系统处理能力
    缺点:非实时通信

提高系统处理能力为非核心业务从主流程中剥离出来异步处理,节省核心业务资源时间

什么是RocketMQ

低延迟、高可能、可伸缩、易于使用的分布式消息中间件(也称作消息队列),具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试与追踪等功能,非常适合在电商、金融等领域广泛使用
Alibaba开源项目,后捐赠给Apache基金会
官网地址 http://rocketmq.apache.org/
image.png

RocketMQ的应用场景

  • 削峰填谷
    提供削峰填谷保护高流量导致系统超负荷甚至崩溃
  • 异步解耦
    实现异步通信和应用解耦,确保主站业务的连续性
  • 顺序收发
    提供顺序消息保证消息的FIFO(First In First Out,先进先出),保证顺序
  • 分布式事务的一致性
    RocketMQ的分布式事务可以实现系统解耦,也可以保证最终数据一致性
  • 大数据分析
    RocketMQ与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析
  • 分布式缓存同步
    RocketMQ构建分布式缓存突破带宽瓶颈

RocketMQ安装

image.png
官网介绍有2中安装方式

  • 下载源码编译
  • 下载编译好的二进制文件
    image.png
  • 编译
    image.png
    编译后的文件目录
    image.png
  • 启动
    1.WIN下启动报错,没有配置环境变量,懒得配置了
    image.png
    2.换Linux,启动又报错...一般是shell脚本保存文件格式错误
    image.png
    image.png
    执行set ff=unix
    话说官方脚本不改的嘛???
    或者下载dos2unix软件
    image.png
    继续换WIN,设置环境变量把。。。
    image.png
  • 启动集群管理NameServer,默认端口为9876
    image.png
  • 启动消息服务器Broker,指定NameServer的IP地址端口
    image.png
    报错
    image.png
    修改上图更改为set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
    image.png
    成功启动

RocketMQ发送消息

Spring Cloud Alibaba已经集成RocketMQ

  • 父工程依赖
 <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.1.11.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.1.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
  • 消息发送端依赖
<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

其他web模块依赖省略

  • 配置文件参数
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          #RocketMQ的NameServer地址端口
          name-server: 127.0.0.1:9876
      bindings:
        output:
          #将名称指定为demo-group的Binding消息发送到TopicTest
          destination: TopicTest
          group: demo-group
server:
  port: 8081
  • 启动类
@SpringBootApplication
//Source为binding绑定的output
@EnableBinding({Source.class})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Source为SpringCloud的消息通道类名称,可以自定义实现多个消息通道
image.png

  • 发送消息类
/**
 * @Author jtao
 * @Date 2021/3/7 15:42
 * @Description
 */
@RestController
public class SendController {
    @Autowired
    private Source source;

    @GetMapping("/send")
    public String send(String msg) {
        MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(msg);
        Message<String> message = messageBuilder.build();
        source.output().send(message);
        return "Hello RockerMQ Binder,send" + msg;
    }
}

image.png

自定义实现多个消息通道

模仿org.springframework.cloud.stream.messaging.Source类

/**
 * @Author jtao
 * @Date 2021/3/7 15:51
 * @Description
 */
public interface ProducerSource {
    String OUTPUT = "producerOutput";

    @Output("ProducerSource.OUTPUT")
    MessageChannel output();
}

RocketMQ消费消息

  • 引入依赖
 <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
  • 配置文件参数
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: TopicTest
          group: demo-group


server:
  port: 8082
  • 启动类
@EnableBinding({Sink.class})

@SpringBootApplication
public class Application {
    //监听器监听该通道
    @StreamListener(value = Sink.INPUT)

    public void receive(String receiveMsg) {
        System.out.println("receive: " + receiveMsg + System.currentTimeMillis());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

除了监听默认通信通道,也可以监听自定义通道,模仿Sink接口,将自定义接口加入注解参数
image.png

整体流程

  • 发送消息
    image.png
  • 消费消息
    image.png

Spring Cloud Alibaba RocketMQ

Spring Cloud Stream是Spring Cloud体系内框架,简化消息业务再Spring Cloud应用程序中的开发
image.png
image.png
image.png
应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定的中间件绑定器Binder实现连接到外部代理

Spring Cloud Stream核心组成

Spring Cloud Stream的实现基于发布/订阅机制,核心模块由4部分构成:Spring Framework中的Spring Messaging和Spring Integration
Spring Cloud Stream中的Binders和Bindings

  • Spring Messaging
    Spring Framework中的统一消息编程模型
  • Spring Integration
    Spring Framework中用于支持企业继承的扩展机制,提供一个简单的模型来构建企业集成解决方案
  • Binders
    目标绑定器,负责与外部消息中间件系统集成
  • Bindings
    外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁

Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka个RabbitMQ
Spring Cloud Alibaba加入了RocketMQ Binder,用于将RocketMQ集成到Spring Cloud Stream

Spring Cloud Alibaba RocketMQ架构图

image.png

  • MessageChannel(output):消息通道
    用于发送消息
  • MessageChannel(input):消息通道
    用于订阅消息
  • Binder bindProducer:目标绑定器
    将发送通道发过来的消息发送到RocketMQ消息服务器
  • Binder bindConsuner:目标绑定器
    将接收到RocketMQ消息服务器的消息推送给订阅通道

以上全部按照Spring Cloud Stream的标准协议实现

Spring Cloud Stream消息发送流程

包括发送、订阅、分发、委派、消息处理等
image.png

具体代码实现

  • 1.在业务代码中调用MessageChannel接口Send方法
    image.png
  • 2.调用AbstractMessageChannel提供的发送和接收消息方法
    image.png
    image.png
    消息发送到具体实现类AbstractSubscribableChannel
    image.png
  • 3.消息通过分发类接口MessageDispatcher
    image.png
    //TODO 不写了,太绕了!!!

RocketMQ Binder集成消息发送

//TODO

RocketMQ Binder集成消息订阅

//TODO

Spring Cloud Stream消息订阅流程

//TODO

RocketMQ集群管理

RocketMQ的整体架构涉及、集群管理及一些重要的概念

整体架构设计

Kafka也是一款消息中间件,但不支持消费失败重试、定时消息、事务消息、顺序消息也有缺陷(无法保证消息消费顺序),于是Alibaba用Java编写了RocketMQ
常见的消息中间件Kafka、RabbitMQ、RocketMQ都基于发布/订阅机制,消息发送者(Producer)把消息发送到消息服务器,消息消费者(Consumer)从消息服务器订阅感兴趣的消息。
客户端:消息发送和消费者都是客户端
服务端:消息服务器
二者需要注册中心感知对方的存在

RocketMQ部署架构图

image.png

  • Producer:消息发布者
    把消息发送到Broker,支持集群
  • Consumer:消息消费者
    从Broker订阅消息消费,支持集群
  • Broker:消息存储者
    负责消息存储、投递、查询及服务高可用保证,支持集群
  • NameServer:服务管理者
    管理Broker集群路由信息,类似于Dubbo的Nacos、ZooKeeper注册中心,支持集群,主要功能为:
    1.服务注册,NameServer接收Broker集群的注册信息,保存下来作为路由信息的基本数据,并提供心跳检测机制,检查Broker是否还存活。
    2.路由信息管理:NameServver保存了Broker集群的路由信息,用于提供给客户端查询Broker的队列信息,从而进行消息的投递和消费

基本概念

  • Message:消息
    系统所传输信息的物理载体,生产和消费数据的最小单位
  • Topic:主题,逻辑概念
    某一类消息的集合,每个主题都包含若干条消息,每条消息都只有一个主题,是RocketMQ进行消息订阅的基本单位
  • Queue:消息队列,物理存储
    组成Topic的最小单元,一个Topic对应多个Queue,消费者消费Topic消息时底层拉取的是Queue消息
  • Tag:消息设置标志
    用于同一主题下区分不同类型的消息,能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统
  • UserProperties:自定义属性集合
    属于Message的一部分
  • ProducerGroup:同一类Producer集合
    该集合中的Produce发送同一类消息且逻辑一致
  • ConsumerGroup:同一类Consumer的集合
    该集合Comsumer通常消费同一类消息且消费逻辑一致,实现负载均衡和容错

为什么放弃ZooKeeper选择NameServer

在Kafaka中的服务注册与发现通常用ZooKeeper中间件,早期RockerMQ也使用ZooKeeper做集群管理

Kakfa与RocketMQ异同

  • Kafa中,Master和Slave在同一台Broker机器上
    Broker机器上有多个分区,每个分区的Master/Slave身份是在运行过程中选举出来的。ZooKeeper集群中多个实例必须相互通信,如果实例过多,网络通信会变得复杂低效
  • RocketMQ中,Master和Slave不在同一台Broker机器上
    每台Broker机器的Mster/Slave身份是配置预先定义好的,各个实例互相不进行通信。NameServer是无状态的,可以部署多个,Broker向每一台NameServer注册自己信息即可

如何实现顺序消息

官方详细示例 https://github.com/apache/rocketmq/tree/master/example
顺序消息是业务中常用功能

顺序消息的使用场景

交易系统的订单创建、支付、退款等流程需要保证先进先出(FIFO)

发送和消费顺序消息

  • 发送者
    代码略,不同点
    1.配置文件中指定参数sync=true,把默认的异步更改为同步发送
    2.MessageBuilder设置Header信息头,标识为一条顺序消息
  • 接收者
    代码略,不同点
    1.更改配置文件将默认的并发消费更改为顺序消费

顺序消息发送原理

RocketMQ顺序消息分为局部有序和全局有序

  • 局部有序:发送同一个队列的消息有序
    例如同一个订单ID的消息保证有序,不同订单ID的消息没有约束,相互不影响,并行

  • 全局有序:应用场景少,性能差,不推荐
    RocketMQ消息发送方式有3种:

  • 同步:
    发送网络请求后同步等待Broker服务器返回结果,支持发送失败重试,适用于较重要的消息通知场景

  • 异步:
    异步发送网络请求,不阻塞当前线程,不支持失败重试,适用于对响应时间要求高的场景

  • 单向:
    原理和异步一致,不支持回调。适用于响应时间非常短、可靠性要求不高场景,例如日志

  • 顺序消息发送原理:
    同一类消息发送到相同的队列。使用同步发送方式,保证先发送的消息先存储到消息队列

普通发送技术原理(顺序、事务、延迟三种特殊消息外)

RocketMQ除顺序、事务、延迟三种特殊消息外,其余区分为普通消息
日常开发最常用的是普通消息,因为最常见的使用场景就是系统间的异步解耦和流量的削峰填谷,保证高性能收发即可
普通消息在发送时选择消息队列的策略不同。有2种机制轮询机制和故障规避机制(故障延迟机制)。

  • 默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列
  • 缺点
    轮询算法简单好用,但乳沟轮询选择的队列是在宕机的Broker上,会导致发送失败,且无法规避发送失败的情况。

顺序消费技术原理

RocketMQ支持两种消息模式:集群消费和广播消费
//TODO

并发消费的技术原理

RocketMQ支持顺序消费和并发消费。默认消费为并发消费方式
并发消费也称为乱序消费,相比顺序想飞没有资源争抢上锁的过程,消费的速度比顺序消费要快很多
//TODO

消息的幂等性

如果一条消息被重复收到几次,是否会导致业务重复处理,Comsumer能否不重复接受消息
在业务层进行幂等性处理,可以通过分布式锁来完成
所有的消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)、exactly-only-once(精确仅一次)

  • 最多一次:消息投递后不论消费是否成功,不会再重复投递
    有可能会导致消息未被消费,RocketMQ未采用
  • 至少一次:消息投递消费完成后,向服务器返回确认消息,没有消费则一定不会返回
    如果网络等原因,服务器未收到返回信息则会再次投递,可能被重复消费。Rocket通过ACK确保消息至少被消费一次
  • 精确一次:需满足一下条件才能做到
    1.发送消息阶段,不允许发送重复消息
    2.消费消息阶段,不允许消费重复消费消息
    由于RocketMQ为了追求高性能,不保证此特性,无法避免消息重复,必须由业务上进行幂等性处理

如何实现事务消息

用Seata,它不香吗,略过🤣
//TODO

高性能设计

RocketMQ在稳定的基础上一直保持着非常高的性能。
体现在三个方面:数据储存设计、动态伸缩的能力、消息实时投递

高可用设计

系统的可用性越高,则平均无故障时间越长。
RocketMQ的高可用设计主要体现在4个方面

  • 消息发送的高可用
  • 消息储存的高可用
  • 消息消费的高可用
  • 集群管理的高可用

这个家伙很懒,啥也没有留下😋