Spring Cloud Alibaba技术栈(下)

时间:2022-07-26
本文章向大家介绍Spring Cloud Alibaba技术栈(下),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Spring Cloud Alibaba

承接上面的Nacos,Sentinel的学习, 现在开始学习Seata, Dubbo和RocketMQ

Seata 分布式事务框架

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata 官网: http://seata.io/zh-cn/index.html

Seata 分布式事务原理

整体机制 (两阶段提交协议的演变)

  • 一阶段: 业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段: 提交异步化,非常快速地完成。 回滚通过一阶段的回滚日志进行反向补偿。

官方文档: http://seata.io/zh-cn/docs/overview/what-is-seata.html

Seata Server 安装

下载地址:http://seata.io/zh-cn/blog/download.html

启动 Seata Server 进入 bin 目录中, 在 window 下启动 seata-server.bat, 在 linux 下启动 seata-server.bat

Seata 案例代码

测试框架搭建

项目架构

当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

四个微服务项目负责的业务

  • business-service 下单服务,
  • 调用 storage-service 删减库存,
  • 调用 order-service 创建订单,
  • 调用 account-service 扣除账户余额

项目搭建流程

  1. 如上图所示, 在根项目spring-cloud-alibaba-examples下创建父项目seata-examples, 修pom文件添加依赖
<dependencies>
        <!--服务注册-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!--Seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        <!--web 项目的基础依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
  1. 创建子模块项目account-service,修改pom文件并添加依赖
  2. 创建子模块项目order-service,修改pom文件并添加依赖
  3. 创建子模块项目business-service,修改pom文件并添加依赖
  4. 创建子模块项目storage-service(对库存的扣减), 修改pom文件并添加依赖 以该微服务创建流程为例, 演示项目搭建步骤 <dependencies> <!--服务注册--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency> <!--Seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <!--web 项目的基础依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
  5. 导入数据库脚本到mysql
  1. 代码生成 a.在mysql中右侧的database中添加数据库连接, 输入数据库相关属性, 然后根据提示下载驱动

b.连接成功后便可在idea上面查看数据库的相关信息

c.安装 MyBatis插件(自动生成实体类以及mapper接口和xml映射文件)

d. 右击选择的表进行生成代码操作

e. 填写代码生成所在位置的信息

f.编写库存操作的接口和实现类

public interface StorageService {

    /**
     * 完成对商品库存扣减操作
     * @param productNo
     * @param count
     */
    void deduct(String productNo, int count) ;
}


@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageTblDao storageTblDao;

    private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class);

    @Transactional
    @Override
    public void deduct(String productNo, int count) {
        logger.info("开始扣减商品{}的库存, 数量为{}", productNo, count);
        //1.查询库存
        //StorageTbl storageTbl = storageTblDao.selectByPrimaryKey(Integer.parseInt(productNo));
        StorageTblExample storageTblExample = new StorageTblExample();
        storageTblExample.createCriteria().andCommodityCodeEqualTo(productNo);
        List<StorageTbl> storageTbls = storageTblDao.selectByExample(storageTblExample);
        StorageTbl storageTbl = storageTbls.get(0);

        if (storageTbl==null){
            throw new IllegalArgumentException("商品不存在");
        }
        //2.扣减操作(扣减后的金额)
        int idleCount =storageTbl.getCount()-count;
        if (idleCount<0){
            throw new RuntimeException("存库不足!");
        }
        //3.设置商品库存
        storageTbl.setCount(idleCount);
        //4.保存到数据库中
        storageTblDao.updateByPrimaryKeySelective(storageTbl);

        logger.info("扣减库存商品{}成功, 剩余的库存为{}",productNo, idleCount);
    }
}

g.启动类

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("ah.szxy.mapper")
@RestController
public class StorageServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(StorageServiceApplication.class,args);
    }

    @Autowired
    private StorageService storageService;

    //使用外部接口暴露
    @GetMapping("/deduct/{productNo}/{count}")
    public ResponseEntity<Void> deduct(@PathVariable("productNo") String productNo, @PathVariable("count") Integer count){
        storageService.deduct(productNo, count);
        return ResponseEntity.ok().build();
    }
}

h.pom文件

server:
  port: 8093

spring:
  application:
    name: storage-service
  cloud:
    nacos:
      discovery:
        server-addr: 47.97.169.52:8848
    alibaba:
      seata:
        tx-service-group: ${spring.application.name}
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    name: storageDataSource
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://119.45.189.14:3306/seata?useSSL=false&serverTimezone=UTC
    username: root
    password: root123
    druid:
      max-active: 20  # 最大连接数
      min-idle: 2     # 最小活跃数
      initial-size: 2 # 初始连接数
seata:
  service:
    vgroup-mapping:
      storage-service: default
    grouplist:
      default: 127.0.0.1:8091  # seata默认端口
    disable-global-transaction: false # 开启全局事务
  enabled: true  # 开启 seata

mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml

i.启动并访问测试即可

当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

测试分布式事务

  1. 如图所示, 启动所有项目, 待所有项目启动完毕后测试分布式事务
  1. 初始化三个表的数据 账户表

订单表

库存表

  1. 访问下单接口 注意在这里如果访问 SXT_USER_2用户就会报错, 因此我们可以分别测试下面两个url, 看看出现错误时是否回滚
http://localhost:8096/purchase/SXT_USER_2/HUAWEI_0001/1  //经测试,发生;额回滚
http://localhost:8096/purchase/SXT_USER_1/HUAWEI_0001/1  //未发生回滚

