-
正文
RocketMq死信队列,通过对正常队列中消息设置失效时间,并指定失效后策略(设置死信交换机),达到延时处理任务的目的。通过该方式能极大减少无意义的批处理任务,同时减少传统批处理任务大量sql空查询,减少资源消耗。
注:以下代码仅为部分示例,需要配置mq连接池之后才能真正运行起来。
生产者推送消息配置
package com.longlonggo.test.mq; import com.mqclient.config.ClientConfig; import com.mqclient.dto.MsgDto; import com.mqclient.dto.MsgTypeEnum; import com.mqclient.dto.SendModelEnum; import com.mqclient.util.ChannelUtil; import com.mqclient.util.MqConfig; import cn.hutool.core.date.DateUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * 生产者测试类 */ public class ProducerTest { public static void main(String[] args) throws Exception{ //正常队列 String normalExchange = "longlonggo_normal_queue"; String normalQueue = "longlonggo_normal_queue"; String normalRoutingKey = "tolonglonggoDlx"; String normalConnectionName = "longlonggo_normal_conn"; //死信队列配置,交换机名称使用默认队列 String dlxExchange = "longlonggo_dlx_exchange"; String dlxQueue = "longlonggo_dlx_queue"; String dlxRoutingKey = "#"; String dlxConnectionName = "longlonggo_dlx_conn"; //校验MQ是否初始化 ClientConfig.checkAppType(); //信道持久化模式:永久; //设置消息等待N毫秒推送至死信队列(失效时间也可以设置在队列上,如果都设置则以近的优先) AMQP.BasicProperties pros = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); // 构建消息队列入参 Map<String, String> map = new HashMap<>(1); map.put("content", DateUtil.formatDateTime(new Date())); map.put("mqType", "mqType"); // 设置队列消息过期后,丢到的死信交换机 Map<String, Object> arguments = new HashMap<String, Object>(16); arguments.put("x-dead-letter-exchange", dlxExchange); // 设置队列中的消息 10s 钟后过期【失效时间可以设置在队列上,也可以设置在消息上】 //arguments.put("x-message-ttl", 10000); //正常的队列绑定 Channel channel = ChannelUtil.getChannel(normalConnectionName); channel.exchangeDeclare(normalExchange, "topic", true, false, null); channel.queueDeclare(normalQueue, true, false, false, arguments); channel.queueBind(normalQueue, normalExchange, normalRoutingKey); // 创建死信交换器和队列 channel.exchangeDeclare(dlxExchange, "topic", true, false, null); channel.queueDeclare(dlxQueue, true, false, false, null); channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey); //执行推送 channel.basicPublish(normalExchange, normalRoutingKey, false, false, pros, map.toString().getBytes()); System.err.println("消息发送完成......"); } }
消费者配置
package com.longlonggo.test.mq; import com.mqclient.config.ClientConfig; import com.mqclient.util.ChannelUtil; import com.mqclient.util.MqConfig; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; /** * 消费者测试类 */ public class ConsumerTest { //消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了 private static final String QUEUE_NAME = "longlonggo_dlx_queue"; public static void main(String[] args) throws Exception { // 创建信道 final Channel channel = ChannelUtil.getChannel("longlonggo_dlx_conn"); System.out.println("消费者启动 .........."); com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String resultMsg = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); resultMsg += "死信队列接收到消息:"; resultMsg += new String(body); System.err.println(resultMsg); System.err.println("deliveryTag:" + envelope.getDeliveryTag()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); } }