RabbitMQ 延时任务

SpringBoot 框架下,RabbitMQ 通过死信队列实现延时任务。

实战参考

https://gitee.com/renlm/MyGraph/tree/main/src/main/java/cn/renlm/graph/amqp
定义一个绑定死信路由和交换机的延时队列,延时队列接收任务消息,超时后转发消息给死信队列处理。

延时队列

https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/TtlQueue.java

package cn.renlm.graph.amqp;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 延时队列
 * 
 * @author Renlm
 *
 */
@Configuration
public class TtlQueue {

	private static final String KEY = "Ttl";

	public static final String EXCHANGE = KEY + AmqpUtil.Exchange;

	public static final String QUEUE = KEY + AmqpUtil.Queue;

	public static final String ROUTINGKEY = QUEUE + AmqpUtil.RoutingKey;

	/**
	 * 声明交换机
	 * 
	 * @return
	 */
	@Bean(name = EXCHANGE)
	public DirectExchange exchange() {
		return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
	}

	/**
	 * 声明队列
	 * 
	 * @return
	 */
	@Bean(name = QUEUE)
	public org.springframework.amqp.core.Queue queue() {
		return QueueBuilder.durable(QUEUE)
				// 死信交换机
				.deadLetterExchange(DeadLetterQueue.EXCHANGE)
				// 死信路由
				.deadLetterRoutingKey(DeadLetterQueue.ROUTINGKEY)
				// 消息过期时间(如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用)
				.ttl(AmqpUtil.maxDelayTtl)
				// 构建队列
				.build();
	}

	/**
	 * 绑定队列到交换机
	 * 
	 * @param exchange
	 * @param queue
	 * @return
	 */
	@Bean(name = ROUTINGKEY)
	public Binding binding(@Qualifier(EXCHANGE) DirectExchange exchange,
			@Qualifier(QUEUE) org.springframework.amqp.core.Queue queue) {
		return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
	}
}

死信队列

https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/DeadLetterQueue.java

package cn.renlm.graph.amqp;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import cn.renlm.graph.amqp.AmqpUtil.DelayTask;
import cn.renlm.graph.amqp.AmqpUtil.DelayTaskParam;
import lombok.extern.slf4j.Slf4j;

/**
 * 死信队列
 * 
 * @author Renlm
 *
 */
@Slf4j
@Configuration
public class DeadLetterQueue {

	private static final String KEY = "DeadLetter";

	public static final String EXCHANGE = KEY + AmqpUtil.Exchange;

	public static final String QUEUE = KEY + AmqpUtil.Queue;

	public static final String ROUTINGKEY = QUEUE + AmqpUtil.RoutingKey;

	@Autowired
	private AmqpTemplate amqpTemplate;