Dubbo Spring Cloud

是什么

Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人 员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。 Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。 Dubbo Spring Cloud 首个 Preview Release,随同 Spring Cloud Alibaba 0.2.2.RELEASE 和 0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)

功能完成度

由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可 认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟 的实现

框架的搭建

我们将搭建如图所示的项目框架

搭建的代码已分享至码云: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

测试

  1. 测试消费者项目能不能消费到提供者的项目, 访问 http://localhost:8080/rpc/csdn-timepause
  2. 启动多个服务提供者项目, 方式如下

修改启动类的名称, 添加指定端口的参数

启动这三个提供者项目, 并重启消费者项目

Nacos上面可以看到该项目的详细信息

  1. 重复访问步骤1的url, 可以看到消费者在随机的访问提供者 由此可以知道Dubbo SpringCloud对服务进行了负载均衡(自动), 且无需任何配置

RocketMQ

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、 高可靠的消息发布与订阅服务。 同时,广泛应用于多个领域,包括异步通信解耦、企业解决 方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

下载 RocketMQ

//这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
下载地址:  http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
  1. 配置环境变量
变量名: ROCKETMQ_HOME
变量值: D:SoftWarerocket-mqrocketmq-all-4.4.0-bin-release //(RocketMq软件所在目录) 
  1. 启动mqnamesrv.cmd 方法: win+R 输入 cmd, 将mqnamesrv.cmd文件拖到cmd命令行中回车即可(退出只需将该命令行关闭即可)
  2. 启动mqbroker.cmd 方法: win+R 输入 cmd, 将mqbroker.cmd文件拖到cmd命令行中, 然后输入 -n 主机名:端口号,然后回车即可(退出同上)
  1. 启动图形化界面 方法: win+R 输入 cmd, 首先输入java -jar , 然后将rocketmq-console-ng-1.0.0.jar文件拖到cmd命令行中, 回车即可(RocketMQ和图形化软件会分享在底部)
  1. 访问图形化界面 输入 http://localhost:8080/ ,页面右上角可以进行中英文切换

SpringCloud Stream

介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe(发布订阅)、 consumer groups(消费者组)、partition(分区) 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding:

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。 举例说明: Kafka的实现KafkaMessageChannelBinder, RabbitMQ 的实现RabbitMessageChannelBinder 以及 RocketMQ 的实现RocketMQMessageChannelBinder
  • Binding: 包括 Input Binding 和 Output Binding。 Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

测试Demo环境搭建

按照下面的结构图, 创建pom类型的父项目, 创建两个子模块项目, 添加对应的Maven依赖, 修改配置文件, 编写测试代码, 最后启动项目 项目地址: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git

配置文件

logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG

#rocketmq 服务器 namerserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

#stream->bindings->output(input)
#output1
#发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
#消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
#生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
#消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true

#output2 主要演示事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
#发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup

#output3 用它演示消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group

spring.application.name=rocketmq-produce-example

server.port=28081

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
  1. 消息提供者项目启动完毕后, 可以测试 测试普通字符串消息

测试带tag的消息

测试发送对象消息

测试发送事务消息(half)=>需要创建事务消息监听后才能发送成功(稍后演示)

测试发送消息到pull 的目的地址,为了演示我们消息的手动拉取

添加事务监听

package ah.szxy.listener;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@RocketMQTransactionListener(
        txProducerGroup = "myTxProducerGroup" ,
        corePoolSize = 2 ,
        maximumPoolSize = 5
)
public class RockerMQLocalTransactionListenerImpl implements RocketMQLocalTransactionListener {

    /**
     * 当我们发送半(half)消息成功后,mq 服务要求我们执行本地的事务,并且返回本地事务的执行结果
     * RocketMQLocalTransactionState:
     *  COMMIT 提交->其他的消费者将收到该消息
     *  ROLLBACK 回滚->mq ->半消息删除
     *  UNKNOWN -> mq 会再次检查本地的事务->checkLocalTransaction
     *
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String type = message.getHeaders().get("type").toString();
        switch (type){
            case "1" :
                System.out.println("本地事务执行状态未知");
                return RocketMQLocalTransactionState.UNKNOWN ;
            case "2":
                System.out.println("本地事务执行状态成功");
                return RocketMQLocalTransactionState.COMMIT ;
            case "3":
                System.out.println("本地事务执行状态失败");
                return RocketMQLocalTransactionState.ROLLBACK ;
        }
        return null ;
    }

    /**
     * 当mq 收到我们的本地的事务为UNKNOWN ,它会再次来检查我们的本地事务状态,要求返回一个本地事务的状态
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("来检查了,本次我提交本地事务");
        return RocketMQLocalTransactionState.COMMIT;
    }
}
  1. 运行测试 测试提交成功的情况

测试回滚的情况

测试再次检查本地的事务的情况

Consumer项目的完善

点击查看项目地址

配置文件application.properties

#rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876

#stream->bindings->input

#input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true


#input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1

#input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20

#input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5


#input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group

spring.application.name=rocketmq-consume-example

server.port=28082

management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

测试Consumer项目

  1. 测试input1(接收字符类型)

2. 测试input2(接收tag类型)

3.测试input3(接收对象类型)

4.测试input4(接收事务类型, 成功/失败/确认成功还是失败)


相关软件分享如下

链接:https://pan.baidu.com/s/1o0WausCDJ6PA4OIpaz8qYA 提取码:d7wr