注册

Redis - 全局ID生成器 RedisIdWorker

概述



  1. 定义:一种分布式系统下用来生成全局唯一 ID 的工具
  2. 特点

    1. 唯一性,满足优惠券需要唯一的 ID 标识用于核销
    2. 高可用,随时能够生成正确的 ID
    3. 高性能,生成 ID 的速度很快
    4. 递增性,生成的 ID 是逐渐变大的,有利于数据库形成索引
    5. 安全性,生成的 ID 无明显规律,可以避免间接泄露信息
    6. 生成量大,可满足优惠券订单数据量大的需求


  3. ID 组成部分

    1. 符号位:1bit,永远为0
    2. 时间戳:31bit,以秒为单位,可以使用69年
    3. 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID



image.png


代码实现



  1. 目标:手动实现一个简单的全局 ID 生成器
  2. 实现流程

    1. 创建生成器:在 utils 包下创建 RedisIdWorker 类,作为 ID 生成器
    2. 创建时间戳:创建一个时间戳,即 RedisId 的高32位
    3. 获取当前日期:创建当前日期对象 date,用于自增 id 的生成
    4. count:设置 Id 格式,保证 Id 严格自增长
    5. 拼接 Id 并将其返回


  3. 代码实现
    @Component
    public class RedisIdWorker {

    // 开始时间戳
    private static final long BEGIN_TIMESTAMP = 1640995200L;

    // 序列号的位数
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
    this.stringRedisTemplate = stringRedisTemplate;
    }

    // 获取下一个自动生成的 id
    public long nextId(String keyPrefix){
    // 1.生成时间戳
    LocalDateTime now = LocalDateTime.now();
    long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
    long timestamp = nowSecond - BEGIN_TIMESTAMP;

    // 3.获取当前日期
    String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    // 4.获取自增长值:生成一个递增计数值。每次调用 increment 方法时,它会在这个key之前的自增值的基础上+1(第一次为0)
    long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
    // 5.拼接并返回
    return timestamp << COUNT_BITS | count;
    }
    }



测试


一、CountDownLatch 工具类



  1. 定义:一个同步工具类,用于协调多个线程的等待与唤醒
  2. 功能

    1. 控制多个线程的执行顺序和同步
    2. 确保主线程在所有子线程完成后才继续执行
    3. 防止主线程过早结束导致子线程执行状态丢失


  3. 常用方法

    1. await:用于主线程的阻塞方法,使其阻塞等待直到计数器归零
    2. countDown:用于子线程的计数方法,使计数器递减



二、ExecutorService & Executors



  1. 定义:Java 提供的线程池管理接口
  2. 功能

    1. 简化异步任务的执行管理
    2. 提供有关 “线程池” 和 “任务执行” 的标准 API


  3. 常用方法
    方法说明
    Executors.newFixedThreadPool(xxxThreads)Executors 提供的工厂方法,用于创建 ExecutorService 实例
    execute(functionName)调用线程执行 functionName 任务,无返回值
    ⭐ submit(functionName)调用线程执行 functionName 任务,返回一个 Future 类
    invokeAny(functionName)调用线程执行一组 functionName 任务,返回首成功执行的任务的结果
    invokeAll(functionName)调用线程执行一组 functionName 任务,返回所有任务执行的结果
    ⭐ shutdown()停止接受新任务,并在所有正在运行的线程完成当前工作后关闭
    ⭐ awaitTermination()停止接受新任务,在指定时间内等待所有任务完成


  4. 参考资料:一文秒懂 Java ExecutorService
  5. 代码实现

    1. 目标:测试 redisIdWorker 在高并发场景下的表现(共生成 30000 个 id)

    private ExecutorService es = Executors.newFixedThreadPool(500);     // 创建一个含有 500 个线程的线程池

    @Test
    void testIdWorker() throws InterruptedException
    {

    CountDownLatch latch = new CountDownLatch(300); // 定义一个工具类,统计线程执行300次task的进度

    // 创建函数,供线程执行
    Runnable task = () -> {
    for(int i = 0; i < 100; i ++) {
    long id = redisIdWorker.nextId("order");
    System.out.println("id = " + id);
    }
    latch.countDown();
    }

    long begin = System.currentTimeMillis();
    for( int i = 0; i < 300 ; i ++) {
    es.submit(task);
    }
    latch.await(); // 主线程等待,直到 CountDownLatch 的计数归
    long end = System.currentTimeMillis();
    System.out.println("time = " + (end - begin)); // 打印任务执行的总耗时
    }





