Redis Stream在SpringBoot中实现消息队列

前言

之前一直打算通过消息队列优化自己项目的流程,这段时间终于开始研究消息队列了;最开始尝试了RabbitMQ,但是随后感觉这东西用在我的小项目中着实有点太重了,于是就开始研究用Redis实现消息队列。

关于消息队列的用处这里就不多介绍了,想了解的可以自行Google,这里就主要是讲如何在SpringBoot中实现。

因为我们需要使用Redis 5中新加入的Stream类型,所以建议使用的Redis的版本在Redis 6及以上。

开始

添加依赖

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
</dependencies>

配置Redis连接

在application.yml中添加redis配置

spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.database=0
spring.redis.connect-timeout=2s
spring.redis.timeout=3s

创建工具类

@Component
public class RedisUtil {
    private static final String prefix = "Example:"; // Redis中消息的前缀

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false不存在
     */
    public Boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(prefix + key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 添加一条Stream数据
     * @param key 键
     * @param message 内容
     * @return RecordId
     */
    public RecordId addStream(String key, Object message) {
        ObjectRecord<String, Object> record = StreamRecords.objectBacked(message)
                .withStreamKey(prefix + key)
                .withId(RecordId.autoGenerate());
        return redisTemplate.opsForStream().add(record);
    }

    /**
     * 当Group不存在时添加消息组
     * @param key 键
     * @param groupName 消息组名
     */
    public void addGroup(String key, String groupName) {
        StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(prefix + key);
        if (groups.stream().noneMatch(xInfoGroup -> groupName.equals(xInfoGroup.groupName()))) {
            redisTemplate.opsForStream().createGroup(prefix + key, groupName);
        }
    }

    /**
     * 删除Stream中的一条数据
     * @param key 键
     * @param fieldId ID
     */
    public void delStream(String key, String fieldId) {
        redisTemplate.opsForStream().delete(prefix + key, fieldId);
    }

    /**
     * 获得带前缀的键名
     * @param key 不带前缀的键名
     */
    @Contract(pure = true)
    public static @NotNull String getFullKey(String key) {
        return prefix + key;
    }
}

创建监听类

当有消息队列中有数据待处理的时候就会调用onMessage()

@Slf4j
@Component
public class QueueListener implements StreamListener<String, ObjectRecord<String, Object>> {
    @Override
    public void onMessage(@NotNull ObjectRecord<String, Object> data) {
        int id = Integer.parseInt((String) data.getValue());
        log.info("Redis MQ, id:{}", id);
    }
}

创建线程池

线程池的详细配置方法可以看这篇文章 线程池中各个参数如何合理设置

因为我消息队列要处理的消息量很少,所以这里CorePoolSize设置得很小。

@Configuration
public class RedisTaskConfig {
    @Bean("redisTaskExecutor")
    public ThreadPoolTaskExecutor redisTaskExecutor() {
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(processors);
        executor.setThreadNamePrefix("MQ-Thread-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(10);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

需要注意的是,这里如果不配置setWaitForTasksToCompleteOnShutdown(true),那么可能会在程序关闭的时候报“io.lettuce.core.RedisCommandInterruptedException: Command interrupted”错误,这是因为监听的进程还在运行,没有正确关闭;所以这里需要设置等待线程关闭。

配置错误处理

@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        log.error("发生了异常", t);
    }
}

配置消息队列

@Slf4j
@Configuration
public class RedisStreamConfig implements DisposableBean {
    private RedisUtil redisUtil;
    private QueueListener queueListener;

    @Resource(name = "redisTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    public void setRedisUtil(RedisUtil redisUtil) {
        this.redisUtil = redisUtil;
    }

    @Autowired
    public void setQueueListener(QueueListener queueListener) {
        this.queueListener = queueListener;
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> streamMessageListenerContainer() {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(10)
                        // 运行 Stream 的 poll task
                        .executor(executor)
                        .serializer(new StringRedisSerializer())
                        // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 时,将对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
                        .objectMapper(new ObjectHashMapper())
                        // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                        .errorHandler(new CustomErrorHandler())
                        .build();

        // 初始化消息组
        initStream("TestMessage", "inner-group");

        StreamMessageListenerContainer<String, ObjectRecord<String, Object>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
                .StreamReadRequest
                .builder(StreamOffset.create(RedisUtil.getFullKey("TestMessage"), ReadOffset.lastConsumed()))
                .consumer(Consumer.from("inner-group", "inner-consumer"))
                .autoAcknowledge(true) // 自动ACK
                .cancelOnError(throwable -> false) // 错误时不关闭消费者
                .build();
        listenerContainer.register(streamReadRequest, queueListener); // 注册监听
        return listenerContainer;
    }

    private void initStream(String key, String group){
        boolean hasKey = redisUtil.hasKey(key);
        if (!hasKey) {
            RecordId recordId = redisUtil.addStream(key, 0);
            redisUtil.addGroup(key, group);
            // 将初始化的值删掉
            redisUtil.delStream(key, recordId.getValue());
            log.info("Redis Stream: {}-{} initialize success", key, group);
        }
    }

    @Override
    public void destroy() {
        executor.shutdown(); // 当程序关闭的时候关闭任务线程
    }
}

(1)首先来看看@Bean(initMethod = "start", destroyMethod = "stop")

initMethoddestroyMethod将分别在创建资源和销毁资源的时候执行,它们对应了 StreamMessageListenerContainer.start()StreamMessageListenerContainer.stop() 这两个方法,如果我们改成@Bean,那么我们就需要在return listenerContainer之前执行start(),并在destroy()方法内执行stop()。

(2)然后来看看initStream("TestMessage", "inner-group")Consumer.from("inner-group", "inner-consumer")这两行。

  • TestMessage是消息队列的名称
  • inner-group是消费组的名称
  • inner-consumer是消费者的名称

需要注意的是,Redis中接受消息时按照消费组来做区分,不同的消费组之间没有竞争关系,各自独立,可以消费同一个消息;同一个消费组中的消费者是竞争关系,当一个消费者分配到一条消息后,同一个消费组的其他消费者就不能获得这条消息了。

消费组是需要事先创建的,不然会报错,所以我这里直接通过代码创建了消费组,避免了需要手动在CLI上输入命令创建的问题。

(3)最后看看cancelOnError(throwable -> false)

默认情况下,当任务中有错误发生的时候Redis会自动关闭这个消费者,这可能会导致大量的消息堆积,所以通过这行就能让任务中出现错误时也不关闭这个消费者。(你也可以把消费者中的代码用try catch包起来)

添加消息

在适当位置调用方法就能测试消费者是否正常了

public void sendSyncMessage(int id) {
        RecordId recordId = redisUtil.addStream("TestMessage", id);
        log.info("返回的record-id:[{}]", recordId);
}

适配 Spring 3.2 及以上版本

spring-boot-starter-data-redis 在3.2更新之后使用了 SmartLifecycle 来管理 redisConnectionFactory 的生命周期,这就引出了一个问题—— RedisStreamConfig 的 destroy() 在 redisConnectionFactory 关闭连接后才会调用,导致了 RedisChannelHandler: Connection is already closed 错误。
Redis Connection Factory Earlier Graceful Shutdown after Spring 3.2 Upgrade – LettuceConnectionFactory SmartLifecycle Backward Compatibility 中,得知解决办法是同样使用SmartLifecycle来管理StreamMessageListenerContainer的生命周期即可。

@Slf4j
@Configuration
public class RedisStreamConfig implements SmartLifecycle {
    private boolean isRunning = false;
    private StreamMessageListenerContainer<String, ObjectRecord<String, Object>> listenerContainer;

    @Resource(name = "redisTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    // ……

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> streamMessageListenerContainer() {
        // ……
        return listenerContainer;
    }

    @Override
    public boolean isRunning() {
        // 只有当isRunning()返回false时,start()才会调用
        // 返回true时,stop()才会调用
        return isRunning;
    }

    @Override
    public void start() {
        initStream("TestMessage", "inner-group");
        listenerContainer.start();
        isRunning = true;
    }

    @Override
    public void stop() {
        listenerContainer.stop();
        executor.shutdown();
        isRunning = false;
    }

    @Override
    public int getPhase() {
        // getPhase()大的启动时后调用start(),关闭时先调用stop()
        // 要比RedisConnectionFactory的phrase大
        // 保证在redis连接关闭之前关闭stream监听
        return Integer.MAX_VALUE;
    }
}

后续优化

处理消息组消失的问题

在后续的使用中,我观察到我创建的消息组(即上文中提到的TestMessage)会莫名其妙地消失,从而导致消息队列无法正常读取。再因为我关闭了cancelOnError,导致整个服务都被消息队列不断的报错信息给阻塞了,所以这里的解决思路是当出现相关报错的时候就自动创建消息组。

首先创建个工具类用来获取Spring创建好的组件

@Component
public class SpringContextUtil implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
        if (context == null) {
            context = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> @NotNull T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }
}

然后把上文中的initStream方法移动到RedisService中

@Slf4j
@Service
public class RedisService {
    private RedisUtil redisUtil;

    /**
     * 初始化Stream用户组
     * @param key 键名
     * @param group 用户组
     */
    public void initStream(String key, String group) {
        boolean hasKey = redisUtil.hasKey(key);
        if (!hasKey) {
            RecordId recordId = redisUtil.addStream(key, 0);
            redisUtil.addGroup(key, group);
            redisUtil.delStream(key, recordId.getValue());
            log.info("Redis Stream: {} {} initialize success", key, group);
        }
    }
}

然后修改ErrorHandler

@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (t.getMessage().contains("NOGROUP")) {
            log.warn("Redis Stream对应用户组失效!");
            RedisService redisService = SpringContextUtil.getBean(RedisService.class);
            redisService.initStream("TestMessage", "inner-group");
            return;
        }
        log.error("发生了异常", t);
    }
}

参考文章

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