Skip to content

购物秒杀及分布式锁

全局唯一ID

Redis自增ID策略

ID构造是:时间戳 + 计数器

每天一个key,方便统计订单量

业务实现

获取指定时间的秒数

java
LocalDateTime timeBegin = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
long second = timeBegin.toEpochSecond(ZoneOffset.UTC);

获取当前时间的秒数

java
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

全局唯一ID业务代码

java
public Long getID(String key) {
    // 获取时间戳
    long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    long timestamp = now - TIMESTAMP_BEGIN;

    // 利用redis实现自增长
    String date = new SimpleDateFormat("yyyyMMdd").format(new Date());
    Long inc = redisTemplate.opsForValue().increment("ID:" + key + ":" + date);

    // 拼接并返回
    return timestamp << 32 | inc;
}

实现秒杀下单

秒杀下单逻辑流程

业务实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	b -->|不存在| 返回异常
	a -->|充足| 扣减库存 --> 创建订单 --> 返回订单id --> e
	a -->|不足| 返回异常 --> e
java
@Transactional
public String getProduct(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    // 库存 - 1
    pro.setNum(num - 1);
    update(pro, new QueryWrapper<>());
    
    // 下订单
    Ordertable order = new Ordertable();
    order.setOrderID(idUtil.getID("order").toString());
    order.setUserID(123);
    ordertableService.save(order);
    return order.getOrderID();
}

“超卖”问题

问题复现

在多线程并发会产生问题

使用5000个线程进行测试

20个库存卖了210个订单

原因分析

锁的类型

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁

悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。

  • 例如Synchronized、Lock都属于悲观锁

乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。

  • 如果没有修改则认为是安全的,自己才更新数据。
  • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过。(CAS法)

代码实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	c{"⭐️判断库存<br/>是否被修改"}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	a -->|充足| c -->|没被修改| 扣减库存 --> 创建订单 --> 返回订单id --> e
	b -->|不存在| 返回异常
	a -->|不足| 返回异常
    c -->|被修改| 返回异常
    返回异常 --> e
java
@Transactional
public String getProduct(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    // 下订单
    Ordertable order = new Ordertable();
    order.setOrderID(idUtil.getID("order").toString());
    order.setUserID(111);
    // 库存 - 1
    pro.setNum(num - 1);
    boolean success = update(pro, new QueryWrapper<Product>()
                             .eq("num", num));
    if (!success) {
        throw new BusinessException(400, "下单失败");
    }
    // 保存订单
    ordertableService.save(order);
    return order.getOrderID();
}

经过多线程测试,成功解决超卖问题。

“一人一单”问题

问题分析

新要求:一个用户只能下一单

解决并发问题:使用 synchronized 锁机制。

业务实现

mermaid
graph LR
	begin(begin) 
	a{判断库存}
	b{判断存在}
	c{"判断库存<br/>是否被修改"}
	d{"⭐️判断是否<br/>为重复订单"}
	e(end)
	
	begin --> 提交商品id --> b -->|存在| a
	a -->|充足| c -->|没被修改| 扣减库存 --> d -->|"非重复订单"| 创建订单 --> 返回订单id --> e
	d -->|重复订单| 返回异常
	b -->|不存在| 返回异常
	a -->|不足| 返回异常
    c -->|被修改| 返回异常
    返回异常 --> e
