Spring Cloud Alibaba Seata

先简单了解一下,什么是分布式事务问题?

假设我们的应用,有订单模块order,会员模块account,库存模块storage

单模块应用:在最早单体应用中,这些业务操作都在一个系统内完成,直接本地一个事务提交

微服务应用:随着业务需求变化,单体应用被拆分成微服务应用,以上的三个模块变成了独立服务、独立数据源,此时,模块间rpc互相调用,那么问题来了,独立模块自身是可以保证自己的业务提交事务,那么怎么保证其他模块的事务一致性呢?

此处我们就产生了多数据源、多服务无法保证全局数据一致性的问题就是分布式事务问题

Seata是一款开源的分布式事务解决方案,为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案

基本原理:

把所有微服务的单独的事务当做一个branch transaction,本地事务还是遵循ACID的本地事务,再通过一个global transaction进行管理,在所有事务都提交之后,要么全体成功,要么全体失败

Seata中又三个组件完成整个操作

TC(Transaction Coordinator):事务协调器,维护全局事务运行状态,负责协调并驱动全局事务的提交合回滚

TM(Transaction Manager):事务管理器,控制全局事务,负责开启全局事务和提交或者回滚的决策者

RM(Resource Manager):资源管理器,控制分支事务,负责事务的注册、状态汇报,并接收事务协调器的指令驱动分支事务的提交或者回滚

分布式的执行过程:

(1)TM向TC申请开启一个全局事务,全局事务创建成功并返回一个全局唯一XID

(2)XID通过微服务链路的上下文传播

(3)RM向TC注册分支事务并关联上XID

(4)TM向TC发起XID的全局提交或者回滚决议

(5)TC调度XID下所有分支事务完成提交或者回滚

Seata AT模式

两段式提交,第一阶段所有业务数据和回滚日志在一个本地事务中提交并提交后释放资源,第二阶段提交异步化,如果回滚则通过第一阶段的回滚日志进行反向补偿(意思是,insert 变成delete,update 的数据调换顺序等等)

写隔离:(1)第一阶段提交前需要拿到全局锁;(2)拿不到锁则不能提交本地事务;(3)拿到全局锁的尝试有一定范围,即如果超出范围则回滚本地事务

场景:

读隔离:在本地事务隔离级别RC(读已提交)或者以上基础上,全局默认是RU(读未提交),通过SELECT FOR UPDATE 语句的执行会申请全局锁,如果全局锁被其他事务持有,则释放本地锁

Seata TCC模式

同样是两阶段模型,第一届阶段是prepare,第二阶段是commit或rollback行为,不同的是不需要依赖底层的本地ACID事务

Seata Saga模式

基于状态机引擎来实现:(1)通过状态图来定义服务的调用流程并生成json状态语言定义文件;(2)状态图中的一个节点调用一个服务,可以配置他的补偿节点;(3)状态图json由状态机引擎驱动执行,当出现异常时,状态引擎反向执行已成功节点对应的补偿将事务事务;(4)可以实现服务的编排、单项选择、并发、子流程、参数转换、异常捕获等

Seata XA模式

利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式

执行阶段:业务SQL操作放在XA分支上,等XA完成后执行XA prepare

完成阶段:分支提交或回滚执行XA分支的commit或rollback

我们简单做一个AT的演示,其实是跟按照官网的例子来,做了一个nacos + seata + spring cloud的整合,先提前准备

(1)mysql数据库(可以在docker跑,也可以是服务器或者本地)

