Redisson
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version>
</dependency>
1
2
3
4
5
6
2
3
4
5
6
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setPassword(password);
return Redisson.create(config);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 实现延迟队列
/**
* 延迟队列执行器接口
* 所有延迟队列的具体业务处理类都需要实现此接口
*/
public interface RedisDelayQueueHandle {
/**
* 执行延迟队列任务
*/
void execute(Map<String, Object> params);
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
/**
* 订单支付超时处理类
*/
@Slf4j
@Component
public class OrderPaymentTimeout implements RedisDelayQueueHandle {
@Override
public void execute(Map<String, Object> params) {
log.info("开始处理订单支付超时任务 - 任务参数: {}", params);
// TODO 订单支付超时,自动取消订单处理业务...
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 订单超时未评价处理类
*/
@Slf4j
@Component
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle {
@Override
public void execute(Map<String, Object> params) {
log.info("开始处理订单超时未评价任务 - 任务参数: {}", params);
// TODO 订单超时未评价,系统默认好评处理业务...
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 延迟队列业务枚举
*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {
ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");
/**
* 延迟队列 Redis Key
*/
private String code;
/**
* 中文描述
*/
private String name;
/**
* 延迟队列具体业务实现的 Bean
*/
private String beanId;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 启动延迟队列
*/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@Autowired
private ApplicationContext applicationContext;
@Override
public void run(String... args) {
RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
// 为每个队列枚举创建独立的处理线程
for (RedisDelayQueueEnum queueEnum : queueEnums) {
new Thread(() -> {
while (true) {
try {
Map<String, Object> params = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
RedisDelayQueueHandle redisDelayQueueHandle = (RedisDelayQueueHandle) applicationContext.getBean(queueEnum.getBeanId());
log.info("开始执行队列[{}]的延迟任务处理", queueEnum.getCode());
redisDelayQueueHandle.execute(params);
} catch (Exception e) {
log.error("Redis延迟队列处理异常 - 队列: {}, 错误信息: {}", queueEnum.getCode(), e.getMessage());
// 异常后等待一段时间再继续,避免无限循环
try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
}
}
}, "DelayQueue-" + queueEnum.getCode()).start();
}
log.info("Redis延迟队列初始化完成 - 成功启动{}个队列处理线程", queueEnums.length);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* redis延迟队列工具
*/
@Slf4j
@Component
public class RedisDelayQueueUtil {
@Autowired
private RedissonClient redissonClient;
/**
* 添加延迟队列
*/
public void addDelayQueue(Map<String, Object> params, long delay, TimeUnit timeUnit, String queueCode) {
RBlockingDeque<Map<String, Object>> blockingDeque = redissonClient.getBlockingDeque(queueCode);
RDelayedQueue<Map<String, Object>> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(params, delay, timeUnit);
log.info("延迟队列任务添加成功 - 队列代码: {}, 延迟时间: {}秒, 任务参数: {}",
queueCode, timeUnit.toSeconds(delay), params);
}
/**
* 获取延迟队列
*/
public Map<String, Object> getDelayQueue(String queueCode) throws InterruptedException {
RBlockingDeque<Map<String, Object>> blockingDeque = redissonClient.getBlockingDeque(queueCode);
// 保证重启之后继续消费到期的队列
redissonClient.getDelayedQueue(blockingDeque);
return blockingDeque.take();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Anonymous
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
@Autowired
private RedisDelayQueueUtil redisDelayQueueUtil;
@RequestMapping("/addQueue")
public AjaxResult addQueue() {
Map<String, Object> map1 = new HashMap<>();
map1.put("orderId", "100");
map1.put("remark", "其他信息");
Map<String, Object> map2 = new HashMap<>();
map2.put("orderId", "200");
map2.put("remark", "其他信息");
// 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟
redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());
// 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());
return AjaxResult.success();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 实现分布式锁
编辑 (opens new window)
上次更新: 2025-08-04 17:59:42