前言
之前一直打算通过消息队列优化自己项目的流程,这段时间终于开始研究消息队列了;最开始尝试了RabbitMQ,但是随后感觉这东西用在我的小项目中着实有点太重了,于是就开始研究用Redis实现消息队列。
关于消息队列的用处这里就不多介绍了,想了解的可以自行Google,这里就主要是讲如何在SpringBoot中实现。
因为我们需要使用Redis 5中新加入的Stream类型,所以建议使用的Redis的版本在Redis 6及以上。
开始
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
配置Redis连接
在application.yml中添加redis配置
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.database=0
spring.redis.connect-timeout=2s
spring.redis.timeout=3s
创建工具类
@Component
public class RedisUtil {
private static final String prefix = "Example:"; // Redis中消息的前缀
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public Boolean hasKey(String key) {
try {
return redisTemplate.hasKey(prefix + key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 添加一条Stream数据
* @param key 键
* @param message 内容
* @return RecordId
*/
public RecordId addStream(String key, Object message) {
ObjectRecord<String, Object> record = StreamRecords.objectBacked(message)
.withStreamKey(prefix + key)
.withId(RecordId.autoGenerate());
return redisTemplate.opsForStream().add(record);
}
/**
* 当Group不存在时添加消息组
* @param key 键
* @param groupName 消息组名
*/
public void addGroup(String key, String groupName) {
StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(prefix + key);
if (groups.stream().noneMatch(xInfoGroup -> groupName.equals(xInfoGroup.groupName()))) {
redisTemplate.opsForStream().createGroup(prefix + key, groupName);
}
}
/**
* 删除Stream中的一条数据
* @param key 键
* @param fieldId ID
*/
public void delStream(String key, String fieldId) {
redisTemplate.opsForStream().delete(prefix + key, fieldId);
}
/**
* 获得带前缀的键名
* @param key 不带前缀的键名
*/
@Contract(pure = true)
public static @NotNull String getFullKey(String key) {
return prefix + key;
}
}
创建监听类
当有消息队列中有数据待处理的时候就会调用onMessage()
@Slf4j
@Component
public class QueueListener implements StreamListener<String, ObjectRecord<String, Object>> {
@Override
public void onMessage(@NotNull ObjectRecord<String, Object> data) {
int id = Integer.parseInt((String) data.getValue());
log.info("Redis MQ, id:{}", id);
}
}
创建线程池
线程池的详细配置方法可以看这篇文章 线程池中各个参数如何合理设置
因为我消息队列要处理的消息量很少,所以这里CorePoolSize
设置得很小。
@Configuration
public class RedisTaskConfig {
@Bean("redisTaskExecutor")
public ThreadPoolTaskExecutor redisTaskExecutor() {
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(processors);
executor.setThreadNamePrefix("MQ-Thread-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
需要注意的是,这里如果不配置setWaitForTasksToCompleteOnShutdown(true)
,那么可能会在程序关闭的时候报“io.lettuce.core.RedisCommandInterruptedException: Command interrupted”错误,这是因为监听的进程还在运行,没有正确关闭;所以这里需要设置等待线程关闭。
配置错误处理
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
log.error("发生了异常", t);
}
}
配置消息队列
@Slf4j
@Configuration
public class RedisStreamConfig implements DisposableBean {
private RedisUtil redisUtil;
private QueueListener queueListener;
@Resource(name = "redisTaskExecutor")
private ThreadPoolTaskExecutor executor;
@Resource
private RedisConnectionFactory redisConnectionFactory;
@Autowired
public void setRedisUtil(RedisUtil redisUtil) {
this.redisUtil = redisUtil;
}
@Autowired
public void setQueueListener(QueueListener queueListener) {
this.queueListener = queueListener;
}
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> streamMessageListenerContainer() {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(10)
// 运行 Stream 的 poll task
.executor(executor)
.serializer(new StringRedisSerializer())
// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
.pollTimeout(Duration.ofSeconds(1))
// ObjectRecord 时,将对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
.objectMapper(new ObjectHashMapper())
// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
.errorHandler(new CustomErrorHandler())
.build();
// 初始化消息组
initStream("TestMessage", "inner-group");
StreamMessageListenerContainer<String, ObjectRecord<String, Object>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
.StreamReadRequest
.builder(StreamOffset.create(RedisUtil.getFullKey("TestMessage"), ReadOffset.lastConsumed()))
.consumer(Consumer.from("inner-group", "inner-consumer"))
.autoAcknowledge(true) // 自动ACK
.cancelOnError(throwable -> false) // 错误时不关闭消费者
.build();
listenerContainer.register(streamReadRequest, queueListener); // 注册监听
return listenerContainer;
}
private void initStream(String key, String group){
boolean hasKey = redisUtil.hasKey(key);
if (!hasKey) {
RecordId recordId = redisUtil.addStream(key, 0);
redisUtil.addGroup(key, group);
// 将初始化的值删掉
redisUtil.delStream(key, recordId.getValue());
log.info("Redis Stream: {}-{} initialize success", key, group);
}
}
@Override
public void destroy() {
executor.shutdown(); // 当程序关闭的时候关闭任务线程
}
}
(1)首先来看看@Bean(initMethod = "start", destroyMethod = "stop")
initMethod
和destroyMethod
将分别在创建资源和销毁资源的时候执行,它们对应了 StreamMessageListenerContainer.start() 和 StreamMessageListenerContainer.stop() 这两个方法,如果我们改成@Bean
,那么我们就需要在return listenerContainer
之前执行start(),并在destroy()
方法内执行stop()。
(2)然后来看看initStream("TestMessage", "inner-group")
和Consumer.from("inner-group", "inner-consumer")
这两行。
TestMessage
是消息队列的名称inner-group
是消费组的名称inner-consumer
是消费者的名称
需要注意的是,Redis中接受消息时按照消费组来做区分,不同的消费组之间没有竞争关系,各自独立,可以消费同一个消息;同一个消费组中的消费者是竞争关系,当一个消费者分配到一条消息后,同一个消费组的其他消费者就不能获得这条消息了。
消费组是需要事先创建的,不然会报错,所以我这里直接通过代码创建了消费组,避免了需要手动在CLI上输入命令创建的问题。
(3)最后看看cancelOnError(throwable -> false)
默认情况下,当任务中有错误发生的时候Redis会自动关闭这个消费者,这可能会导致大量的消息堆积,所以通过这行就能让任务中出现错误时也不关闭这个消费者。(你也可以把消费者中的代码用try catch包起来)
添加消息
在适当位置调用方法就能测试消费者是否正常了
public void sendSyncMessage(int id) {
RecordId recordId = redisUtil.addStream("TestMessage", id);
log.info("返回的record-id:[{}]", recordId);
}
适配 Spring 3.2 及以上版本
spring-boot-starter-data-redis 在3.2更新之后使用了 SmartLifecycle 来管理 redisConnectionFactory 的生命周期,这就引出了一个问题—— RedisStreamConfig 的 destroy()
在 redisConnectionFactory 关闭连接后才会调用,导致了 RedisChannelHandler: Connection is already closed 错误。
在 Redis Connection Factory Earlier Graceful Shutdown after Spring 3.2 Upgrade – LettuceConnectionFactory SmartLifecycle Backward Compatibility 中,得知解决办法是同样使用SmartLifecycle来管理StreamMessageListenerContainer的生命周期即可。
@Slf4j
@Configuration
public class RedisStreamConfig implements SmartLifecycle {
private boolean isRunning = false;
private StreamMessageListenerContainer<String, ObjectRecord<String, Object>> listenerContainer;
@Resource(name = "redisTaskExecutor")
private ThreadPoolTaskExecutor executor;
@Resource
private RedisConnectionFactory redisConnectionFactory;
// ……
@Bean
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> streamMessageListenerContainer() {
// ……
return listenerContainer;
}
@Override
public boolean isRunning() {
// 只有当isRunning()返回false时,start()才会调用
// 返回true时,stop()才会调用
return isRunning;
}
@Override
public void start() {
initStream("TestMessage", "inner-group");
listenerContainer.start();
isRunning = true;
}
@Override
public void stop() {
listenerContainer.stop();
executor.shutdown();
isRunning = false;
}
@Override
public int getPhase() {
// getPhase()大的启动时后调用start(),关闭时先调用stop()
// 要比RedisConnectionFactory的phrase大
// 保证在redis连接关闭之前关闭stream监听
return Integer.MAX_VALUE;
}
}
后续优化
处理消息组消失的问题
在后续的使用中,我观察到我创建的消息组(即上文中提到的TestMessage
)会莫名其妙地消失,从而导致消息队列无法正常读取。再因为我关闭了cancelOnError
,导致整个服务都被消息队列不断的报错信息给阻塞了,所以这里的解决思路是当出现相关报错的时候就自动创建消息组。
首先创建个工具类用来获取Spring创建好的组件
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext context;
@Override
public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
if (context == null) {
context = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return context;
}
public static <T> @NotNull T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
}
然后把上文中的initStream
方法移动到RedisService中
@Slf4j
@Service
public class RedisService {
private RedisUtil redisUtil;
/**
* 初始化Stream用户组
* @param key 键名
* @param group 用户组
*/
public void initStream(String key, String group) {
boolean hasKey = redisUtil.hasKey(key);
if (!hasKey) {
RecordId recordId = redisUtil.addStream(key, 0);
redisUtil.addGroup(key, group);
redisUtil.delStream(key, recordId.getValue());
log.info("Redis Stream: {} {} initialize success", key, group);
}
}
}
然后修改ErrorHandler
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
if (t.getMessage().contains("NOGROUP")) {
log.warn("Redis Stream对应用户组失效!");
RedisService redisService = SpringContextUtil.getBean(RedisService.class);
redisService.initStream("TestMessage", "inner-group");
return;
}
log.error("发生了异常", t);
}
}