RocketMQ消息过滤与查询的实现

作者:徘徊笔记(同公众号) 时间:2023-06-26 10:04:25 

消息过滤

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。

RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。

其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。

RocketMQ消息过滤与查询的实现

主要支持如下2种的过滤方式

(1) Tag过滤方式:

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。

其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。

Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

(2) SQL92的过滤方式:

这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。

每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

消息查询

RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。

按照MessageId查询消息

RocketMQ中的MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。

“按照MessageId查询消息”在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。

Broker端走的是QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

按照Message Key查询消息

“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。

索引文件的具体结构如下:

RocketMQ消息过滤与查询的实现

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是:$HOME\store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040个字节大小。

如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。

如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。

NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。

Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4\*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。

20\*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

来源:https://blog.csdn.net/ph3636/article/details/89066990

标签:RocketMQ,消息,过滤,查询
0
投稿

猜你喜欢

  • c#模拟银行atm机示例分享

    2023-04-01 10:59:45
  • Android开发实战闹钟项目

    2022-02-28 19:48:38
  • 基于spring @Cacheable 注解的spel表达式解析执行逻辑

    2023-07-03 19:46:45
  • SpringBoot整合Shiro实现登录认证的方法

    2022-03-23 01:12:19
  • 一篇文章带你Java Spring开发入门

    2021-06-25 10:04:15
  • C语言 超详细总结讲解二叉树的概念与使用

    2023-11-08 20:24:56
  • Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

    2023-04-28 03:15:42
  • Struts2返回json格式数据代码实例

    2023-10-12 13:15:32
  • jar包手动添加到本地maven仓库的步骤详解

    2023-11-23 05:09:37
  • Android实现折线图小工具

    2023-08-03 10:17:22
  • Android Studio 运行按钮灰色的完美解决方法

    2023-08-16 05:59:42
  • C# 获取硬件参数的实现方法

    2023-11-04 21:30:38
  • java实现简易超市管理系统 附源码下载

    2021-11-05 18:58:30
  • Android 模仿QQ侧滑删除ListView功能示例

    2023-10-27 21:03:43
  • spring boot与ktor整合的实现方法

    2022-01-18 04:20:19
  • c# WinForm制作图片编辑工具(图像拖动、缩放、旋转、抠图)

    2022-05-20 12:32:40
  • Android颜色配置器配置方法

    2022-06-10 23:39:33
  • Android如何获取QQ与微信的聊天记录并保存到数据库详解

    2023-04-12 06:54:03
  • Java实现的简单网页截屏功能示例

    2021-05-25 13:50:35
  • Java简单从文件读取和输出的实例

    2023-04-04 20:16:45
  • asp之家 软件编程 m.aspxhome.com