	/**
	 * 监听队列
	 * 
	 * @param taskParam
	 */
	@RabbitListener(bindings = {
			@QueueBinding(value = @Queue(value = QUEUE, durable = Exchange.TRUE), exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.DIRECT), key = ROUTINGKEY) })
	public void receiveMessage(DelayTaskParam<Object> taskParam) {
		String receiveTime = DateUtil.formatDateTime(new Date());
		log.debug("=== 死信队列,接收时间:{}\r\n=== 消息内容:{}", receiveTime, JSONUtil.toJsonStr(taskParam));
		String time = DateUtil.formatDateTime(taskParam.getTime());
		// 本地任务(反射执行方法)
		if (NumberUtil.equals(0, taskParam.getType())) {
			if (StrUtil.isNotBlank(taskParam.getDelayTaskClass())) {
				Class<DelayTask> delayTaskClass = ClassUtil.loadClass(taskParam.getDelayTaskClass());
				if (delayTaskClass == null) {
					log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
				} else {
					log.debug("=== 延时任务,接收时间:{}\r\n=== 任务类型:{}\r\n=== 创建时间:{}\r\n=== 任务执行类:{}\r\n=== 任务数据:{}",
							// 接收时间
							receiveTime,
							// 任务类型
							"本地任务(反射执行方法)",
							// 创建时间
							time,
							// 任务执行类
							taskParam.getDelayTaskClass(),
							// 任务数据
							JSONUtil.toJsonPrettyStr(taskParam.getData()));
					// 触发任务执行
					DelayTask delayTask = ReflectUtil.newInstance(delayTaskClass);
					ReflectUtil.invoke(delayTask, DelayTask.method, taskParam.getData());
				}
			} else {
				log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
			}
		}
		// 队列任务
		else if (NumberUtil.equals(1, taskParam.getType())) {
			if (StrUtil.isNotBlank(taskParam.getExchange())) {
				log.debug(
						"=== 延时任务,接收时间:{}\r\n=== 任务类型:{}\r\n=== 创建时间:{}\r\n=== 交换机名称:{}\r\n=== 路由名称:{}\r\n=== 任务数据:{}",
						// 接收时间
						receiveTime,
						// 任务类型
						"队列任务",
						// 创建时间
						time,
						// 交换机名称
						taskParam.getExchange(),
						// 路由名称
						taskParam.getRoutingKey(),
						// 任务数据
						JSONUtil.toJsonPrettyStr(taskParam.getData()));
				// 触发任务执行队列
				amqpTemplate.convertAndSend(taskParam.getExchange(), taskParam.getRoutingKey(), taskParam.getData());
			} else {
				log.error("=== 死信队列,接收时间:{}\r\n=== 无效任务:{}", receiveTime, JSONUtil.toJsonPrettyStr(taskParam));
			}
		}
	}

	/**
	 * 声明交换机
	 * 
	 * @return
	 */
	@Bean(name = EXCHANGE)
	public DirectExchange exchange() {
		return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
	}

	/**
	 * 声明队列
	 * 
	 * @return
	 */
	@Bean(name = QUEUE)
	public org.springframework.amqp.core.Queue queue() {
		return QueueBuilder.durable(QUEUE).build();
	}

	/**
	 * 绑定队列到交换机
	 * 
	 * @param exchange
	 * @param queue
	 * @return
	 */
	@Bean(name = ROUTINGKEY)
	public Binding binding(@Qualifier(EXCHANGE) DirectExchange exchange,
			@Qualifier(QUEUE) org.springframework.amqp.core.Queue queue) {
		return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
	}
}

工具类

https://gitee.com/renlm/MyGraph/blob/main/src/main/java/cn/renlm/graph/amqp/AmqpUtil.java

package cn.renlm.graph.amqp;

import java.io.Serializable;
import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;

import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.extra.spring.SpringUtil;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.experimental.UtilityClass;

/**
 * 消息队列
 * 
 * @author Renlm
 *
 */
@UtilityClass
public class AmqpUtil {

	/**
	 * 延时任务最大时长(7天)
	 */
	public static final int maxDelayTtl = 1000 * 60 * 60 * 24 * 7;

	/**
	 * 交换机名称后缀
	 */
	public static final String Exchange = "Exchange";

	/**
	 * 队列名称后缀
	 */
	public static final String Queue = "Queue";

	/**
	 * 路由名称后缀
	 */
	public static final String RoutingKey = "RoutingKey";

	/**
	 * 添加队列
	 * 
	 * @param <T>
	 * @param exchange
	 * @param routingKey
	 * @param data
	 */
	public static final <T> void createQueue(String exchange, String routingKey, T data) {
		Assert.notBlank(exchange, "exchange不能为空");
		Assert.notBlank(routingKey, "routingKey不能为空");
		Assert.notNull(data, "data不能为空");
		AmqpTemplate amqpTemplate = SpringUtil.getBean(AmqpTemplate.class);
		amqpTemplate.convertAndSend(exchange, routingKey, data);
	}

	/**
	 * 创建延时任务
	 * 
	 * @param taskClass 任务执行类
	 * @param data      任务数据
	 * @param delayTtl  延时ttl时长(毫秒数)
	 */
	public static final <T> void createDelayTask(Class<? extends DelayTask> taskClass, T data, int delayTtl) {
		Assert.notNull(taskClass, "延时任务taskClass不能为空");
		Date time = new Date();
		long day = DateUtil.between(time, DateUtil.offsetMillisecond(time, AmqpUtil.maxDelayTtl), DateUnit.DAY);
		Assert.isFalse(delayTtl > AmqpUtil.maxDelayTtl, "延时任务最大时长(" + day + "天)");
		DelayTaskParam<T> taskParam = new DelayTaskParam<T>();
		taskParam.setType(0);
		taskParam.setTime(time);
		taskParam.setDelayTaskClass(taskClass.getName());
		taskParam.setData(data);
		AmqpTemplate amqpTemplate = SpringUtil.getBean(AmqpTemplate.class);
		amqpTemplate.convertAndSend(TtlQueue.EXCHANGE, TtlQueue.ROUTINGKEY, taskParam, message -> {
			message.getMessageProperties().setExpiration(String.valueOf(delayTtl));
			return message;
		});
	}