java
@Transactional
public String getProductOnlyOne(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    synchronized (UserHolder.getUser().getPhone()) {
        // ⭐️ 判断是否有重复订单
        long cnt = ordertableService.count(new QueryWrapper<Ordertable>().eq("userID", UserHolder.getUser().getPhone()).eq("productID", id));

        if (cnt != 0) {
            // ⭐️ 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    }
}

分布式锁

单机锁的问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

每个锁监视器只对当前JVM有效,集群模式依旧会产生并发安全问题。

什么是分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

使用一个独立于JVM的锁监视器(分布式锁),让所有集群的JVM共享一个锁监视器,使得只有一个线程运行。

分布式锁的实现方式

分布式锁的核心是实现多进程之间互斥操作

MySQLRedisZookeeper
互斥利用mysql本身的互斥锁机制利用setnx互斥命令利用节点的唯一性和有序性实现互斥
高可用
高性能一般一般
安全性断开连接,自动释放锁利用锁超时时间,到期释放临时节点,断开连接自动释放

Redis分布式锁(1.0)

使用setnx互斥命令

sh
# 添加锁,NX是互斥、EX是设置超时时间
SET lock thread1 NX EX 10

# 释放锁
DEL key

Redis实现分布式锁流程图

image-20240120203619920

业务实现:

java
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final UUID uuid;

    static {
        uuid = UUID.randomUUID(true);
    }

    // 获取锁
    public boolean trylock(String key, Thread thread) {
        String value = uuid.toString() + thread.getId();
        Boolean success = redisTemplate
            .opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    // 删除锁
    public void unlock(String key) {
        redisTemplate.delete(key);
    }
}
java
@Transactional
@Override
public String getProductOnlyOneByRedis(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }
    
    String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
    try {
        // 获取分布式锁(给用户上锁)
        boolean lock = redisLock.trylock(lock_key, Thread.currentThread());
        if (!lock) {
            throw new BusinessException(400, "下单失败");
        }
        // 判断是否有重复订单
        long cnt = ordertableService.count(
            new QueryWrapper<Ordertable>()
            .eq("userID", UserHolder.getUser().getPhone())
            .eq("productID", id));

        if (cnt != 0) {
            // 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>().eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    } finally {
        // 释放锁
        redisLock.unlock(lock_key);
    }
}

Redis的Lua脚本

使用Lua脚本:在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

lua
-- 执行 set name jack
redis.call('set', 'name', 'jack')
lua
-- 先执行 set name jack
redis.call('set', 'name', 'jack')
-- 再执行 get name
local name = redis.call('get', 'name')
-- 返回
return name

需要用Redis命令来调用脚本:

shell
# 调用脚本 0: 脚本需要的key类型的参数个数
EVAL "return redis.call('set', 'name', 'jack')"  0

脚本中的key、value可以作为参数传递。

key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数。

sh
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

Redis分布式锁(2.0)

Redis分布式锁(1.0)存在的问题:线程阻塞会导致锁过期,其他线程抢占锁后,之前线程会释放掉不属于自己的锁。

解决办法:释放锁时检查释放是自己的锁。

新问题:检查自己的锁时,锁过期,被其他线程抢占锁,前一个线程再次释放掉其他线程的锁。

新解决办法:释放锁时检查释放是自己的锁(且需要保证原子性)

使用lua脚本操作redis,保证操作的原子性。

基于Redis的分布式锁实现思路:

  1. 利用set nx ex获取锁,并设置过期时间,保存线程标示
  2. 释放锁时先判断线程标示是否与自己一致,一致则删除锁

lua脚本逻辑:

  1. 获取锁中的线程标示
  2. 判断是否与指定的标示(当前线程标示)一致
  3. 如果一致则释放锁(删除)
  4. 如果不一致则什么都不做
lua
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

使用java调用lua脚本

业务代码:

java
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final UUID uuid = UUID.randomUUID(true);

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

    static {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setLocation(new ClassPathResource("lua/unlock.lua"));
        UNLOCK_SCRIPT = redisScript;
    }

    // 获取锁
    public boolean trylock(String key, Thread thread) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            key, uuid.toString() + thread.getId(), 10, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    // 删除锁
    public void unlock(String key) {
        redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(key),
                uuid.toString() + Thread.currentThread().getId());
    }
}

Redis分布式锁(2.0)存在的问题:

  1. 不可重入:同一个线程无法多次获取同一把锁
  2. 不可重试:获取锁只尝试一次就返回 false,没有重试机制
  3. 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  4. 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现

⭐️ Redisson

Redisson功能介绍

Redisson是一个分布式操作 Redis 的 Java 客户端(分布式 Redis 数据网格),可以像在使用本地的集合一样操作 Redis。Redisson还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

redisson/redisson: Redisson (github.com)

Redisson 官网

image-20240120223739525

Redisson使用入门

1、引入依赖

xml
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.19.1</version>
</dependency>

2、配置Redisson

java
@Configuration
@ConfigurationProperties(prefix = "spring.data.redis")
@Data
public class RedissonConfig {
    private String host;
    private String port;
    private String password;

