前言
在优化之前我们完成了基于redis分布式锁的优惠券秒杀功能,但是这个功能并不完善,虽然能用但在高并发的情况下处理速度是很慢的。为什么呢?
我们回顾一遍业务流程:用户下单-判断库存-尝试加锁-添加订单-返回订单,这样我们业务流程是单线程的,这样每次用户下单都会执行这一整套流程,速度是很慢的。如何优化呢?
我们何不将添加订单的过程交给另一些线程去完成,将判断用户是否有权添加订单的同时,为另外一些用户添加订单,判断和添加过程异步执行,这样效率明显更高。
利用lua脚本保证业务的原子性
为了保证判断业务的原子性,我们使用lua脚本编写判断业务的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
local voucherId = ARGV[1];
local userId = ARGV[2];
local orderId = ARGV[3];
local storeKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order' .. voucherId
if(tonumber(redis.call('get', storeKey)) <= 0) then return 1 end
if(redis.call('sismember', orderKey, userId) == 1) then return 2; end
redis.call('incrby', storeKey, -1)
redis.call('sadd', orderKey, userId)
|
基于阻塞队列的异步优化
阻塞队列:不知道大伙有没有了解过生产者消费者问题,生产者不断将产品放入临界区中,这些产品称为临界资源,然后消费者不断从临界区中拿取资源消费,消息队列在里面就扮演临界区的角色。
线程池:线程池担任一个管理和重用线程的角色,当任务到来时,若存在空闲线程则调用线程完成任务,当指定的任务完成后,线程将自动返回线程池;若不存在空闲线程,则将任务放入队列中等待。
优化思路:我们何不将添加订单的过程交给另一些线程去完成,将判断用户是否有权添加订单的同时,为另外一些用户添加订单,判断和添加过程异步执行,这样效率明显更高。我们将有权添加订单的订单信息保存在阻塞队列中,然后再利用线程池创建一些线程不断获取阻塞队列中的订单,这样一来就实现了异步过程。
阻塞队列详细介绍-CSDN博客
一文带你彻底弄懂线程池-腾讯云开发者社区-腾讯云
流程图如下:

代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }
private class VoucherOrderHandler implements Runnable { private String queueName = "stream:order";
@Override public void run() { while(true){ try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (InterruptedException e) { log.info("订单获取失败"); } } } }
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); IVoucherOrderService proxy; @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); if (r != 0) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); }
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); RLock redisLock = redissonClient.getLock("lock:order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!"); return; } try { proxy.createSeckillVoucher(voucherOrder); } finally { redisLock.unlock(); } }
public void createSeckillVoucher(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); RLock redisLock = redissonClient.getLock("lock:order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!"); return; }
try { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { log.error("不允许重复下单!"); return; }
boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") .eq("voucher_id", voucherId).gt("stock", 0) .update(); if (!success) { log.error("库存不足!"); return; }
save(voucherOrder); } finally { redisLock.unlock(); } }
|
基于消息队列的异步优化
在使用阻塞队列时,如果入队的元素过多可能导致jvm崩溃,因为阻塞队列需要占用大量的内存空间(这里的内存空间不是指全部的内存,而是分配给jvm的那一部分内存),这不是我们所希望的。那我们有没有另一种工具来代替阻塞队列呢?
消息队列与阻塞队列:消息队列比阻塞队列的功能更加强大,下面是他们两的简单对比。
特性 |
阻塞队列 (Blocking Queue) |
消息队列 (Message Queue) |
定位与层级 |
线程级同步机制,属于并发编程工具 |
系统/进程间通信中间件,属于分布式系统基础设施 |
主要目的 |
协调同一进程内多线程间的数据交换 |
实现不同进程/服务/系统间的异步通信、解耦、削峰填谷 |
数据存储 |
内存中(进程重启数据丢失) |
持久化到磁盘(系统崩溃可恢复) |
通信范围 |
单一进程内的线程间通信 |
跨进程、跨主机、跨网络的分布式通信 |
通信协议 |
编程语言API调用(如Java的put() , take() ) |
网络协议(AMQP, MQTT, Kafka协议等) |
耦合性 |
生产者线程和消费者线程紧密耦合(需同时在线) |
生产者和消费者完全解耦(各自独立运行) |
可靠性 |
较低:无持久化/消息确认机制 |
高:持久化、ACK机制、重试、死信队列 |
扩展性 |
受限于单机资源(内存/CPU) |
高可扩展:支持集群化与水平扩展 |
技术复杂度 |
实现简单,语言标准库提供 |
架构复杂,需独立部署运维中间件集群 |
典型应用场景 |
线程池任务缓冲、高并发请求处理、流水线处理 |
微服务解耦、事件驱动架构、日志收集、流量削峰 |
典型实现 |
Java BlockingQueue 实现类(如ArrayBlockingQueue ) |
RabbitMQ, Kafka, RocketMQ, Amazon SQS |
消息队列:简单来说消息队列就是进阶的阻塞队列,不仅可以在进程内使用,还可以在两个进程之间使用,它最具有标志的特点就是可以降低模块的耦合性,一个模块崩溃了,其他模块基本不会收到波及。消息队列也是基于生产者消费者模型,不过消息队列是支持多生产者与消费者的,一份消息可能被多个消费者接收并处理。
可惜的是实现的代码里并没有体现出消息队列这些特点 😭 ,等博主学成归来再好好给大家唠唠 👊 👊
消息队列MQ快速入门-CSDN博客
实现代码:
在lua脚本中增加这一条,其他代码基本上只有任务实现上有改变。
1 2
| redis.call('xadd', 'stream.orders', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| private class VoucherOrderHandler implements Runnable { private String queueName = "stream:order"; @Override public void run() { while(true){ List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if(list == null || list.isEmpty()){ continue; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } }
private void handlePendingList() { while (true) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); if (list == null || list.isEmpty()) { break; } MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); createSeckillVoucher(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); handlePendingList(); } } } }
|
感谢观看 ❤️ ❤️