帮助中心 >  行业资讯 >  开发 >  从代码分析事务消息&消息延时

从代码分析事务消息&消息延时

2021-04-26 15:07:30 7910

代码分析事务消息

分布式事务模型图如下:



1.png


我们先看事务消息客户端的实现。即上图的MQClient.

我们先看代码的整体封装。

下面代码段做了两件事:

1.本地数据库写入 2.事务消息客户端发送消息。

@Transactional表示开启事务。在注释下,开启一个新的Order。用OrderDao将数据写入本地数据库。然后调用transactionMsgClient.sendMsg将消息发出去(这是静态方法调用,transactionMsgClient是一个类,sendMsg是它的方法)。这样,本地数据库写入和发消息这两件事,就是个原子事务,也就是说,两件事要么一起成功,要么一起失败。


2.png




上面代码用到了MybatisTransactionMsgClient。MyBatis是一个Java持久化框架,它通过XML描述符或注解把对象与存储过程或SQL语句关联起来,映射成数据库内对应的记录。

上面提到过,transactionMsgClient.sendMsg是个类,这个类继承了TransactionMsgClient。下面代码中,transactionMsgClient.sendMsg调用了其父类的TransactionsendClient的sendMsg方法,写事务消息表,并且发送消息。


3.png




我们接下来看sendMsg这个方法到底做了什么:

1、消息内容落数据库  2、发送消息

下面代码会先做一个判断,在if字段里:con.getAutoCommit。也就是说,只有当没有开启自动commit的时候(有自动提交就破坏了事务的原子性),把信息写在数据库表里,然后构造一个messages,发消息。而发消息的方法是将消息放到消息队列中。


4.png




在事务消息设计中,后台发送消息队列设计,如下图所示:


5.png



参照上图,我们可以看到后台发送消息队列有两个:


SendMsg队列的消息消费很快,基本上放进去很快就会被消费掉。这样重试才能有效,否则一直重试,没有意义。


6.png



下面代码段的的作用:是发送消息时,从队列里中取消息,看是否到期,将到期的消息取出来:


7.png



后台优先队列的维护。


8.png



最后,事务消息表的设计;


9.png



代码分析事务消息

我们知道,RocketMQ支持延时消息。我们先看一下延时消息的应用场景。

延时需求是在当前时间后的某一时间点触发指定的业务逻辑或操作。

  • 例如微信发消息,过一段时间没发送成功的话,提示重新发送(如下图的小红点提示)。



  • 订单状态流转:如延时支付。京东上超过24小时的订单会被自动取消。

实际上,我们在分布式事务的终极模型中,也用到了延时消息。

RocketMQ支持18个级别的延时等级:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是说,消息延时发送有这18个级别。如果业务的延时消息需求与这18个级别不匹配,就需要自行基于RocketMQ进行二次开发。


10.png



接下来,我们看一下18个延时的实现原理。

RocketMQ的18个延时级别,每个级别对应一个Queue,根据Level参数,将消息放到对应的18个队列中的等级。每个队列都对应了到时出队。例如1s队列就是1s出队,2小时队就是2小时出队。消息出队以后,当成正常消息投递。然后被投递到了消费队列,被消费者消费掉。


11.png




提交成功!非常感谢您的反馈,我们会继续努力做到更好!

这条文档是否有帮助解决问题?

非常抱歉未能帮助到您。为了给您提供更好的服务,我们很需要您进一步的反馈信息:

在文档使用中是否遇到以下问题: