目录

Redisson

<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.5</version>
</dependency>
1
2
3
4
5
6
@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setPassword(password);
        return Redisson.create(config);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 实现延迟队列

/**
 * 延迟队列执行器接口
 * 所有延迟队列的具体业务处理类都需要实现此接口
 */
public interface RedisDelayQueueHandle {

    /**
     * 执行延迟队列任务
     */
    void execute(Map<String, Object> params);
}
1
2
3
4
5
6
7
8
9
10
11
/**
 * 订单支付超时处理类
 */
@Slf4j
@Component
public class OrderPaymentTimeout implements RedisDelayQueueHandle {
    @Override
    public void execute(Map<String, Object> params) {
        log.info("开始处理订单支付超时任务 - 任务参数: {}", params);
        // TODO 订单支付超时,自动取消订单处理业务...

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * 订单超时未评价处理类
 */
@Slf4j
@Component
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle {
    @Override
    public void execute(Map<String, Object> params) {
        log.info("开始处理订单超时未评价任务 - 任务参数: {}", params);
        // TODO 订单超时未评价,系统默认好评处理业务...

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * 延迟队列业务枚举
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /**
     * 延迟队列 Redis Key
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     */
    private String beanId;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * 启动延迟队列
 */
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) {
        RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
        // 为每个队列枚举创建独立的处理线程
        for (RedisDelayQueueEnum queueEnum : queueEnums) {
            new Thread(() -> {
                while (true) {
                    try {
                        Map<String, Object> params = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                        RedisDelayQueueHandle redisDelayQueueHandle = (RedisDelayQueueHandle) applicationContext.getBean(queueEnum.getBeanId());
                        log.info("开始执行队列[{}]的延迟任务处理", queueEnum.getCode());
                        redisDelayQueueHandle.execute(params);
                    } catch (Exception e) {
                        log.error("Redis延迟队列处理异常 - 队列: {}, 错误信息: {}", queueEnum.getCode(), e.getMessage());
                        // 异常后等待一段时间再继续,避免无限循环
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException ignored) {
                        }
                    }
                }
            }, "DelayQueue-" + queueEnum.getCode()).start();
        }

        log.info("Redis延迟队列初始化完成 - 成功启动{}个队列处理线程", queueEnums.length);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * redis延迟队列工具
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加延迟队列
     */
    public void addDelayQueue(Map<String, Object> params, long delay, TimeUnit timeUnit, String queueCode) {
        RBlockingDeque<Map<String, Object>> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<Map<String, Object>> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(params, delay, timeUnit);
        log.info("延迟队列任务添加成功 - 队列代码: {}, 延迟时间: {}秒, 任务参数: {}",
                queueCode, timeUnit.toSeconds(delay), params);
    }

    /**
     * 获取延迟队列
     */
    public Map<String, Object> getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<Map<String, Object>> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        // 保证重启之后继续消费到期的队列
        redissonClient.getDelayedQueue(blockingDeque);
        return blockingDeque.take();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Anonymous
@RestController
@RequestMapping("/test")
public class TestController extends BaseController {
    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @RequestMapping("/addQueue")
    public AjaxResult addQueue() {
        Map<String, Object> map1 = new HashMap<>();
        map1.put("orderId", "100");
        map1.put("remark", "其他信息");

        Map<String, Object> map2 = new HashMap<>();
        map2.put("orderId", "200");
        map2.put("remark", "其他信息");

        // 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟
        redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());

        // 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
        redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());
        return AjaxResult.success();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 实现分布式锁

集成redisson实现redis分布式锁 (opens new window)

上次更新: 2025-08-04 17:59:42