【329期】如何利用redis分布式锁,解决秒杀场景下的订单超卖问题

共 17934字,需浏览 36分钟

 ·

2021-09-14 21:21

1. 秒杀场景

Controller层:

@RestController
@RequestMapping("/skill")
@Slf4j
public class SecKillController {
    @Autowired
    private SecKillService secKillService;
 
    //查询秒杀活动特价商品的信息
    @GetMapping("/query/{productId}")
    public String query(@PathVariable String productId)throws Exception {
        return secKillService.querySecKillProductInfo(productId);
    }

    //秒杀
    @GetMapping("/order/{productId}")
    public String skill(@PathVariable String productId)throws Exception {
        log.info("@skill request, productId:" + productId);
        secKillService.orderProductMockDiffUser(productId);
        return secKillService.querySecKillProductInfo(productId);
    }
}

Service层:

@Service
public class SecKillServiceImpl implements SecKillService {
    
    private static final int TIMEOUT = 10 * 1000//超时时间 10s
    
    @Autowired
    private RedisLock redisLock;

   // 雅诗兰黛特价小棕瓶,限量100000份
    static Map<String,Integer> products;
    static Map<String,Integer> stock;
    static Map<String,String> orders;
    static {
        //模拟多个表,商品信息表,库存表,秒杀成功订单表
        products = new HashMap<>();
        stock = new HashMap<>();
        orders = new HashMap<>();
        //商品Id---商品库存
        products.put("123456"100000);
        //商品id---商品库存
        stock.put("123456"100000);
    }

    private String queryMap(String productId) {
        return "雅诗兰黛小棕瓶特价,限量份"
                + products.get(productId)
                +" 还剩:" + stock.get(productId)+" 份"
                +" 该商品成功下单用户数目:"
                +  orders.size() +" 人" ;
    }

    @Override
    public String querySecKillProductInfo(String productId) {
        return this.queryMap(productId);
    }

    //秒杀的逻辑:可以在该方法生加上Synchronized解决超卖
    @Override
    public void  orderProductMockDiffUser(String productId) {
        //1.查询该商品库存,为0则活动结束。
        int stockNum = stock.get(productId);
        if(stockNum == 0) {
            throw new SellException(100,"活动结束");
        }else {
            //2.下单(模拟不同用户openid不同)
            orders.put(KeyUtil.genUniqueKey(),productId);
            //3.减库存
            stockNum =stockNum-1;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //4.更新库存
            stock.put(productId,stockNum);
        }
    }
}

使用压测工具测试结果:雅诗兰黛小棕瓶特价,限量份100000 还剩:99938份 该商品成功下单用户数目:646人

很明显订单超卖了,如何解决?可以在秒杀方法上加上Synchronized,但是只适合单点服务器,且性能低

2. Redis分布式锁解决订单超卖

2.1 两个命令介绍

业务场景:

天猫双11热卖过程中,对已经售罄的货物追加补货,且补货完成。客户购买热情高涨, 3秒内将所有商品购买完毕。本次补货已经将库存全部清空,如何避免最后一件商品不被多人同时购买?【超卖问题】就是说如果只剩一件商品,但是有5个人要买,如何保证不被超卖???

业务分析:

使用watch监控一个key有没有改变已经不能解决问题,此处要监控的是具体数据 ,我们要监控的是商品的数量什么时候到1,这个商品的数量是一直变化的,不可能别每次变化,都放弃执行。

解决方案:

使用 setnx 设置一个公共锁:setnx lock-key value,操作完毕通过del操作释放锁

利用setnx命令的返回值特征,有值则返回设置失败,无值则返回设置成功

对于返回设置成功的,拥有控制权进行下一步的具体业务操作,对于返回设置失败的,不具有控制权,排队或等待

127.0.0.1:6379> set num 10
OK
127.0.0.1:6379> setnx lock-num 1 -- 加锁
(integer) 1
127.0.0.1:6379> incrby num -1 
(integer) 9
127.0.0.1:6379> del lock-num  -- 释放锁
(integer) 1

127.0.0.1:6379> setnx lock-num 1  --  当前客户端加锁
(integer) 1 

127.0.0.1:6379> setnx lock-num 1  -- 其他客户端获取不到锁
(integer) 0

死锁: 如果加了锁,但是没有释放,就会导致死锁,其他客户端一直获取不到锁。

使用expire为锁key添加时间限定,到时间不释放,放弃锁

由于操作通常都是微秒或毫秒级,因此该锁定时间不宜设置过大。具体时间需要业务测试后确认。推荐:Java进阶视频资源

  • expire lock-key second
  • pexpire lock-key milliseconds
127.0.0.1:6379> set name 123
OK
127.0.0.1:6379> setnx lock-name 1 -- 锁的名称key
(integer) 1
127.0.0.1:6379> expire lock-name 20 -- 使用expire为锁key添加时间限定
(integer) 1
127.0.0.1:6379> get name
"123"

GETSET key value

将给定 key 的值设为 value ,并返回 key 的旧值(old value)。

redis> GETSET db mongodb    # 没有旧值,返回 nil
(nil)

redis> GET db
"mongodb"

redis> GETSET db redis      # 返回旧值 mongodb
"mongodb"