    @Bean
    public RedissonClient redissonClient() {
        // 1. 创建配置
        Config config = new Config();
        String redisAddress = String.format("redis://%s:%s", host, port);
        // 使用单个Redis
        config.useSingleServer()
            .setAddress(redisAddress).setPassword(password).setDatabase(0);
        // 2. 创建实例
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

可重入锁业务实现

解决同一个线程无法多次获取同一把锁的问题。

java
@Override
public String getProductOnlyOneByRedisson(Long id) {
    // 判断商品是否存在
    Product pro = getById(id);
    if (pro == null) {
        throw new BusinessException(400, "商品不存在");
    }
    // 判断库存
    int num = pro.getNum();
    if (num <= 0) {
        throw new BusinessException(400, "库存不足");
    }

    String lock_key = REDIS_LOCK_PRE + UserHolder.getUser().getPhone();
    RLock lock = redissonClient.getLock(lock_key);
    try {
        // ⭐️ 使用Redisson自带的可重入锁(给用户上锁)
        if (!lock.tryLock()) {
            throw new BusinessException(400, "下单失败");
        }
        // 判断是否有重复订单
        long cnt = ordertableService.count(
            new QueryWrapper<Ordertable>()
            .eq("userID", UserHolder.getUser().getPhone())
            .eq("productID", id));

        if (cnt != 0) {
            // 存在重复订单
            throw new BusinessException(400, "不允许重复下单");
        }
        // 下单
        Ordertable newOrder = new Ordertable();
        newOrder.setOrderID(idUtil.getID("order").toString());
        newOrder.setUserID(UserHolder.getUser().getPhone());
        newOrder.setProductID(Math.toIntExact(id));
        // 库存 - 1
        pro.setNum(num - 1);
        boolean success = update(pro, new QueryWrapper<Product>()
                                 .eq("num", num));
        if (!success) {
            throw new BusinessException(400, "下单失败");
        }
        // 保存订单
        ordertableService.save(newOrder);
        return newOrder.getOrderID();
    } finally {
        // ⭐️ 释放锁
        lock.unlock();
    }
}

可重入锁原理

image-20240121170553645

锁重试和看门狗机制

Redisson分布式锁原理:

  1. 可重入:利用hash结构记录线程id和重入次数
  2. 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  3. 锁过期问题:watchDog看门狗机制,每隔一段时间(releaseTime / 3),重置超时时间

锁重试和看门狗机制流程图

看门狗原理:

  1. 监听当前线程,默认过期时间是 30 秒,每 10 秒续期一次(补到 30 秒)
  2. 如果线程挂掉(注意 debug 模式也会被它当成服务器宕机),则不会续期
java
@Test
void testWatchDog() {
    RLock lock = redissonClient.getLock("lock");
    try {
        // 只有一个线程能获取到锁(超时时间设置为-1)
        if (lock.tryLock(0, -1, TimeUnit.MILLISECONDS)) {
            // todo 实际要执行的代码
            Thread.sleep(300000);
            System.out.println("getLock: " + Thread.currentThread().getId());
        }
    } catch (InterruptedException e) {
        System.out.println(e.getMessage());
    } finally {
        // 只能释放自己的锁
        if (lock.isHeldByCurrentThread()) {
            System.out.println("unLock: " + Thread.currentThread().getId());
            lock.unlock();
        }
    }
}

分布式锁主从一致性

联锁机制:获取所有的节点都获取到锁才算成功。

业务实现:

Redis优化秒杀

优化秒杀思路

优化流程:使用异步操作提高吞吐量。

  1. 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  2. 再将下单业务放入阻塞队列,利用独立线程异步下单

使用Redis存储秒杀库存数量和“一人一单”的订单信息,使用Lua脚本实现原子操作。

使用Lua脚本实现秒杀

使用lua脚本实现原子性

lua
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]

-- 数据
local productKey = 'product:num:' .. productID
local orderKey = "product:order" .. productID

-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
    return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
    return 2
end

redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)

return 0

执行lua脚本完成下单

