前言

在优化之前我们完成了基于redis分布式锁的优惠券秒杀功能,但是这个功能并不完善,虽然能用但在高并发的情况下处理速度是很慢的。为什么呢?
我们回顾一遍业务流程:用户下单-判断库存-尝试加锁-添加订单-返回订单,这样我们业务流程是单线程的,这样每次用户下单都会执行这一整套流程,速度是很慢的。如何优化呢?

我们何不将添加订单的过程交给另一些线程去完成,将判断用户是否有权添加订单的同时,为另外一些用户添加订单,判断和添加过程异步执行,这样效率明显更高。

利用lua脚本保证业务的原子性

为了保证判断业务的原子性,我们使用lua脚本编写判断业务的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1];
-- 1.2.用户id
local userId = ARGV[2];
-- 1.3.订单id
local orderId = ARGV[3];
-- 2.数据key
-- 2.1.库存key
local storeKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', storeKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2;
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', storeKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)

基于阻塞队列的异步优化

阻塞队列:不知道大伙有没有了解过生产者消费者问题,生产者不断将产品放入临界区中,这些产品称为临界资源,然后消费者不断从临界区中拿取资源消费,消息队列在里面就扮演临界区的角色。

线程池:线程池担任一个管理和重用线程的角色,当任务到来时,若存在空闲线程则调用线程完成任务,当指定的任务完成后,线程将自动返回线程池;若不存在空闲线程,则将任务放入队列中等待。

优化思路:我们何不将添加订单的过程交给另一些线程去完成,将判断用户是否有权添加订单的同时,为另外一些用户添加订单,判断和添加过程异步执行,这样效率明显更高。我们将有权添加订单的订单信息保存在阻塞队列中,然后再利用线程池创建一些线程不断获取阻塞队列中的订单,这样一来就实现了异步过程。

阻塞队列详细介绍-CSDN博客

一文带你彻底弄懂线程池-腾讯云开发者社区-腾讯云

流程图如下:

image-20250726012932391

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
private String queueName = "stream:order";

@Override
public void run() {
// 不断执行任务
while(true){
try {
// 1.从阻塞队列中获取订单
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.info("订单获取失败");
}
}
}
}

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);
}


private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 1.获取用户
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获取锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try {
proxy.createSeckillVoucher(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}



public void createSeckillVoucher(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 尝试获取锁
boolean isLock = redisLock.tryLock();
// 判断
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}

try {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("不允许重复下单!");
return;
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return;
}

// 7.创建订单
save(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}

基于消息队列的异步优化

在使用阻塞队列时,如果入队的元素过多可能导致jvm崩溃,因为阻塞队列需要占用大量的内存空间(这里的内存空间不是指全部的内存,而是分配给jvm的那一部分内存),这不是我们所希望的。那我们有没有另一种工具来代替阻塞队列呢?

消息队列与阻塞队列:消息队列比阻塞队列的功能更加强大,下面是他们两的简单对比。

特性 阻塞队列 (Blocking Queue) 消息队列 (Message Queue)
定位与层级 线程级同步机制,属于并发编程工具 系统/进程间通信中间件,属于分布式系统基础设施
主要目的 协调同一进程内多线程间的数据交换 实现不同进程/服务/系统间的异步通信、解耦、削峰填谷
数据存储 内存中(进程重启数据丢失) 持久化到磁盘(系统崩溃可恢复)
通信范围 单一进程内的线程间通信 跨进程、跨主机、跨网络的分布式通信
通信协议 编程语言API调用(如Java的put(), take() 网络协议(AMQP, MQTT, Kafka协议等)
耦合性 生产者线程和消费者线程紧密耦合(需同时在线) 生产者和消费者完全解耦(各自独立运行)
可靠性 较低:无持久化/消息确认机制 :持久化、ACK机制、重试、死信队列
扩展性 受限于单机资源(内存/CPU) 高可扩展:支持集群化与水平扩展
技术复杂度 实现简单,语言标准库提供 架构复杂,需独立部署运维中间件集群
典型应用场景 线程池任务缓冲、高并发请求处理、流水线处理 微服务解耦、事件驱动架构、日志收集、流量削峰
典型实现 Java BlockingQueue实现类(如ArrayBlockingQueue RabbitMQ, Kafka, RocketMQ, Amazon SQS

消息队列:简单来说消息队列就是进阶的阻塞队列,不仅可以在进程内使用,还可以在两个进程之间使用,它最具有标志的特点就是可以降低模块的耦合性,一个模块崩溃了,其他模块基本不会收到波及。消息队列也是基于生产者消费者模型,不过消息队列是支持多生产者与消费者的,一份消息可能被多个消费者接收并处理。

可惜的是实现的代码里并没有体现出消息队列这些特点 😭 ,等博主学成归来再好好给大家唠唠 👊 👊

消息队列MQ快速入门-CSDN博客

实现代码:

在lua脚本中增加这一条,其他代码基本上只有任务实现上有改变。

1
2
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private class VoucherOrderHandler implements Runnable {
private String queueName = "stream:order";
@Override
public void run() {
// 不断执行任务
while(true){
// 1.获取消息队列的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order ?
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2.判断消息获取是否成功
if(list == null || list.isEmpty()){
// 2.1.获取失败,则说明没有消息,继续下一次循环
continue;
}
// 3.解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
// 4.下单
handleVoucherOrder(voucherOrder);
// 5.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
}
}

private void handlePendingList() {
while (true) {
try {
// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
// 2.判断订单信息是否为空
if (list == null || list.isEmpty()) {
// 如果为null,说明没有异常消息,结束循环
break;
}
// 解析数据
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3.创建订单
createSeckillVoucher(voucherOrder);
// 4.确认消息 XACK
stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList();
}
}
}
}

感谢观看 ❤️ ❤️