超卖问题



  1. 目标:通过数据库的 SQL 语句直接实现库存扣减(存在超卖问题)

一、乐观锁



  1. 定义:一种并发控制机制,不使用数据库锁,而是在更新时通过版本号或条件判断来确保数据一致性
  2. 优点:并发性能高,不会产生死锁,适合读多写少的场景
  3. 实现方式:CAS (Compare and Swap) - 比较并交换操作
  4. 实现示例 (基于版本号的乐观锁)
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1, version = version + 1")
    .eq("voucher_id", voucherId)
    .eq("version", version)
    .gt("stock", 0)
    .update();


  5. 分布式环境的局限性

    1. **原子性问题:**多个线程同时检查库存并更新时,可能导致超卖。这是因为检查和更新操作不是原子的
    2. **事务隔离:**在默认的"读已提交"隔离级别下,分布式环境中的多个节点可能读取到不一致的数据状态
    3. **分布式一致性:**在分布式环境中,不同的应用服务器可能同时操作数据库,而数据库层本身并不能感知跨服务器的事务一致性



二、悲观锁



  1. 定义:一种并发控制机制,通过添加同步锁强制线程串行执行
  2. 优点:实现简单,可以确保数据一致性
  3. 缺点:由于串行执行导致性能较低,不适合高并发场景
  4. 事务隔离级别:读已提交及以上
  5. 实现方法:使用 SQL 的 forUpdate() 子句,可以在查询时锁定选中的数据行。被锁定的行在当前事务提交或回滚前,其他事务无法对其进行修改或读取

三、事务隔离级别



  1. 定义:数据库事务并发执行时的隔离程度,用于解决并发事务可能带来的问题
  2. 优点:可以防止脏读、不可重复读和幻读等并发问题
  3. 缺点:隔离级别越高,并发性能越低
  4. 实现方法:

    • 读未提交(Read Uncommitted):允许读取未提交的数据
    • 读已提交(Read Committed):只允许读取已提交的数据
    • 可重复读(Repeatable Read):在同一事务中多次读取同样数据的结果是一致的
    • 串行化(Serializable):最高隔离级别,完全串行化执行





一人一单问题


一、单服务器系统解决方案



  1. 需求:每个人只能抢购一张大额优惠券,避免同一用户购买多张优惠券
  2. 重点

    1. 事务:库存扣减操作必须在事务中执行
    2. 粒度:事务粒度必须够小,避免影响性能
    3. 锁:事务开启时必须确保拿到当前下单用户的订单,并依据用户 Id 加锁
    4. 找到事务的代理对象,避免 Spring 事务注解失效 (需要给启动类加 @EnableAspectJAutoProxy(exposeProxy = true) 注解)


  3. 实现逻辑

    1. 获取优惠券 id、当前登录用户 id
    2. 查询数据库的优惠券表(voucher_order)

      1. 如果存在优惠券 id 和当前登录用户 id 都匹配的 order 则拒绝创建订单,返回 fail()
      2. 如果不存在则创建订单 voucherOrder 并保存至 voucher_order 表中,返回 ok()





二、分布式系统解决方案 (通过 Lua 脚本保证原子性)


一、优惠券下单逻辑


image.png


二、代码实现 (Lua脚本)


--1. 参数列表
--1.1. 优惠券id
local voucherId = ARGV[1]
--1.2. 用户id
local userId = ARGV[2]
--1.3. 订单id
local orderId = ARGV[3]

--2. 数据key
--2.1. 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2. 订单key
local orderKey = 'seckill:order' .. voucherId

