购物秒杀及分布式锁
全局唯一ID
Redis自增ID策略
ID构造是:时间戳 + 计数器
每天一个key,方便统计订单量
业务实现
获取指定时间的秒数
LocalDateTime timeBegin = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
long second = timeBegin.toEpochSecond(ZoneOffset.UTC);
获取当前时间的秒数
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
全局唯一ID业务代码
public Long getID(String key) {
// 获取时间戳
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
long timestamp = now - TIMESTAMP_BEGIN;
// 利用redis实现自增长
String date = new SimpleDateFormat("yyyyMMdd").format(new Date());
Long inc = redisTemplate.opsForValue().increment("ID:" + key + ":" + date);
// 拼接并返回
return timestamp << 32 | inc;
}
实现秒杀下单
秒杀下单逻辑流程
业务实现
graph LR
begin(begin)
a{判断库存}
b{判断存在}
e(end)
begin --> 提交商品id --> b -->|存在| a
b -->|不存在| 返回异常
a -->|充足| 扣减库存 --> 创建订单 --> 返回订单id --> e
a -->|不足| 返回异常 --> e
@Transactional
public String getProduct(Long id) {
// 判断商品是否存在
Product pro = getById(id);
if (pro == null) {
throw new BusinessException(400, "商品不存在");
}
// 判断库存
int num = pro.getNum();
if (num <= 0) {
throw new BusinessException(400, "库存不足");
}
// 库存 - 1
pro.setNum(num - 1);
update(pro, new QueryWrapper<>());
// 下订单
Ordertable order = new Ordertable();
order.setOrderID(idUtil.getID("order").toString());
order.setUserID(123);
ordertableService.save(order);
return order.getOrderID();
}
“超卖”问题
问题复现
在多线程并发会产生问题
使用5000个线程进行测试
20个库存卖了210个订单
原因分析
锁的类型
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁
悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
- 例如Synchronized、Lock都属于悲观锁
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
- 如果没有修改则认为是安全的,自己才更新数据。
- 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁
乐观锁的关键是判断之前查询得到的数据是否有被修改过。(CAS法)
代码实现
graph LR
begin(begin)
a{判断库存}
b{判断存在}
c{"⭐️判断库存<br/>是否被修改"}
e(end)
begin --> 提交商品id --> b -->|存在| a
a -->|充足| c -->|没被修改| 扣减库存 --> 创建订单 --> 返回订单id --> e
b -->|不存在| 返回异常
a -->|不足| 返回异常
c -->|被修改| 返回异常
返回异常 --> e
@Transactional
public String getProduct(Long id) {
// 判断商品是否存在
Product pro = getById(id);
if (pro == null) {
throw new BusinessException(400, "商品不存在");
}
// 判断库存
int num = pro.getNum();
if (num <= 0) {
throw new BusinessException(400, "库存不足");
}
// 下订单
Ordertable order = new Ordertable();
order.setOrderID(idUtil.getID("order").toString());
order.setUserID(111);
// 库存 - 1
pro.setNum(num - 1);
boolean success = update(pro, new QueryWrapper<Product>()
.eq("num", num));
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(order);
return order.getOrderID();
}
经过多线程测试,成功解决超卖问题。
“一人一单”问题
问题分析
新要求:一个用户只能下一单
解决并发问题:使用 synchronized 锁机制。
业务实现
graph LR
begin(begin)
a{判断库存}
b{判断存在}
c{"判断库存<br/>是否被修改"}
d{"⭐️判断是否<br/>为重复订单"}
e(end)
begin --> 提交商品id --> b -->|存在| a
a -->|充足| c -->|没被修改| 扣减库存 --> d -->|"非重复订单"| 创建订单 --> 返回订单id --> e
d -->|重复订单| 返回异常
b -->|不存在| 返回异常
a -->|不足| 返回异常
c -->|被修改| 返回异常
返回异常 --> e
@Transactional
public String getProductOnlyOne(Long id) {
// 判断商品是否存在
Product pro = getById(id);
if (pro == null) {
throw new BusinessException(400, "商品不存在");
}
// 判断库存
int num = pro.getNum();
if (num <= 0) {
throw new BusinessException(400, "库存不足");
}
synchronized (UserHolder.getUser().getPhone()) {
// ⭐️ 判断是否有重复订单
long cnt = ordertableService.count(new QueryWrapper<Ordertable>().eq("userID", UserHolder.getUser().getPhone()).eq("productID", id));
if (cnt != 0) {
// ⭐️ 存在重复订单
throw new BusinessException(400, "不允许重复下单");
}
// 下单
Ordertable newOrder = new Ordertable();
newOrder.setOrderID(idUtil.getID("order").toString());
newOrder.setUserID(UserHolder.getUser().getPhone());
newOrder.setProductID(Math.toIntExact(id));
// 库存 - 1
pro.setNum(num - 1);
boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(newOrder);
return newOrder.getOrderID();
}
}
分布式锁
单机锁的问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
每个锁监视器只对当前JVM有效,集群模式依旧会产生并发安全问题。
什么是分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
使用一个独立于JVM的锁监视器(分布式锁),让所有集群的JVM共享一个锁监视器,使得只有一个线程运行。
分布式锁的实现方式
分布式锁的核心是实现多进程之间互斥操作。
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
Redis分布式锁(1.0)
使用setnx互斥命令
# 添加锁,NX是互斥、EX是设置超时时间
SET lock thread1 NX EX 10
# 释放锁
DEL key
Redis实现分布式锁流程图
业务实现:
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate redisTemplate;
private static final UUID uuid;
static {
uuid = UUID.randomUUID(true);
}
// 获取锁
public boolean trylock(String key, Thread thread) {
String value = uuid.toString() + thread.getId();
Boolean success = redisTemplate
.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
// 删除锁
public void unlock(String key) {
redisTemplate.delete(key);
}
}
@Transactional
@Override
public String getProductOnlyOneByRedis(Long id) {
// 判断商品是否存在
Product pro = getById(id);
if (pro == null) {
throw new BusinessException(400, "商品不存在");
}
// 判断库存
int num = pro.getNum();
if (num <= 0) {
throw new BusinessException(400, "库存不足");
}
String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
try {
// 获取分布式锁(给用户上锁)
boolean lock = redisLock.trylock(lock_key, Thread.currentThread());
if (!lock) {
throw new BusinessException(400, "下单失败");
}
// 判断是否有重复订单
long cnt = ordertableService.count(
new QueryWrapper<Ordertable>()
.eq("userID", UserHolder.getUser().getPhone())
.eq("productID", id));
if (cnt != 0) {
// 存在重复订单
throw new BusinessException(400, "不允许重复下单");
}
// 下单
Ordertable newOrder = new Ordertable();
newOrder.setOrderID(idUtil.getID("order").toString());
newOrder.setUserID(UserHolder.getUser().getPhone());
newOrder.setProductID(Math.toIntExact(id));
// 库存 - 1
pro.setNum(num - 1);
boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(newOrder);
return newOrder.getOrderID();
} finally {
// 释放锁
redisLock.unlock(lock_key);
}
}
Redis的Lua脚本
使用Lua脚本:在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
-- 执行 set name jack
redis.call('set', 'name', 'jack')
-- 先执行 set name jack
redis.call('set', 'name', 'jack')
-- 再执行 get name
local name = redis.call('get', 'name')
-- 返回
return name
需要用Redis命令来调用脚本:
# 调用脚本 0: 脚本需要的key类型的参数个数
EVAL "return redis.call('set', 'name', 'jack')" 0
脚本中的key、value可以作为参数传递。
key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数。
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
Redis分布式锁(2.0)
Redis分布式锁(1.0)存在的问题:线程阻塞会导致锁过期,其他线程抢占锁后,之前线程会释放掉不属于自己的锁。
解决办法:释放锁时检查释放是自己的锁。
新问题:检查自己的锁时,锁过期,被其他线程抢占锁,前一个线程再次释放掉其他线程的锁。
新解决办法:释放锁时检查释放是自己的锁(且需要保证原子性)
使用lua脚本操作redis,保证操作的原子性。
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
lua脚本逻辑:
- 获取锁中的线程标示
- 判断是否与指定的标示(当前线程标示)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
使用java调用lua脚本
业务代码:
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate redisTemplate;
private static final UUID uuid = UUID.randomUUID(true);
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setLocation(new ClassPathResource("lua/unlock.lua"));
UNLOCK_SCRIPT = redisScript;
}
// 获取锁
public boolean trylock(String key, Thread thread) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(
key, uuid.toString() + thread.getId(), 10, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
// 删除锁
public void unlock(String key) {
redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(key),
uuid.toString() + Thread.currentThread().getId());
}
}
Redis分布式锁(2.0)存在的问题:
- 不可重入:同一个线程无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回 false,没有重试机制
- 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现
⭐️ Redisson
Redisson功能介绍
Redisson是一个分布式操作 Redis 的 Java 客户端(分布式 Redis 数据网格),可以像在使用本地的集合一样操作 Redis。Redisson还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
redisson/redisson: Redisson (github.com)
Redisson使用入门
1、引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.1</version>
</dependency>
2、配置Redisson
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
private String password;
@Bean
public RedissonClient redissonClient() {
// 1. 创建配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
// 使用单个Redis
config.useSingleServer()
.setAddress(redisAddress).setPassword(password).setDatabase(0);
// 2. 创建实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
可重入锁业务实现
解决同一个线程无法多次获取同一把锁的问题。
@Override
public String getProductOnlyOneByRedisson(Long id) {
// 判断商品是否存在
Product pro = getById(id);
if (pro == null) {
throw new BusinessException(400, "商品不存在");
}
// 判断库存
int num = pro.getNum();
if (num <= 0) {
throw new BusinessException(400, "库存不足");
}
String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
RLock lock = redissonClient.getLock(lock_key);
try {
// ⭐️ 使用Redisson自带的可重入锁(给用户上锁)
if (!lock.tryLock()) {
throw new BusinessException(400, "下单失败");
}
// 判断是否有重复订单
long cnt = ordertableService.count(
new QueryWrapper<Ordertable>()
.eq("userID", UserHolder.getUser().getPhone())
.eq("productID", id));
if (cnt != 0) {
// 存在重复订单
throw new BusinessException(400, "不允许重复下单");
}
// 下单
Ordertable newOrder = new Ordertable();
newOrder.setOrderID(idUtil.getID("order").toString());
newOrder.setUserID(UserHolder.getUser().getPhone());
newOrder.setProductID(Math.toIntExact(id));
// 库存 - 1
pro.setNum(num - 1);
boolean success = update(pro, new QueryWrapper<Product>()
.eq("num", num));
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(newOrder);
return newOrder.getOrderID();
} finally {
// ⭐️ 释放锁
lock.unlock();
}
}
可重入锁原理
锁重试和看门狗机制
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 锁过期问题:watchDog看门狗机制,每隔一段时间(releaseTime / 3),重置超时时间
锁重试和看门狗机制流程图
看门狗原理:
- 监听当前线程,默认过期时间是 30 秒,每 10 秒续期一次(补到 30 秒)
- 如果线程挂掉(注意 debug 模式也会被它当成服务器宕机),则不会续期
@Test
void testWatchDog() {
RLock lock = redissonClient.getLock("lock");
try {
// 只有一个线程能获取到锁(超时时间设置为-1)
if (lock.tryLock(0, -1, TimeUnit.MILLISECONDS)) {
// todo 实际要执行的代码
Thread.sleep(300000);
System.out.println("getLock: " + Thread.currentThread().getId());
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
// 只能释放自己的锁
if (lock.isHeldByCurrentThread()) {
System.out.println("unLock: " + Thread.currentThread().getId());
lock.unlock();
}
}
}
分布式锁主从一致性
联锁机制:获取所有的节点都获取到锁才算成功。
业务实现:
Redis优化秒杀
优化秒杀思路
优化流程:使用异步操作提高吞吐量。
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
使用Redis存储秒杀库存数量和“一人一单”的订单信息,使用Lua脚本实现原子操作。
使用Lua脚本实现秒杀
使用lua脚本实现原子性
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]
-- 数据
local productKey = 'product:num:' .. productID
local orderKey = "product:order" .. productID
-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
return 2
end
redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)
return 0
执行lua脚本完成下单
@Override
public String getProductOnlyOneByRedis2(Long id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
Long success = redisTemplate
.execute(redisScript, Collections.emptyList(), id,
UserHolder.getUser().getPhone());
if (success == null || success != 0) {
throw new BusinessException(400, "下单失败");
}
Long orderID = idUtil.getID("order");
// todo 保存到阻塞队列
return orderID.toString();
}
模拟创建订单
@Test
void addProduct() {
Product pro = new Product();
pro.setProductName("yixuan");
pro.setNum(10);
boolean success = productService.save(pro);
String key = "RedisSessionDemo:productNum:" + pro.getId();
String value = pro.getNum().toString();
if (success) {
stringRedisTemplate.opsForValue().set(key, value);
}
Assertions.assertNotNull(stringRedisTemplate.opsForValue().get(key));
}
基于阻塞队列实现异步下单
private static final DefaultRedisScript<Long> redisScript;
static {
redisScript = new DefaultRedisScript<>();
redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
redisScript.setResultType(Long.class);
}
private BlockingQueue<Ordertable> orderTasks =
new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SHOP_ORDER_EXECUTOR =
Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SHOP_ORDER_EXECUTOR.submit(new OrderHandler());
}
private class OrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 获取队列
Ordertable order = orderTasks.take();
// 库存 - 1
boolean success = update().setSql("num = num - 1")
.eq("id", order.getProductID())
.gt("num", 0)
.update();
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(order);
} catch (Exception e) {
log.error("", e);
}
}
}
}
@Override
public String getProductOnlyOneByRedis2(Long id) {
// 执行lua脚本
String userPhone = UserHolder.getUser().getPhone();
Long success = redisTemplate.execute(
redisScript,Collections.emptyList(), id.toString(), userPhone);
if (success == null || success != 0) {
throw new BusinessException(400, "下单失败");
}
// 创建订单
Long orderID = idUtil.getID("order");
Ordertable order = new Ordertable();
order.setOrderID(orderID.toString());
order.setProductID(Math.toIntExact(id));
order.setUserID(userPhone);
// 保存到阻塞队列
orderTasks.add(order);
return orderID.toString();
}
Redis消息队列
什么是消息队列
消息队列(Message Queue):存放消息的队列。
最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
使用List模拟MQ
Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此应该使用BRPOP或者BLPOP来实现阻塞效果。
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的MQ
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
# 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 向一个频道发送消息
PUBLISH channel msg
# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern[pattern]
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的MQ
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:XADD
读取消息:XREAD
① 读取第一个消息
② 阻塞方式读取消息
当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
基于Stream消费者组的MQ
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
特点:消息分流、消息标示、消息确认。
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
# 1. key:队列名称
# 2. groupName:消费者组名称
# 3. ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
# 4. MKSTREAM:队列不存在时自动创建队列
其它常见命令:
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
# 1. group:消费组名称
# 2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# 3. count:本次查询的最大数量
# 4. BLOCK milliseconds:当没有消息时最长等待时间
# 5. NOACK:无需手动ACK,获取到消息后自动确认
# 6. STREAMS key:指定队列名称
# 7. ID:获取消息的起始ID:
# 7.1. ">":从下一个未消费的消息开始
# 7.2. 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路
while (true) {
//尝试监听队列,使用阻塞模式,最长等待2000毫秒
Object msg = redis.call(
"XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if (msg == null) {
//null说明设有消息,继续下一次
continue;
}
try {
//处理消息,完成后一定要ACK
handleMessage(msg);
} catch (Exception e) {
while (true) {
Object msg2 = redis.call(
"XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if (msg2 == null) {
//null说明没有异常消息,所有消息都已确以,结束循环
break;
}
try {
//说明有异常消息,再次处理
handleMessage(msg2);
} catch (Exception e) {
//再次出现异常,记录日志,继续循环
continue;
}
}
}
}
STREAM类型消息队列的XREADGROUP
命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
三种MQ实现方式对比
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
使用基于Stream的MQ优化秒杀
修改Lua脚本创建stream消息队列
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]
local orderID = ARGV[3]
-- 数据
local productKey = 'RedisSessionDemo:productNum:' .. productID
local orderKey = "RedisSessionDemo:productOrder:" .. productID
-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
return 2
end
redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)
-- 下单
redis.call('xadd', 'stream.orders', '*',
'orderID', orderID, 'userID', userID, 'productID', productID)
return 0
执行Lua脚本下单
private static final DefaultRedisScript<Long> StreamScript;
static {
StreamScript = new DefaultRedisScript<>();
StreamScript.setLocation(new ClassPathResource("lua/shop-stream.lua"));
StreamScript.setResultType(Long.class);
}
@Override
public String getProductOnlyOneByStream(Long id) {
String userPhone = UserHolder.getUser().getPhone();
Long orderID = idUtil.getID("order");
Long success = redisTemplate.execute(
StreamScript, Collections.emptyList(), id, userPhone, orderID);
if (success == null || success != 0) {
throw new BusinessException(400, "下单失败");
}
return orderID.toString();
}
使用多线程读取MQ的信息
private static final ExecutorService SHOP_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SHOP_ORDER_EXECUTOR.submit(new StreamHandler());
}
private class StreamHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 获取stream
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
List<MapRecord<String, Object, Object>> list =
redisTemplate.opsForStream().read(Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.order", ReadOffset.lastConsumed())
);
if (list == null || list.isEmpty()) {
// 没有消息
continue;
}
// 解析消息
Map<Object, Object> values = list.get(0).getValue();
Ordertable order = BeanUtil.fillBeanWithMap(
values, new Ordertable(), true);
// 库存 - 1
boolean success = update().setSql("num = num - 1")
.eq("id", order.getProductID())
.gt("num", 0)
.update();
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(order);
// ACK 确认
redisTemplate.opsForStream().acknowledge(
"stream.order","g1", list.get(0).getId());
} catch (Exception e) {
while (true) {
try {
// 读取 pending-list
List<MapRecord<String, Object, Object>> pendingList = redisTemplate.opsForStream()
.read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(
"stream.order", ReadOffset.from("0"))
);
if (pendingList == null) {
// pendingList 为空 -> 结束循环
break;
}
// 解析消息
Map<Object, Object> values =
pendingList.get(0).getValue();
Ordertable order = BeanUtil.fillBeanWithMap(
values, new Ordertable(), true);
// 库存 - 1
boolean success = update().setSql("num = num - 1")
.eq("id", order.getProductID())
.gt("num", 0)
.update();
if (!success) {
throw new BusinessException(400, "下单失败");
}
// 保存订单
ordertableService.save(order);
// ACK 确认
redisTemplate.opsForStream()
.acknowledge("stream.order","g1",
pendingList.get(0).getId());
} catch (Exception excp) {
log.error("", excp);
}
}
}
}
}