redis> GET db
"redis"

2.2 RedisLock

@Component
@Slf4j
public class RedisLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 加锁
     * @param key:productId
     * @param value 当前时间+超时时间
     */

    public boolean lock(String key, String value) {
        //setnx----对应方法 setIfAbsent(key, value),如果可以加锁返回true,不可以加锁返回false
        if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
            return true;
        }
 
        //下面这段代码时为了解决可能出现的死锁情况
        String currentValue = redisTemplate.opsForValue().get(key);
        //如果锁过期
        if (!StringUtils.isEmpty(currentValue)
                && Long.parseLong(currentValue) < System.currentTimeMillis()) {
            //获取上一个锁的时间:重新设置锁的过期时间value,并返回上一个过期时间
            String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
            //currentValue =2020-12-28,两个线程的value=2020-12-29,只会有一个线程拿到锁
            if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
                return true;
            }
        }
        return false;
    }

    //解锁
    public void unlock(String key, String value) {
        try {
            String currentValue = redisTemplate.opsForValue().get(key);
            if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                redisTemplate.opsForValue().getOperations().delete(key);
            }
        }catch (Exception e) {
            log.error("【redis分布式锁】解锁异常, {}", e);
        }
    }
}

2.3 将redis分布式锁应用于秒杀业务

@Override
public void orderProductMockDiffUser(String productId) {
    //加锁
    //锁的过期时间为当前时间+过期时长
    long time = System.currentTimeMillis()+TIMEOUT;
    if(!redisLock.lock(productId,String.valueOf(time))){
        throw new SellException(101,"人太多,稍后再来");
    }
    //1.查询该商品库存,为0则活动结束。
    int stockNum = stock.get(productId);
    if(stockNum == 0) {
        throw new SellException(100,"活动结束");
    }else {
        //2.下单(模拟不同用户openid不同)
        orders.put(KeyUtil.genUniqueKey(),productId);
        //3.减库存
        stockNum =stockNum-1;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.更新库存
        stock.put(productId,stockNum);
    }
    //解锁
    redisLock.unlock(productId,String.valueOf(time));
}

2.4 分析RedisLock

重点分析加锁逻辑,有两个逻辑需要考虑:

public boolean lock(String key, String value) {

    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    //下面的代码是为了解决可能出现的死锁的情况????
    String currentValue = redisTemplate.opsForValue().get(key);
    if (!StringUtils.isEmpty(currentValue)
        && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
            return true;
        }
    }
    return false;
}

为了解决可能出现的死锁的情况????

//秒杀业务方法
@Override
public void orderProductMockDiffUser(String productId) {
    //加锁
    //锁的过期时间为当前时间+过期时长
    long time = System.currentTimeMillis()+TIMEOUT;
    if(!redisLock.lock(productId,String.valueOf(time))){
        throw new SellException(101,"人太多,稍后再来");
    }
    //1.查询该商品库存,为0则活动结束。
    int stockNum = stock.get(productId);
    if(stockNum == 0) {
        throw new SellException(100,"活动结束");
    }else {
        //2.下单
        orders.put(KeyUtil.genUniqueKey(),productId);
        //3.减库存
        stockNum =stockNum-1;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.更新库存
        stock.put(productId,stockNum);
    }
    //解锁
    redisLock.unlock(productId,String.valueOf(time));
}

假如我们将中间那段逻辑去掉会出现声明情况???

public boolean lock(String key, String value) {
    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    return false;
}

① 线程A执行秒杀的业务逻辑方法,并对这个方法加了锁,key=proeuctId,value=加锁时间+过期时长,然后开始执行下单----》减库存-----》更新库存等操作,如果在执行的过程中,这段代码发生了异常,那么线程A是不会释放锁的,导致其他线程都无法获取到锁导致死锁的产生,所以下面的逻辑是很有必要加的,即如果当前时间晚于锁的过期时间,那么就会向下走if()条件:

//下面的代码是为了解决可能出现的死锁的情况????
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue)
    && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
        return true;
    }
}

② 可是下面的if()条件怎么理解?

currentValue=2020-12-18

假如现在两个线程A和B同时执行lock()方法,也就是这两个线程的value是完全相同的,都为value=2020-12-19,而他们都执行 String oldValue = redisTemplate.opsForValue().getAndSet(key, value);,会有一个先执行一个后执行:

假如线程A先执行,返回的oldValue=2020-12-18,同时设置value = 2020-12-19,由于oldvalue=currentValue返回true,即A线程加了锁;

此时B线程继续执行 ,返回的oldValue=2020-12-19,oldvalue!=currentValue,返回false,加锁失败

所以这段代码的逻辑是只会让一个线程加锁

public boolean lock(String key, String value) {
    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    String currentValue = redisTemplate.opsForValue().get(key);
    if (!StringUtils.isEmpty(currentValue)
        && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
            return true;
        }
    }
    return false;
}

感谢阅读,希望对你有所帮助 :) 

来源:hengheng.blog.csdn.net/article/details/107827649

最近给大家找了  通用权限系统


资源,怎么领取?


扫二维码,加我微信,回复:通用权限系统

 注意,不要乱回复 

没错,不是机器人
记得一定要等待,等待才有好东西
浏览 35
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报