实现消费券秒杀的优化,在加入限时抢购的优惠券时,自动的将消费券的库存stock信息也加入到redis中(可设为抢购结束后过期)

抢购之前在redis中进行库存是否充足(stock)、用户是否已经抢购(set)的判断

如果条件都满足,则将订单信息加入到消息队列中

另开启一个线程将消息队列中订单信息异步地同步到数据库中,这样就缓解了直接写数据库的压力,新开启的线程可以根据数据库适应的速度进行写操作

异步秒杀业务流程

说明

Lua脚本保证一些操作在Redis执行的原子性。直接由redis中的eva "lua script" [keys] [args]命令调用

在一开始直接将优惠券的信息直接读到redis中,然后利用lua脚本直接在redis中进行用户抢购资格判断(避免了每次抢购都要直接读数据库进行判断)

根据lua脚本的返回值判断用户是否符合资格,如果符合资格则加入到消息队列(或阻塞队列)中

三种redis实现消息队列的思路

基于list实现消息队列

redis中的list数据结构是一个双向链表,可以利用LPUSH和RPOP、RPUSH和LPOP组合实现消息队列

队列中没有消息再pop会直接返回null,可以使用BLPOP、BRPOP来达到阻塞等待pop的效果

优点:基于redis存储不受限于jvm内存上限、基于redis持久化机制数据安全性有保证、可满足消息有序性

缺点:无法避免消息丢失(一个消费者获取消息后出现异常)、只适用于单消费者模式(只能pop一次)

基于PubSub实现消息队列

常用的命令

subscribe channel [channel]  #消费者订阅频道
publish [channel] [msg]  #生产者向频道发送消息
psubscribe pattern[pattern]  #订阅和pattern格式相同的频道  通配符?、*、[]、\

优点:支持多生产者、多消费者模式

缺点:不支持持久化、无法避免消息丢失(无人订阅时消息一发送就丢失)、消息堆积有上限超出时消息丢失

基于Stream消费组实现消息队列

redis5.0引入的一种数据类型(可持久化),可以实现一个功能非常完善的消息队列

单消费者模式

优点:消息可回溯、一个消息可被多个消费者获取、可阻塞读取

缺点:任然存在消息漏读的风险

消费者组模式

xgroup create [key] [groupname] [id] [mkstream]  
#key代表队列名称、id为消息索引$为最后一个消息,0为第一个消息、mkstream自动创建队列(可选)

xgroup destroy [key] [groupname]
#删除指定的消费者组

xgroup createconsumer [key] [groupname] [consumername]
#给key消息队列的groupname组中添加消费者consumername

xgroup delconsumer [key] [groupname] [consumername]
#将key消息队列的groupname组中消费者consumername删除

是redis中最为完善的消息队列:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有漏读风险、消息确认机制,每个消息至少被消费

基于Stream消息队列异步秒杀的代码实现

新增优惠券信息时将库存信息写入redis中{“voucher:stock:voucherId” : “stockValue”}

创建一个Stream类型的消息队列,名为“stream.orders”。redis创建命令:XGROUP CREATE stream.orders g1 0 MKSTREAM

修改秒杀脚本,用户在redis中获取到抢购资格后直接在redis中将订单信息加入到消息队列中

lua脚本seckill.lua

判断用户是否有购买资格,如果有则将订单信息加入到消息队列中

-- 1、参数列表
-- 1.1、优惠券id
local voucherId = ARGV[1]
-- 1.2、用户id
local userId = ARGV[2]
-- 1.3、订单id
local orderId = ARGV[3]
-- 2、数据key
-- 1.1、库存key
local stockKey = "seckill:stock:" .. voucherId
-- 1.2、订单key
local orderKey = "seckill:order:" .. voucherId
-- 3、脚本业务
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.1、判断库存是否充足,如果不足返回1
    return 1
end
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.2、判断用户是否下过单,如果下过单返回2
    return 2
end
-- 4、扣减库存
redis.call('incrby', stockKey, -1)
-- 5、将userId存入订单的set集合,返回0
redis.call('sadd',orderKey, userId)
-- 6、发送消息到消息队列中  xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

return 0

静态加载lua脚本

//静态加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
     //resource/seckill.lua
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); 
    //lua脚本执行后的返回类型
    SECKILL_SCRIPT.setResultType(Long.class);
}

秒杀的主业务流程

SeckillServiceImpl.seckillVoucher2()

public Result seckillVoucher2(Long voucherId){
    UserDTO user = UserHolder.getUser();
    long orderId = redisIdWorker.nextId("order");

    //1、执行lua脚本
    Long returnValue = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),
        user.getId().toString(),
        String.valueOf(orderId)
    );
    //2、判断结果是否为0
    //3、不为0,没有资格购买
    int result = returnValue.intValue();
    if(result != 0){
        return Result.fail(result == 1 ? "库存不足" : "仅限抢购一单");
    }

    //4、如果有购买资格的话在lua脚本中就将订单加入到消息队列到中了

    return Result.ok(orderId);
}

在项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,异步的将消息队列中未处理的订单信息写入到数据库

创建一个专门处理处理消息队列的内部类SeckillServiceImpl.VoucherOrderHandler.java

private static final String STREAM_ORDER_MQ = "stream.orders";
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct  //在项目启动时,开启一个线程任务
private void init(){
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

//专门处理将订单信息写入数据库的线程
private class VoucherOrderHandler implements Runnable{
    @Override
    public void run() {
        while(true){
            try {
                //1、获取消息队列中的信息
                //xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().block(Duration.ofSeconds(2)).count(1),
                    StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.lastConsumed())
                );

                //2判断获取消息是否成功
                if(records == null || records.isEmpty()){
                    //3、如果失败,说明没有消息,继续循环
                    continue;
                }
                //String 为消息的id, <Object, Object>存的是消息队列中的键值对
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> map = record.getValue();

                //4、如果获取成功进行下单
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                handleCreateOrder(voucherOrder);

                //5、在消息队列中对此条消息进行确认
                stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());

            } catch (Exception e) {
                log.error("订单处理错误", e);
                //当出现异常时,从pendingList获取未处理的消息继续处理
                handlePendingList();
            }
        }
    }

    //发生异常时进行处理
    private void handlePendingList(){
        while(true){
            try {
                //1、获取pending-list中的异常消息
                //xreadgroup group g1 c1 count 1 block 2000 streams stream.orders 0
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create(STREAM_ORDER_MQ, ReadOffset.from("0"))
                );

                //2判断是否有异常消息
                if(records == null || records.isEmpty()){
                    //3、如果没有跳出循环,执行正常消息
                    break;
                }
                //String 为消息的id, <Object, Object>存的是消息队列中的键值对
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> map = record.getValue();

                //4、如果获取成功进行下单
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                handleCreateOrder(voucherOrder);

                //5、在消息队列中对此条消息进行确认
                stringRedisTemplate.opsForStream().acknowledge(STREAM_ORDER_MQ, "g1", record.getId());

            } catch (Exception e) {
                log.error("订单处理错误", e);
                //如果又出现异常,进入下一轮循环
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    private void handleCreateOrder(VoucherOrder voucherOrder) {
        voucherOrderService.save(voucherOrder);
    }
}

原文地址:http://www.cnblogs.com/Gw-CodingWorld/p/16816205.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性