--3. 脚本业务
--3.1. 判断库存是否充足 get stockKey
if( tonumber( redis.call('get', stockKey) ) <= 0 ) then
return 1
end
--3.2. 判断用户是否下单 SISMEMBER orderKey userId
if( redis.call( 'sismember', orderKey, userId ) == 1 ) then
return 2
end
--3.4 扣库存: stockKey 的库存 -1
redis.call( 'incrby', stockKey, -1 )
--3.5 下单(保存用户): orderKey 集合中添加 userId
redis.call( 'sadd', orderKey, userId )
-- 3.6. 发送消息到队列中
redis.call( 'xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )

三、加载 Lua 脚本



  1. RedisScript 接口:用于绑定一个具体的 Lua 脚本
  2. DefaultRedisScript 实现类

    1. 定义:RedisScript 接口的实现类
    2. 功能:提前加载 Lua 脚本
    3. 示例
      // 创建Lua脚本对象
      private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

      // Lua脚本初始化 (通过静态代码块)
      static {
      SECKILL_SCRIPT = new DefaultRedisScript<>();
      SECKILL_SCRIPT.setLocation(new ClassPathResource("/path/to/lua_script.lua"));
      SECKILL_SCRIPT.setResultType(Long.class);
      }





四、执行 Lua 脚本



  1. 调用Lua脚本 API :StringRedisTemplate.execute( RedisScript script, List keys, Object… args )
  2. 示例

    1. 执行 ”下单脚本” (此时不需要 key,因为下单时只需要用 userId 和 voucherId 查询是否有锁)
      Long result = stringRedisTemplate.execute(
      SECKILL_SCRIPT, // 要执行的脚本
      Collections.emptyList(), // KEY
      voucherId.toString(), userId.toString(), String.valueOf(orderId) // VALUES
      );


    2. 执行 “unlock脚本”





实战:添加优惠券 & 单服务器创建订单


添加优惠券



  1. 目标:商家在主页上添加一个优惠券的抢购链接,可以点击抢购按钮抢购优惠券

一、普通优惠券



  1. 定义:日常可获取的资源
  2. 代码实现
    @PostMapping
    public Result addVoucher(@RequestBody Voucher voucher) {
    voucherService.save(voucher);
    return Result.ok(voucher.getId());
    }



二、限量优惠券



  1. 定义:限制数量,需要设置时间限制、面对高并发请求的资源
  2. 下单流程

    1. 查询优惠券:通过 voucherId 查询优惠券
    2. 时间判断:判断是否在抢购优惠券的固定时间范围内
    3. 库存判断:判断优惠券库存是否 ≥ 1
    4. 扣减库存
    5. 创建订单:创建订单 VoucherOrder 对象,指定订单号,指定全局唯一 voucherId,指定用户 id
    6. 保存订单:保存订单到数据库
    7. 返回结果:Result.ok(orderId)


  3. 代码实现

    1. VoucherController
      @PostMapping("seckill")
      public Result addSeckillVoucher( @RequestBody Voucher voucher ){
      voucherService.addSeckillVoucher(voucher);
      return Result.o(voucher.getId());
      }


    2. VoucherServiceImpl
      @Override
      @Transactional
      public void addSeckillVoucher(Voucher voucher) {
      // 保存优惠券到数据库
      save(voucher);
      // 保存优惠券信息
      SeckillVoucher seckillVoucher = new SeckillVoucher();
      seckillVoucher.setVoucherId(voucher.getId());
      seckillVoucher.setStock(voucher.getStock());
      seckillVoucher.setBeginTime(voucher.getBeginTime());
      seckillVoucher.setEndTime(voucher.getEndTime());
      seckillVoucherService.save(seckillVoucher);
      // 保存优惠券到Redis中
      stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
      }





(缺陷) 优惠券下单功能


一、功能说明



  1. 目标:用户抢购代金券,保证用户成功获得优惠券,保证效率并且避免超卖
  2. 工作流程

    1. 提交优惠券 ID
    2. 查询优惠券信息 (下单时间是否合法,下单时库存是否充足)
    3. 扣减库存,创建订单
    4. 返回订单 ID



四、代码实现



  • VoucherOrderServiceImpl (下述代码在分布式环境下仍然存在超卖问题)
    @Service
    public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService{

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Override
    public Result seckillVoucher(Long voucherId) {

    // 查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

    // 优惠券抢购时间判断
    if(voucher.getBeginTime().isAfter(LocalDateTime.now) || voucher.getEndTime().isBefore(LocalDateTime.now()){
    return Result.fail("当前不在抢购时间!");
    }

    // 库存判断
    if(voucher.getStock() < 1){
    return Result.fail("库存不足!");
    }

    // !!! 实现一人一单功能 !!!
    Long userId = UserHolder.getUser().getId();
    synchronized (userId.toString().intern()) {
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
    }
    }

    @Transactional
    public Result createVoucherOrder(Long userId) {
    Long userId = UserHolder.getUser().getId();

    // 查询当前用户是否已经购买过优惠券
    int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
    if( count > 0 ) {
    return Result.fail("当前用户不可重复购买!");

    // !!! 实现乐观锁 !!!
    // 扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1") // set stock = stock - 1;
    .eq("voucher_id", voucherId).gt("stock", 0) // where voucher_id = voucherId and stock > 0;
    .update();
    if(!success) {
    return Result.fail("库存不足!");
    }

    // 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setId(redisIdWorker.nextId("order"));
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);

    // 返回订单id
    return Result.ok(orderId);
    }



作者:LoopLee
来源:juejin.cn/post/7448119568567189530

0 个评论

要回复文章请先登录注册