java
@Override
public String getProductOnlyOneByRedis2(Long id) {
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
    Long success = redisTemplate
        .execute(redisScript, Collections.emptyList(), id, 
                 UserHolder.getUser().getPhone());
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }
    Long orderID = idUtil.getID("order");
    // todo 保存到阻塞队列
    return orderID.toString();
}

模拟创建订单

java
@Test
void addProduct() {
    Product pro = new Product();
    pro.setProductName("yixuan");
    pro.setNum(10);
    boolean success = productService.save(pro);
    String key = "RedisSessionDemo:productNum:" + pro.getId();
    String value = pro.getNum().toString();
    if (success) {
        stringRedisTemplate.opsForValue().set(key, value);
    }
    Assertions.assertNotNull(stringRedisTemplate.opsForValue().get(key));
}

基于阻塞队列实现异步下单

java
private static final DefaultRedisScript<Long> redisScript;

static {
    redisScript = new DefaultRedisScript<>();
    redisScript.setLocation(new ClassPathResource("lua/shop.lua"));
    redisScript.setResultType(Long.class);
}

private BlockingQueue<Ordertable> orderTasks = 
    new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SHOP_ORDER_EXECUTOR = 
    Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
    SHOP_ORDER_EXECUTOR.submit(new OrderHandler());
}

private class OrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // 获取队列
                Ordertable order = orderTasks.take();
                // 库存 - 1
                boolean success = update().setSql("num = num - 1")
                    .eq("id", order.getProductID())
                    .gt("num", 0)
                    .update();
                if (!success) {
                    throw new BusinessException(400, "下单失败");
                }
                // 保存订单
                ordertableService.save(order);
            } catch (Exception e) {
                log.error("", e);
            }
        }
    }
}

@Override
public String getProductOnlyOneByRedis2(Long id) {
    // 执行lua脚本
    String userPhone = UserHolder.getUser().getPhone();
    Long success = redisTemplate.execute(
        redisScript,Collections.emptyList(), id.toString(), userPhone);
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }

    // 创建订单
    Long orderID = idUtil.getID("order");
    Ordertable order = new Ordertable();
    order.setOrderID(orderID.toString());
    order.setProductID(Math.toIntExact(id));
    order.setUserID(userPhone);

    // 保存到阻塞队列
    orderTasks.add(order);
    return orderID.toString();
}

Redis消息队列

什么是消息队列

消息队列(Message Queue):存放消息的队列。

最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

使用List模拟MQ

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。

当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的MQ

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

sh
# 订阅一个或多个频道
SUBSCRIBE channel [channel] 

# 向一个频道发送消息
PUBLISH channel msg

# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern[pattern]

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的MQ

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:XADD

读取消息:XREAD

① 读取第一个消息

image-20240130125054472

② 阻塞方式读取消息

image-20240130125106814

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

STREAM类型消息队列的XREAD命令特点:

  1. 消息可回溯
  2. 一个消息可以被多个消费者读取
  3. 可以阻塞读取
  4. 有消息漏读的风险

基于Stream消费者组的MQ

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

特点:消息分流、消息标示、消息确认。

创建消费者组:

sh
XGROUP CREATE key groupName ID [MKSTREAM]
# 1. key:队列名称
# 2. groupName:消费者组名称
# 3. ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
# 4. MKSTREAM:队列不存在时自动创建队列

其它常见命令:

sh
# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

sh
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
# 1. group:消费组名称
# 2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# 3. count:本次查询的最大数量
# 4. BLOCK milliseconds:当没有消息时最长等待时间
# 5. NOACK:无需手动ACK,获取到消息后自动确认
# 6. STREAMS key:指定队列名称
# 7. ID:获取消息的起始ID:
# 7.1. ">":从下一个未消费的消息开始
# 7.2. 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路

