当前位置: 首页>资讯 >

RabbitMq TTL+死信队列 延迟消息问题记录

来源: 腾讯云 | 时间: 2023-02-28 06:05:23 |

延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

利用RabbitMqTTL和死信队列 来实现延时消费。

如果设置的是队列统一过期时间放到死信队列,没有什么问题。


(资料图片)

如果是延时时间设置到每条消息上的。而不是给队列的。

实现方式为消息存活时间为动态用户页面可配置的。

这就导致了一个问题:

先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。

结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。

它不会检测每一条消息是否过期。而是顺序检测。

如果first in的消息过期时间很长,会导致它阻塞后进的消息。

不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。

问题解决

这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange

一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件安装

需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0,对应的插件版本就是:3.11.1

基于Linux

--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server

基于Docker

--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID

安装成功

web界面新建交换机选择类型出现红框标注即表示成功

image.png

代码实现

1:springBoot配置

@Configurationpublic class DelayRabbitmqConfig {     /**     * 声明延迟队列     * @return     */    @Bean    public Queue delayQueue(){        return new Queue(QueueConstant.DelayQueue,                true,false,false);    }     /**     * 声明延迟自定义交换机类型     * @return     */    @Bean    public CustomExchange delayCustomExchange(){        HashMap args = new HashMap<>();//        设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递        args.put("x-delayed-type","direct");        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,                "x-delayed-message",true,false,args);    }     /**     * 绑定延迟交换机和队列     * @return     */    @Bean    public Binding delayQueueAndCustomExchange(){        return BindingBuilder.bind(delayQueue())                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();    }}

springMvc配置

引入依赖:    xmlns:util="http://www.springframework.org/schema/util"    http://www.springframework.org/schema/util    http://www.springframework.org/schema/util/spring-util-4.0.xsd                                                                                                                

代码实现

//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor {    /**     * 消息延迟时间,单位:毫秒     */    private final Integer TTL;    public MyMessagePostProcessor(final Integer ttl) {        this.TTL = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setDelay(TTL);        return message;    }}

关键词: RabbitMQ

 

热文推荐

RabbitMq TTL+死信队列 延迟消息问题记录

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。

2023-02-28

SAMA四p门照片下载

1、miss的在她淘宝店买东西似乎送签名照。2、不行就找个视频截图!伊夫雷亚也截图哈哈哈。本文到此分享完

2023-02-28

天天亮点!奇数年限定!自2011年起,梅西百球里程碑的年份均为奇数年

巴黎3-0战胜马赛的比赛中,梅西俱乐部生涯进球数来到了700球。OPTA也更新社媒动态,晒出了梅西的百球年份。200球——2011年300球——2013年40

2023-02-28

外交部发言人回应美方以涉俄因素制裁中企:将予以坚决反制

新华社北京2月27日电(记者成欣、冯歆然)针对美国以涉俄因素为由制裁中国企业一事,外交部发言人毛宁27日在例行记者会上表示,美方行径严重损

2023-02-27

环球播报:价格锚点是什么意思_锚点是什么意思

1、锚点:使用命名锚记可以在文档中设置标记,这些标记通常放在文档的特定主题处或顶部。2、然后可以创建到这些命名锚记的链接

2023-02-27

洁美科技: 第四届董事会第二次会议决议公告

洁美科技:第四届董事会第二次会议决议公告

2023-02-27

董明珠:今年两会将再度建议提高个税起征点

董明珠再度建议提高个税起征点

2023-02-27

昆明市公务员无偿献血月开启|环球聚看点

2月24日,昆明市第八届公务员无偿献血月市直机关爱心献血活动在昆明市级行政中心启动。来自昆明市级机关和呈贡区级机关的近6

2023-02-27

观天下!上横沥大桥

1、上横沥大桥桥梁总长965米,主桥为(57 5+100+57 5米)三跨连续刚构桥+引桥现浇连续梁,设置双向6车道

2023-02-27

须和需的区别大全_须和需的区别_世界热门

1、需:主要是指需要,比如说本我的需求,即我想要的或彼想要的,需一般搭配物,而需虽然也有应当的意思,不过其中包含了因为有

2023-02-27

切诺尔贝利-资讯

1、《切尔诺贝利》是丹尼拉·科兹洛夫斯基执导的,丹尼拉·科兹洛夫斯基、奥莎娜·阿金什那、菲利普·阿德耶夫主演的灾难片。2

2023-02-26

我的逆天神器_我的逆天神器第二季|全球播资讯

1、这个动漫很久了但第二季始终没有可能是这动漫反响不高赚不到钱应该不出第二季了吧。本文就为大家分享到这里,希望小

2023-02-26

中国舞蹈文化说——舞蹈基本问题及中国舞蹈文化观研究

1、中国舞蹈文化说——舞蹈基本问题及中国舞蹈文化观研究。2、主讲人罗斌。文章到此就分享结束,希望对大家有所帮助。

2023-02-25

02月25日从杭州出发到吉安的防疫政策_天天快资讯

02月25日从杭州出发到吉安的防疫政策(数据来源:本地宝)1、出杭州-:正常通行2、到吉安-:除规定的重点岗位人群外,不

2023-02-25

临沂焦庄电动汽车充电站 世界时快讯

1、临沂焦庄电动汽车充电站,于2010年6月23日,山东省首座大型电动汽车充电站在临沂焦庄竣工投用。2、可同时为2

2023-02-25

快资讯丨湖南维财金

1、“维财金”是湖南维财大宗贵金属交易所推出的实物黄金投资金条,面向投资者开展金条的“卖出+回购”业务。2、金条成色为9

2023-02-25

速讯:国家控烟办到大方县调研指导成人烟草流行监测工作

国家控烟办到大方县调研指导成人烟草流行监测工作

2023-02-24

海新能科:山东三聚本次停车检修是年度生产计划工作安排 4月10日前恢复产出合格产品 世界短讯

海新能科在互动平台表示,山东三聚自去年8月30日以来连续开车,运行平稳;本次停车检修是年度生产计划工作安排,预计时间为45-50天,4月10日前

2023-02-24

老默我想吃鱼了被抢注商标,老默吃鱼公司扎堆成立

日前,热播剧《狂飙》中的一句台词“老默,我想吃鱼了”刷屏网络。企查查APP显示,近日,多家名为“老默吃鱼”公司成立,涉及湖北、上海、东莞

2023-02-24

却的笔顺的读法

一、关于却的笔顺的读法1、读法:[què]二、关于却的笔顺的释义2、基础释义:1 后退:退~。~步。2 使退却:~敌

2023-02-24