	/**
	 * 创建延时任务
	 * 
	 * @param exchange   任务交换机名称
	 * @param routingKey 任务路由名称
	 * @param data       任务数据
	 * @param delayTtl   延时ttl时长(毫秒数)
	 */
	public static final <T> void createDelayTask(String exchange, String routingKey, T data, int delayTtl) {
		Assert.notBlank(exchange, "延时任务exchange不能为空");
		Assert.notBlank(routingKey, "延时任务routingKey不能为空");
		Date time = new Date();
		long day = DateUtil.between(time, DateUtil.offsetMillisecond(time, AmqpUtil.maxDelayTtl), DateUnit.DAY);
		Assert.isFalse(delayTtl > AmqpUtil.maxDelayTtl, "延时任务最大时长(" + day + "天)");
		DelayTaskParam<T> taskParam = new DelayTaskParam<T>();
		taskParam.setType(1);
		taskParam.setTime(time);
		taskParam.setExchange(exchange);
		taskParam.setRoutingKey(routingKey);
		taskParam.setData(data);
		AmqpTemplate amqpTemplate = SpringUtil.getBean(AmqpTemplate.class);
		amqpTemplate.convertAndSend(TtlQueue.EXCHANGE, TtlQueue.ROUTINGKEY, taskParam, message -> {
			message.getMessageProperties().setExpiration(String.valueOf(delayTtl));
			return message;
		});
	}

	/**
	 * 延时任务(执行类接口)
	 */
	public static interface DelayTask {

		/**
		 * 方法名
		 */
		public static final String method = "execute";

		/**
		 * 执行任务
		 * 
		 * @param data 任务数据
		 */
		void execute(Object data);

	}

	/**
	 * 延时任务参数
	 */
	@Data
	@Accessors(chain = true)
	public static final class DelayTaskParam<T> implements Serializable {

		private static final long serialVersionUID = 1L;

		/**
		 * 类型,0:本地任务(反射执行方法),1:队列任务
		 */
		private int type;

		/**
		 * 添加时间
		 */
		private Date time;

		/**
		 * 本地任务-任务执行类
		 */
		private String delayTaskClass;

		/**
		 * 队列任务-交换机名称
		 */
		private String exchange;

		/**
		 * 队列任务-路由名称
		 */
		private String routingKey;

		/**
		 * 任务数据
		 */
		private T data;

	}

	/**
	 * 拼接交换机名称
	 * 
	 * @param key
	 * @return
	 */
	public static final String exchangeName(String key) {
		Assert.notBlank(key, "key不能为空");
		return key + Exchange;
	}

	/**
	 * 拼接队列名称
	 * 
	 * @param key
	 * @return
	 */
	public static final String queueName(String key) {
		Assert.notBlank(key, "key不能为空");
		return key + Queue;
	}

	/**
	 * 拼接路由名称
	 * 
	 * @param key
	 * @return
	 */
	public static final String routingKeyName(String key) {
		Assert.notBlank(key, "key不能为空");
		return queueName(key) + RoutingKey;
	}
}

使用方式

队列任务

/**
 * 创建延时任务
 * 
 * @param exchange   任务交换机名称
 * @param routingKey 任务路由名称
 * @param data       任务数据
 * @param delayTtl   延时ttl时长(毫秒数)
 */
AmqpUtil.createDelayTask(String exchange, String routingKey, T data, int delayTtl);

本地任务

/**
 * 创建延时任务
 * 
 * @param taskClass 任务执行类
 * @param data      任务数据
 * @param delayTtl  延时ttl时长(毫秒数)
 */
AmqpUtil.createDelayTask(Class<? extends DelayTask> taskClass, T data, int delayTtl);