RabbitMQ 延时任务
在 Spring Boot 框架下,RabbitMQ 可以通过“消息过期时间(TTL)+ 死信队列”的方式实现延时任务。
实战参考
https://gitee.com/renlm/MyGraph/tree/main/src/main/java/cn/renlm/graph/amqp
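定义一个绑定了死信交换机和死信路由的延时队列。延时队列只负责接收任务消息;消息过期后,RabbitMQ 会把它转发到死信交换机,再按死信路由投递到死信队列,由死信队列的监听器执行任务。
注意:RabbitMQ 只在消息到达队首时才检查它是否过期,因此排在前面的长延时消息会阻塞排在后面的短延时消息;如果各任务的延时时长不一致,实际触发时间可能晚于预期。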
延时队列
https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/TtlQueue.java
package cn.renlm.graph.amqp;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Delay (TTL) queue configuration.
 *
 * <p>Declares a durable direct exchange, a durable queue and the binding that
 * connects them. The queue is configured with a dead-letter exchange and a
 * dead-letter routing key taken from {@link DeadLetterQueue}, plus a queue TTL
 * of {@link AmqpUtil#maxDelayTtl} milliseconds.
 *
 * @author Renlm
 */
@Configuration
public class TtlQueue {

    /** Prefix shared by the exchange, queue and routing-key names. */
    private static final String KEY = "Ttl";

    /** Exchange name; also used as the exchange bean name. */
    public static final String EXCHANGE = KEY + AmqpUtil.Exchange;

    /** Queue name; also used as the queue bean name. */
    public static final String QUEUE = KEY + AmqpUtil.Queue;

    /** Routing key; also used as the binding bean name. */
    public static final String ROUTINGKEY = QUEUE + AmqpUtil.RoutingKey;

    /**
     * Declares the durable direct exchange of the delay queue.
     *
     * @return the exchange definition
     */
    @Bean(name = EXCHANGE)
    public DirectExchange exchange() {
        ExchangeBuilder definition = ExchangeBuilder.directExchange(EXCHANGE);
        definition = definition.durable(true);
        return definition.build();
    }

    /**
     * Declares the durable delay queue.
     *
     * @return the queue definition
     */
    @Bean(name = QUEUE)
    public org.springframework.amqp.core.Queue queue() {
        QueueBuilder definition = QueueBuilder.durable(QUEUE);
        // Exchange that receives the dead-lettered messages.
        definition = definition.deadLetterExchange(DeadLetterQueue.EXCHANGE);
        // Routing key used when a message is dead-lettered.
        definition = definition.deadLetterRoutingKey(DeadLetterQueue.ROUTINGKEY);
        // Queue TTL. When both a queue TTL and a message TTL are configured,
        // the smaller of the two values is used.
        definition = definition.ttl(AmqpUtil.maxDelayTtl);
        return definition.build();
    }

    /**
     * Binds the delay queue to the delay exchange.
     *
     * @param exchange the exchange bean named {@link #EXCHANGE}
     * @param queue    the queue bean named {@link #QUEUE}
     * @return the binding definition
     */
    @Bean(name = ROUTINGKEY)
    public Binding binding(@Qualifier(EXCHANGE) DirectExchange exchange,
            @Qualifier(QUEUE) org.springframework.amqp.core.Queue queue) {
        Binding queueToExchange = BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTINGKEY);
        return queueToExchange;
    }
}
死信队列
https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/DeadLetterQueue.java
package cn.renlm.graph.amqp;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.renlm.graph.amqp.AmqpUtil.DelayTask;
import cn.renlm.graph.amqp.AmqpUtil.DelayTaskParam;
import lombok.extern.slf4j.Slf4j;
/**
* 死信队列
*
* @author Renlm
*
*/
@Slf4j
@Configuration
public class DeadLetterQueue {
private static final String KEY = "DeadLetter";
public static final String EXCHANGE = KEY + AmqpUtil.Exchange;
public static final String QUEUE = KEY + AmqpUtil.Queue;
public static final String ROUTINGKEY = QUEUE + AmqpUtil.RoutingKey;
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 监听队列
*
* @param taskParam
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = QUEUE, durable = Exchange.TRUE), exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.DIRECT), key = ROUTINGKEY) })
public void receiveMessage(DelayTaskParam<Object> taskParam) {
String receiveTime = DateUtil.formatDateTime(new Date());
log.debug("=== 死信队列,接收时间:{}\r\n=== 消息内容:{}", receiveTime, JSONUtil.toJsonStr(taskParam));
String time = DateUtil.formatDateTime(taskParam.getTime());
// 本地任务(反射执行方法)
if (NumberUtil.equals(0, taskParam.getType())) {
if (StrUtil.isNotBlank(taskParam.getDelayTaskClass())) {
Class<DelayTask> delayTaskClass = ClassUtil.loadClass(taskParam.getDelayTaskClass());
if (delayTaskClass == null) {
log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
} else {
log.debug("=== 延时任务,接收时间:{}\r\n=== 任务类型:{}\r\n=== 创建时间:{}\r\n=== 任务执行类:{}\r\n=== 任务数据:{}",
// 接收时间
receiveTime,
// 任务类型
"本地任务(反射执行方法)",
// 创建时间
time,
// 任务执行类
taskParam.getDelayTaskClass(),
// 任务数据
JSONUtil.toJsonPrettyStr(taskParam.getData()));
// 触发任务执行
DelayTask delayTask = ReflectUtil.newInstance(delayTaskClass);
ReflectUtil.invoke(delayTask, DelayTask.method, taskParam.getData());
}
} else {
log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
}
}
// 队列任务
else if (NumberUtil.equals(1, taskParam.getType())) {
if (StrUtil.isNotBlank(taskParam.getExchange())) {
log.debug(
"=== 延时任务,接收时间:{}\r\n=== 任务类型:{}\r\n=== 创建时间:{}\r\n=== 交换机名称:{}\r\n=== 路由名称:{}\r\n=== 任务数据:{}",
// 接收时间
receiveTime,
// 任务类型
"队列任务",
// 创建时间
time,
// 交换机名称
taskParam.getExchange(),
// 路由名称
taskParam.getRoutingKey(),
// 任务数据
JSONUtil.toJsonPrettyStr(taskParam.getData()));
// 触发任务执行队列
amqpTemplate.convertAndSend(taskParam.getExchange(), taskParam.getRoutingKey(), taskParam.getData());
} else {
log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
}
}
}
/**
* 声明交换机
*
* @return
*/
@Bean(name = EXCHANGE)
public DirectExchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
}
/**
* 声明队列
*
* @return
*/
@Bean(name = QUEUE)
public org.springframework.amqp.core.Queue queue() {
return QueueBuilder.durable(QUEUE).build();
}
/**
* 绑定队列到交换机
*
* @param exchange
* @param queue
* @return
*/
@Bean(name = ROUTINGKEY)
public Binding binding(@Qualifier(EXCHANGE) DirectExchange exchange,
@Qualifier(QUEUE) org.springframework.amqp.core.Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
}
}
工具类
https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/AmqpUtil.java
package cn.renlm.graph.amqp;
import java.io.Serializable;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.experimental.UtilityClass;
/**
 * Message-queue helpers: naming conventions, plain message publishing and
 * creation of delayed tasks.
 *
 * <p>Delayed tasks are published to {@link TtlQueue} with a per-message
 * expiration equal to the requested delay.
 *
 * <p>NOTE(review): per the RabbitMQ TTL documentation a message is only expired
 * once it reaches the head of its queue, so a task with a long delay can hold
 * back tasks with shorter delays that were published after it - confirm this
 * is acceptable for the callers.
 *
 * @author Renlm
 */
@UtilityClass
public class AmqpUtil {

    /**
     * Maximum delay of a delayed task in milliseconds (7 days).
     */
    public static final int maxDelayTtl = 1000 * 60 * 60 * 24 * 7;

    /**
     * Suffix of exchange names.
     */
    public static final String Exchange = "Exchange";

    /**
     * Suffix of queue names.
     */
    public static final String Queue = "Queue";

    /**
     * Suffix of routing-key names.
     */
    public static final String RoutingKey = "RoutingKey";

    /**
     * Publishes {@code data} to the given exchange with the given routing key.
     * Despite its name this method does not declare a queue; it only sends a
     * message.
     *
     * @param <T>        type of the message payload
     * @param exchange   exchange name, must not be blank
     * @param routingKey routing key, must not be blank
     * @param data       message payload, must not be null
     */
    public static final <T> void createQueue(String exchange, String routingKey, T data) {
        Assert.notBlank(exchange, "exchange不能为空");
        Assert.notBlank(routingKey, "routingKey不能为空");
        Assert.notNull(data, "data不能为空");
        AmqpTemplate amqpTemplate = SpringUtil.getBean(AmqpTemplate.class);
        amqpTemplate.convertAndSend(exchange, routingKey, data);
    }

    /**
     * Creates a delayed local task (type 0): the task parameter carries the
     * name of {@code taskClass} together with {@code data}.
     *
     * @param <T>       type of the task data
     * @param taskClass class that executes the task, must not be null
     * @param data      task data
     * @param delayTtl  delay in milliseconds, between 0 and {@link #maxDelayTtl}
     */
    public static final <T> void createDelayTask(Class<? extends DelayTask> taskClass, T data, int delayTtl) {
        Assert.notNull(taskClass, "延时任务taskClass不能为空");
        checkDelayTtl(delayTtl);
        DelayTaskParam<T> taskParam = new DelayTaskParam<T>();
        taskParam.setType(0);
        taskParam.setTime(new Date());
        taskParam.setDelayTaskClass(taskClass.getName());
        taskParam.setData(data);
        sendToTtlQueue(taskParam, delayTtl);
    }

    /**
     * Creates a delayed queue task (type 1): the task parameter carries the
     * target exchange and routing key together with {@code data}.
     *
     * @param <T>        type of the task data
     * @param exchange   name of the exchange the task is finally sent to, must not be blank
     * @param routingKey routing key the task is finally sent with, must not be blank
     * @param data       task data
     * @param delayTtl   delay in milliseconds, between 0 and {@link #maxDelayTtl}
     */
    public static final <T> void createDelayTask(String exchange, String routingKey, T data, int delayTtl) {
        Assert.notBlank(exchange, "延时任务exchange不能为空");
        Assert.notBlank(routingKey, "延时任务routingKey不能为空");
        checkDelayTtl(delayTtl);
        DelayTaskParam<T> taskParam = new DelayTaskParam<T>();
        taskParam.setType(1);
        taskParam.setTime(new Date());
        taskParam.setExchange(exchange);
        taskParam.setRoutingKey(routingKey);
        taskParam.setData(data);
        sendToTtlQueue(taskParam, delayTtl);
    }

    /**
     * Validates the requested delay: it must not be negative and must not
     * exceed {@link #maxDelayTtl}.
     */
    private static void checkDelayTtl(int delayTtl) {
        // A negative value would be sent as a negative expiration; reject it
        // here so the caller gets an immediate, explicit failure.
        Assert.isFalse(delayTtl < 0, "延时任务delayTtl不能为负数");
        Date now = new Date();
        long day = DateUtil.between(now, DateUtil.offsetMillisecond(now, maxDelayTtl), DateUnit.DAY);
        Assert.isFalse(delayTtl > maxDelayTtl, "延时任务最大时长(" + day + "天)");
    }

    /**
     * Publishes the task parameter to the delay queue, using the delay as the
     * per-message expiration.
     */
    private static void sendToTtlQueue(DelayTaskParam<?> taskParam, int delayTtl) {
        String expiration = String.valueOf(delayTtl);
        AmqpTemplate amqpTemplate = SpringUtil.getBean(AmqpTemplate.class);
        amqpTemplate.convertAndSend(TtlQueue.EXCHANGE, TtlQueue.ROUTINGKEY, taskParam, message -> {
            message.getMessageProperties().setExpiration(expiration);
            return message;
        });
    }

    /**
     * Delayed task (interface implemented by task execution classes).
     */
    public static interface DelayTask {

        /**
         * Name of the method that executes the task.
         */
        public static final String method = "execute";

        /**
         * Executes the task.
         *
         * @param data task data
         */
        void execute(Object data);
    }

    /**
     * Parameter object describing a delayed task.
     */
    @Data
    @Accessors(chain = true)
    public static final class DelayTaskParam<T> implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * Task type, 0: local task (method executed through reflection), 1: queue task.
         */
        private int type;

        /**
         * Time at which the task was added.
         */
        private Date time;

        /**
         * Local task: name of the class that executes the task.
         */
        private String delayTaskClass;

        /**
         * Queue task: exchange name.
         */
        private String exchange;

        /**
         * Queue task: routing key.
         */
        private String routingKey;

        /**
         * Task data.
         */
        private T data;
    }

    /**
     * Builds an exchange name by appending {@link #Exchange} to the key.
     *
     * @param key name prefix, must not be blank
     * @return the exchange name
     */
    public static final String exchangeName(String key) {
        Assert.notBlank(key, "key不能为空");
        return key + Exchange;
    }

    /**
     * Builds a queue name by appending {@link #Queue} to the key.
     *
     * @param key name prefix, must not be blank
     * @return the queue name
     */
    public static final String queueName(String key) {
        Assert.notBlank(key, "key不能为空");
        return key + Queue;
    }

    /**
     * Builds a routing-key name by appending {@link #RoutingKey} to the queue
     * name derived from the key.
     *
     * @param key name prefix, must not be blank
     * @return the routing-key name
     */
    public static final String routingKeyName(String key) {
        Assert.notBlank(key, "key不能为空");
        return queueName(key) + RoutingKey;
    }
}
使用方式
队列任务
/**
* 创建延时任务
*
* @param exchange 任务交换机名称
* @param routingKey 任务路由名称
* @param data 任务数据
* @param delayTtl 延时ttl时长(毫秒数)
*/
AmqpUtil.createDelayTask(String exchange, String routingKey, T data, int delayTtl);
本地任务
/**
* 创建延时任务
*
* @param taskClass 任务执行类
* @param data 任务数据
* @param delayTtl 延时ttl时长(毫秒数)
*/
AmqpUtil.createDelayTask(Class<? extends DelayTask> taskClass, T data, int delayTtl);