(2)一个docker环境(如果可以最好是可运行docker-compose,之前的文章有提过

首先,我们需要把seata跑起来(官网文档:https://seata.io/zh-cn/docs/ops/deploy-by-docker-compose.html

我们选用外部数据的方式,也可以把配置注册到nacos上

(1)先正常跑起来docker-compose,我们采用的版本是最新版(具体看docker hub的tag)

version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:latest
    ports:
      - "7091:7091"
      - "8091:8091"

正常运行起来之后,我们需要把里面的配置文件cp出来

具体位置是/seata-server/resources,命令:

docker cp 运行的镜像或者服务:seata-server/resources ./seata-server/resources(后面这里是需要复制的地方)

然后我们就能停止服务,打开刚刚复制出来的文件

我们需要修改一下application.yml文件,旁边的application.example.yml是作为参考的示例文件

#  Copyright 1999-2019 Seata.io Group.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata

seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: file
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: file
  store:
    # support: file 、 db 、 redis
    mode: db
    db:
      datasource: druid
      dbType: mysql
      # 需要根据mysql的版本调整driverClassName
      # mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
      # mysql8以下版本的driver:com.mysql.jdbc.Driver
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://外部数据库地址:3306/seata_db?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
      user: 数据库账号
      password: 数据库密码
      
  #  server:
  #    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

外部数据库这里不演示,可以通过docker-compose跑起来,放在同一个文件里,数据库的脚本在github上(脚本:https://github.com/seata/seata/tree/develop/script/server/db

最后再次启动docker-compose,调整脚本为:

version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:latest
    ports:
      - "7091:7091"
      - "8091:8091"
    environment:
      - STORE_MODE=db
      # 以SEATA_IP作为host注册seata server
      - SEATA_IP=seata_ip
      - SEATA_PORT=8091
    volumes:
      - "/usr/share/zoneinfo/Asia/Shanghai:/etc/localtime"        #设置系统时区
      - "/usr/share/zoneinfo/Asia/Shanghai:/etc/timezone"  #设置时区
      # 假设我们通过docker cp命令把资源文件拷贝到相对路径`./seata-server/resources`中
      # 如有问题,请阅读上面的[注意事项]以及[使用自定义配置文件]
      - "./seata-server/resources:/seata-server/resources"

正常启动没有报错即可

(2)构建三个项目:seata-account-service(账户模块)、seata-account-service(订单模块)、seata-storange-service(仓储模块)

采用nacos + seata + openFeign + mybatis + spring web构建项目

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>seata-all</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>${seata.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>${seata.version}</version>
        </dependency>
        <!--feign 此处需要把openfegin的核心包排除,引用2.2.x版本,官方似乎没有维护这个的更新,我看到有几个issue有提到这个问题并没有close,2022.09.13-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-openfeign-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-openfeign-core</artifactId>
            <version>2.2.10.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot-starter.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector-java.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid-spring-boot-starter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

用到的模块版本:

 <seata.version>1.5.2</seata.version>
        <mysql-connector-java.version>8.0.30</mysql-connector-java.version>
        <mybatis-spring-boot-starter.version>2.2.2</mybatis-spring-boot-starter.version>
        <druid-spring-boot-starter.version>1.2.11</druid-spring-boot-starter.version>

在启动文件上增加注解

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
//新版seata采用自动装配的方式动态代理数据库,里面还有其他参数,配置事务模式(默认AT)等,
@EnableAutoDataSourceProxy
@MapperScan("dao文件package")

配置文件application.yml

server:
    port: 对应端口 #8180|8181|8182
spring:
    application:
        name: 对应服务名 # seata-account-service|seata-order-service|seata-storage-service
    cloud:
        nacos:
            discovery:
                server-addr: 127.0.0.1:8848
    datasource:
        druid:
            driver-class-name: com.mysql.cj.jdbc.Driver
            password: 数据库密码
            #修改对应数据库 此处是三个数据库 seata_account | seata_order | seata_storage
            url: jdbc:mysql://数据库地址:3306/数据库?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
            username: 数据库账号

logging:
    level:
        io:
            seata: info
seata:
    tx-service-group: my-tx-group
    service:
        # 事务组对应的集群名称
        vgroup-mapping:
            my-tx-group: seata-server
        # seata-server的地址
        grouplist:
            seata-server: 127.0.0.1:8091
mybatis:
    mapperLocations: classpath:mapper/*.xml

一切准备就绪,准备三个表,分别在不同的数据库,需要准备entity、xml、dao和service

account模块:

@Data
public class Account {

private Long id;

/**
* 用户id
*/
private Long userId;

/**
* 总额度
*/
private BigDecimal total;

/**
* 已用额度
*/
private BigDecimal used;

/**
* 剩余额度
*/
private BigDecimal residue;
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.jay.cloud.dao.AccountDAO">
    <resultMap id="BaseResultMap" type="com.jay.cloud.entity.Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="DECIMAL"/>
        <result column="used" property="used" jdbcType="DECIMAL"/>
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
        UPDATE account
        SET residue = residue - #{money},
            used    = used + #{money}
        WHERE user_id = #{userId};
    </update>
</mapper>
@Repository
public interface AccountDAO {

    /**
     * 扣减账户余额
     */
    void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
public interface AccountService {
    /**
     * 扣减账户余额
     * @param userId 用户id
     * @param money 金额
     */
    void decrease(Long userId, BigDecimal money);
}
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountDAO accountDAO;

    /**
     * 扣减账户余额
     */
    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("------->account-service中扣减账户余额开始");
        accountDAO.decrease(userId,money);
        log.info("------->account-service中扣减账户余额结束");
    }
}
@RestController
@RequestMapping("/account")
public class AccountController {

    @Autowired
    private AccountService accountService;

    /**
     * 扣减账户余额
     */
    @RequestMapping("/decrease")
    public String decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
        accountService.decrease(userId, money);
        return "扣减账户余额成功!";
    }
}

storage模块:

@Data
public class Storage {

private Long id;

/**
* 产品id
*/
private Long productId;

/**
* 总库存
*/
private Integer total;

/**
* 已用库存
*/
private Integer used;

/**
* 剩余库存
*/
private Integer residue;
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.jay.cloud.dao.StorageDAO">
    <resultMap id="BaseResultMap" type="com.jay.cloud.entity.Storage">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="product_id" property="productId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="INTEGER"/>
        <result column="used" property="used" jdbcType="INTEGER"/>
        <result column="residue" property="residue" jdbcType="INTEGER"/>
    </resultMap>
    <update id="decrease">
        UPDATE storage
        SET used    = used + #{count},
            residue = residue - #{count}
        WHERE product_id = #{productId}
    </update>
</mapper>
@Repository
public interface StorageDAO {

    /**
     * 扣减库存
     */
    void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
public interface StorageService {

    /**
     * 扣减库存
     */
    void decrease(Long productId, Integer count);

}
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageDAO storageDAO;

    /**
     * 扣减库存
     */
    @Override
    public void decrease(Long productId, Integer count) {
        log.info("------->storage-service中扣减库存开始");
        storageDAO.decrease(productId, count);
        log.info("------->storage-service中扣减库存结束");
    }
}
@RestController
@RequestMapping("/storage")
public class StorageController {

    @Autowired
    private StorageService storageService;

    /**
     * 扣减库存
     */
    @RequestMapping("/decrease")
    public String decrease(Long productId, Integer count) {
        storageService.decrease(productId, count);
        return "扣减库存成功!";
    }
}

order模块:

@Data
public class Order {

    private Long id;

    private Long userId;

    private Long productId;

    private Integer count;

    private BigDecimal money;

    private Integer status;

}
@Repository
public interface OrderDAO {

    /**
     * 创建订单
     */
    void create(Order order);

    /**
     * 修改订单金额
     */
    void update(@Param("userId") Long userId, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.jay.cloud.dao.OrderDAO">
    <resultMap id="BaseResultMap" type="com.jay.cloud.entity.Order">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="product_id" property="productId" jdbcType="BIGINT"/>
        <result column="count" property="count" jdbcType="INTEGER"/>
        <result column="money" property="money" jdbcType="DECIMAL"/>
        <result column="status" property="status" jdbcType="INTEGER"/>
    </resultMap>
    <insert id="create">
        INSERT INTO `order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)
        VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0);
    </insert>

    <update id="update">
        UPDATE `order`
        SET status = 1
        WHERE user_id = #{userId}
          AND status = #{status};
    </update>
</mapper>
public interface OrderService {


    /**
     * 创建订单
     */
    void create(Order order);

}
@RestController
@RequestMapping(value = "/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * 创建订单
     */
    @GetMapping("/create")
    public String create(Order order) {
        orderService.create(order);
        return "订单创建成功!";
    }
}

此处,我们通过下单接口触发分布式事务操作,所以需要在order模块添加其他模块的fegin调用,如果其他模块需要调用也是同理

@FeignClient(value = "seata-storage-service")
public interface StorageService {

    /**
     * 扣减库存
     */
    @GetMapping(value = "/storage/decrease")
    String decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
@FeignClient(value = "seata-account-service")
public interface AccountService {

    /**
     * 扣减账户余额
     */
    @RequestMapping("/account/decrease")
    String decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}

在order模块的serviceImpl具体实现整个流程

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderDAO       orderDAO;

    @Autowired
    private StorageService storageService;

    @Autowired
    private AccountService accountService;

    /**
     * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
     */
    @Override
    @GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("------->下单开始");
        //本应用创建订单
        orderDAO.create(order);

        //远程调用库存服务扣减库存
        log.info("------->order-service中扣减库存开始");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("------->order-service中扣减库存结束");

        //远程调用账户服务扣减余额
        log.info("------->order-service中扣减余额开始");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("------->order-service中扣减余额结束");

        //修改订单状态为已完成
        log.info("------->order-service中修改订单状态开始");
        orderDAO.update(order.getUserId(), 0);
        log.info("------->order-service中修改订单状态结束");

        log.info("------->下单结束");
    }
}

启动项目,查看nacos,服务成功注册

调用创建订单接口

http://localhost:8180/order/create?userId=1&productId=1&count=10&money=100

查看三个项目的日志,tc返回了全局事务xid=seata_ip:8091:18320019454197912并传递到account和storage两个服务中,两个服务都会注册一个分支事务,本地服务的角色是RM,完成以后通知TC,并报告给TM,所有分支都完成以后TM决策整个事务,成功的话通知TC完成事务,否则通知TC回滚

我们看一下数据情况验证我们事务成功提交

到此完成正常流程事务提交

Leave a Reply

Your email address will not be published. Required fields are marked *