java
while (true) {
    //尝试监听队列,使用阻塞模式,最长等待2000毫秒
    Object msg = redis.call(
        "XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
    if (msg == null) {
        //null说明设有消息,继续下一次
        continue;
    }
    try {
        //处理消息,完成后一定要ACK
        handleMessage(msg);
    } catch (Exception e) {
        while (true) {
            Object msg2 = redis.call(
                "XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if (msg2 == null) {
                //null说明没有异常消息,所有消息都已确以,结束循环
                break;
            }
            try {
                //说明有异常消息,再次处理
                handleMessage(msg2);
            } catch (Exception e) {
                //再次出现异常,记录日志,继续循环
                continue;
            }
        }
    }
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

三种MQ实现方式对比

ListPubSubStream
消息持久化支持不支持支持
阻塞读取支持支持支持
消息堆积处理受限于内存空间,可以利用多消费者加快处理受限于消费者缓冲区受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制不支持不支持支持
消息回溯不支持不支持支持

使用基于Stream的MQ优化秒杀

修改Lua脚本创建stream消息队列

lua
-- 参数
local productID = ARGV[1]
local userID = ARGV[2]
local orderID = ARGV[3]

-- 数据
local productKey = 'RedisSessionDemo:productNum:' .. productID
local orderKey = "RedisSessionDemo:productOrder:" .. productID

-- 业务
-- 库存判断
if tonumber(redis.call('get', productKey)) <= 0 then
    return 1
end
-- 是否下单过
if redis.call('sismember', orderKey, userID) == 1 then
    return 2
end

redis.call('incrby', productKey, -1)
redis.call('sadd', orderKey, userID)

-- 下单
redis.call('xadd', 'stream.orders', '*',
        'orderID', orderID, 'userID', userID, 'productID', productID)

return 0

执行Lua脚本下单

java
private static final DefaultRedisScript<Long> StreamScript;

static {
    StreamScript = new DefaultRedisScript<>();
    StreamScript.setLocation(new ClassPathResource("lua/shop-stream.lua"));
    StreamScript.setResultType(Long.class);
}

@Override
public String getProductOnlyOneByStream(Long id) {
    String userPhone = UserHolder.getUser().getPhone();
    Long orderID = idUtil.getID("order");
    Long success = redisTemplate.execute(
        StreamScript, Collections.emptyList(), id, userPhone, orderID);
    if (success == null || success != 0) {
        throw new BusinessException(400, "下单失败");
    }
    return orderID.toString();
}

使用多线程读取MQ的信息

java
private static final ExecutorService SHOP_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
    SHOP_ORDER_EXECUTOR.submit(new StreamHandler());
}

private class StreamHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 获取stream
                // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >
                List<MapRecord<String, Object, Object>> list = 
                    redisTemplate.opsForStream().read(Consumer.from("g1", "c1"),           StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.order", ReadOffset.lastConsumed())
                );
                if (list == null || list.isEmpty()) {
                    // 没有消息
                    continue;
                }
                // 解析消息
                Map<Object, Object> values = list.get(0).getValue();
                Ordertable order = BeanUtil.fillBeanWithMap(
                    values, new Ordertable(), true);
                // 库存 - 1
                boolean success = update().setSql("num = num - 1")
                    .eq("id", order.getProductID())
                    .gt("num", 0)
                    .update();
                if (!success) {
                    throw new BusinessException(400, "下单失败");
                }
                // 保存订单
                ordertableService.save(order);
                // ACK 确认
                redisTemplate.opsForStream().acknowledge(
                    "stream.order","g1", list.get(0).getId());
            } catch (Exception e) {
                while (true) {
                    try {
                        // 读取 pending-list
                        List<MapRecord<String, Object, Object>> pendingList = redisTemplate.opsForStream()
                            .read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(
                                "stream.order", ReadOffset.from("0"))
                        );
                        if (pendingList == null) {
                            // pendingList 为空 -> 结束循环
                            break;
                        }
                        // 解析消息
                        Map<Object, Object> values = 
                            pendingList.get(0).getValue();
                        Ordertable order = BeanUtil.fillBeanWithMap(
                            values, new Ordertable(), true);
                        // 库存 - 1
                        boolean success = update().setSql("num = num - 1")
                            .eq("id", order.getProductID())
                            .gt("num", 0)
                            .update();
                        if (!success) {
                            throw new BusinessException(400, "下单失败");
                        }
                        // 保存订单
                        ordertableService.save(order);
                        // ACK 确认
                        redisTemplate.opsForStream()
                            .acknowledge("stream.order","g1", 
                                         pendingList.get(0).getId());
                    } catch (Exception excp) {
                        log.error("", excp);
                    }
                }
            }
        }
    }

Released under the MIT License.