Redis - 全局ID生成器 RedisIdWorker
概述
定义
:一种分布式系统下用来生成全局唯一 ID 的工具特点
- 唯一性,满足优惠券需要唯一的 ID 标识用于核销
- 高可用,随时能够生成正确的 ID
- 高性能,生成 ID 的速度很快
- 递增性,生成的 ID 是逐渐变大的,有利于数据库形成索引
- 安全性,生成的 ID 无明显规律,可以避免间接泄露信息
- 生成量大,可满足优惠券订单数据量大的需求
ID 组成部分
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
代码实现
目标
:手动实现一个简单的全局 ID 生成器实现流程
- 创建生成器:在 utils 包下创建 RedisIdWorker 类,作为 ID 生成器
- 创建时间戳:创建一个时间戳,即 RedisId 的高32位
- 获取当前日期:创建当前日期对象 date,用于自增 id 的生成
- count:设置 Id 格式,保证 Id 严格自增长
- 拼接 Id 并将其返回
代码实现
@Component
public class RedisIdWorker {
// 开始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;
// 序列号的位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
// 获取下一个自动生成的 id
public long nextId(String keyPrefix){
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 3.获取当前日期
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 4.获取自增长值:生成一个递增计数值。每次调用 increment 方法时,它会在这个key之前的自增值的基础上+1(第一次为0)
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 5.拼接并返回
return timestamp << COUNT_BITS | count;
}
}
测试
一、CountDownLatch 工具类
定义
:一个同步工具类,用于协调多个线程的等待与唤醒功能
:- 控制多个线程的执行顺序和同步
- 确保主线程在所有子线程完成后才继续执行
- 防止主线程过早结束导致子线程执行状态丢失
常用方法
:- await:用于主线程的阻塞方法,使其阻塞等待直到计数器归零
- countDown:用于子线程的计数方法,使计数器递减
二、ExecutorService & Executors
定义
:Java 提供的线程池管理接口功能
:- 简化异步任务的执行管理
- 提供有关 “线程池” 和 “任务执行” 的标准 API
常用方法
方法 说明 Executors.newFixedThreadPool(xxxThreads) Executors 提供的工厂方法,用于创建 ExecutorService 实例 execute(functionName) 调用线程执行 functionName 任务,无返回值 ⭐ submit(functionName) 调用线程执行 functionName 任务,返回一个 Future 类 invokeAny(functionName) 调用线程执行一组 functionName 任务,返回首成功执行的任务的结果 invokeAll(functionName) 调用线程执行一组 functionName 任务,返回所有任务执行的结果 ⭐ shutdown() 停止接受新任务,并在所有正在运行的线程完成当前工作后关闭 ⭐ awaitTermination() 停止接受新任务,在指定时间内等待所有任务完成 - 参考资料:一文秒懂 Java ExecutorService
- 代码实现
- 目标:测试 redisIdWorker 在高并发场景下的表现(共生成 30000 个 id)
private ExecutorService es = Executors.newFixedThreadPool(500); // 创建一个含有 500 个线程的线程池
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300); // 定义一个工具类,统计线程执行300次task的进度
// 创建函数,供线程执行
Runnable task = () -> {
for(int i = 0; i < 100; i ++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
}
long begin = System.currentTimeMillis();
for( int i = 0; i < 300 ; i ++) {
es.submit(task);
}
latch.await(); // 主线程等待,直到 CountDownLatch 的计数归
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin)); // 打印任务执行的总耗时
}
超卖问题
- 目标:通过数据库的 SQL 语句直接实现库存扣减(存在超卖问题)
一、乐观锁
定义
:一种并发控制机制,不使用数据库锁,而是在更新时通过版本号或条件判断来确保数据一致性优点
:并发性能高,不会产生死锁,适合读多写少的场景实现方式
:CAS (Compare and Swap) - 比较并交换操作实现示例
(基于版本号的乐观锁)boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1, version = version + 1")
.eq("voucher_id", voucherId)
.eq("version", version)
.gt("stock", 0)
.update();分布式环境的局限性
- **原子性问题:**多个线程同时检查库存并更新时,可能导致超卖。这是因为检查和更新操作不是原子的
- **事务隔离:**在默认的"读已提交"隔离级别下,分布式环境中的多个节点可能读取到不一致的数据状态
- **分布式一致性:**在分布式环境中,不同的应用服务器可能同时操作数据库,而数据库层本身并不能感知跨服务器的事务一致性
二、悲观锁
定义
:一种并发控制机制,通过添加同步锁强制线程串行执行优点
:实现简单,可以确保数据一致性缺点
:由于串行执行导致性能较低,不适合高并发场景事务隔离级别
:读已提交及以上实现方法
:使用 SQL 的 forUpdate() 子句,可以在查询时锁定选中的数据行。被锁定的行在当前事务提交或回滚前,其他事务无法对其进行修改或读取
三、事务隔离级别
- 定义:数据库事务并发执行时的隔离程度,用于解决并发事务可能带来的问题
- 优点:可以防止脏读、不可重复读和幻读等并发问题
- 缺点:隔离级别越高,并发性能越低
- 实现方法:
- 读未提交(Read Uncommitted):允许读取未提交的数据
- 读已提交(Read Committed):只允许读取已提交的数据
- 可重复读(Repeatable Read):在同一事务中多次读取同样数据的结果是一致的
- 串行化(Serializable):最高隔离级别,完全串行化执行
一人一单问题
一、单服务器系统解决方案
需求
:每个人只能抢购一张大额优惠券,避免同一用户购买多张优惠券重点
- 事务:库存扣减操作必须在事务中执行
- 粒度:事务粒度必须够小,避免影响性能
- 锁:事务开启时必须确保拿到当前下单用户的订单,并依据用户 Id 加锁
- 找到事务的代理对象,避免 Spring 事务注解失效 (需要给启动类加
@EnableAspectJAutoProxy(exposeProxy = true)
注解)
实现逻辑
- 获取优惠券 id、当前登录用户 id
- 查询数据库的优惠券表(voucher_order)
- 如果存在优惠券 id 和当前登录用户 id 都匹配的 order 则拒绝创建订单,返回 fail()
- 如果不存在则创建订单 voucherOrder 并保存至 voucher_order 表中,返回 ok()
二、分布式系统解决方案 (通过 Lua 脚本保证原子性)
一、优惠券下单逻辑
二、代码实现 (Lua脚本)
--1. 参数列表
--1.1. 优惠券id
local voucherId = ARGV[1]
--1.2. 用户id
local userId = ARGV[2]
--1.3. 订单id
local orderId = ARGV[3]
--2. 数据key
--2.1. 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2. 订单key
local orderKey = 'seckill:order' .. voucherId
--3. 脚本业务
--3.1. 判断库存是否充足 get stockKey
if( tonumber( redis.call('get', stockKey) ) <= 0 ) then
return 1
end
--3.2. 判断用户是否下单 SISMEMBER orderKey userId
if( redis.call( 'sismember', orderKey, userId ) == 1 ) then
return 2
end
--3.4 扣库存: stockKey 的库存 -1
redis.call( 'incrby', stockKey, -1 )
--3.5 下单(保存用户): orderKey 集合中添加 userId
redis.call( 'sadd', orderKey, userId )
-- 3.6. 发送消息到队列中
redis.call( 'xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
三、加载 Lua 脚本
RedisScript 接口
:用于绑定一个具体的 Lua 脚本DefaultRedisScript 实现类
- 定义:RedisScript 接口的实现类
- 功能:提前加载 Lua 脚本
- 示例
// 创建Lua脚本对象
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
// Lua脚本初始化 (通过静态代码块)
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("/path/to/lua_script.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
四、执行 Lua 脚本
调用Lua脚本 API
:StringRedisTemplate.execute( RedisScript script, List keys, Object… args )示例
- 执行 ”下单脚本” (此时不需要 key,因为下单时只需要用 userId 和 voucherId 查询是否有锁)
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT, // 要执行的脚本
Collections.emptyList(), // KEY
voucherId.toString(), userId.toString(), String.valueOf(orderId) // VALUES
); - 执行 “unlock脚本”
实战:添加优惠券 & 单服务器创建订单
添加优惠券
目标
:商家在主页上添加一个优惠券的抢购链接,可以点击抢购按钮抢购优惠券
一、普通优惠券
定义
:日常可获取的资源代码实现
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}
二、限量优惠券
定义
:限制数量,需要设置时间限制、面对高并发请求的资源下单流程
- 查询优惠券:通过 voucherId 查询优惠券
- 时间判断:判断是否在抢购优惠券的固定时间范围内
- 库存判断:判断优惠券库存是否 ≥ 1
- 扣减库存
- 创建订单:创建订单 VoucherOrder 对象,指定订单号,指定全局唯一 voucherId,指定用户 id
- 保存订单:保存订单到数据库
- 返回结果:Result.ok(orderId)
代码实现
- VoucherController
@PostMapping("seckill")
public Result addSeckillVoucher( @RequestBody Voucher voucher ){
voucherService.addSeckillVoucher(voucher);
return Result.o(voucher.getId());
} - VoucherServiceImpl
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券到数据库
save(voucher);
// 保存优惠券信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存优惠券到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
(缺陷) 优惠券下单功能
一、功能说明
目标
:用户抢购代金券,保证用户成功获得优惠券,保证效率并且避免超卖工作流程
- 提交优惠券 ID
- 查询优惠券信息 (下单时间是否合法,下单时库存是否充足)
- 扣减库存,创建订单
- 返回订单 ID
四、代码实现
- VoucherOrderServiceImpl (下述代码在分布式环境下仍然存在超卖问题)
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService{
@Resource
private ISeckillVoucherService seckillVoucherService;
@Override
public Result seckillVoucher(Long voucherId) {
// 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 优惠券抢购时间判断
if(voucher.getBeginTime().isAfter(LocalDateTime.now) || voucher.getEndTime().isBefore(LocalDateTime.now()){
return Result.fail("当前不在抢购时间!");
}
// 库存判断
if(voucher.getStock() < 1){
return Result.fail("库存不足!");
}
// !!! 实现一人一单功能 !!!
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long userId) {
Long userId = UserHolder.getUser().getId();
// 查询当前用户是否已经购买过优惠券
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if( count > 0 ) {
return Result.fail("当前用户不可重复购买!");
// !!! 实现乐观锁 !!!
// 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1;
.eq("voucher_id", voucherId).gt("stock", 0) // where voucher_id = voucherId and stock > 0;
.update();
if(!success) {
return Result.fail("库存不足!");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(redisIdWorker.nextId("order"));
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 返回订单id
return Result.ok(orderId);
}
作者:LoopLee
来源:juejin.cn/post/7448119568567189530
来源:juejin.cn/post/7448119568567189530