MQ005——元数据和消息数据的存储设计

如何设计元数据和消息数据的存储模块??

前言

储模块作为 MQ 高吞吐、低延时、高可靠性的基础保证,可以说是最核心的模块。从技术架构的来看,存储模块主要包含功能实现性能优化两个方面,这篇 blog 就重点来看一下其是如何实现的。

MQ 的存储模块的主流程是数据的写入、存储、读取、过期。写入和持久化存储是基本功能,但因为消息队列独有的产品特性,主要被用来当缓冲分发,它的数据存储是临时的,数据持久化存储后,在一定的时间或操作后,需要能自动过期删除。

而且 MQ 中的数据一般分为元数据消息数据。元数据指的是 Topic、Group、User、ACL 和 Config 等集群维度的资源数据信息,消息数据是指客户端写入的用户的业务数据。

元数据信息的存储

元数据信息的特点是数据量比较小,不会经常读写,但是需要保证数据的强一致和高可靠,不允许出现数据的丢失。同时,元数据信息一般需要通知到所有的 Broker 节点,Broker 会根据元数据信息执行具体的逻辑。比如创建 Topic 并生成元数据后,就需要通知对应的 Broker 执行创建分区、创建目录等操作。

所以元数据信息的存储,一般有两个思路:

  • 基于第三方组件来实现元数据的存储;
  • 在集群内部实现元数据的存储。

基于第三方组件来实现元数据的存储是目前业界的主流选择。比如 Kafka Zookeeper 版本、RocketMQ 和 Pulsar 用的都是这个思路。其中 Kafka 和 Pulsar 的元数据存储在 Zookeeper 中。

metadata01

这个方案最大的优点是集成方便,开发成本低,能满足消息队列功能层面的基本要求,因为可以直接复用第三方组件已经实现的一致性存储、高性能的读写和存储、Hook 机制等能力,而且在后续集群构建规划的过程中也可以继续复用这个组件,能极大程度降低开发难度和工作成本。

但凡事都有利弊。其缺点也很明显,那就是引入第三方组件会增加集群系统部署和运维的成本,而且第三方组件自身的稳定性问题也会增加系统风险,第三方组件和多台 Broker 之间可能会出现数据信息不一致的情况,导致读写异常。

另外一种思路,集群内部实现元数据的存储是指在集群内部完成元数据的存储和分发。也就是在集群内部实现类似第三方组件一样的元数据服务,比如 Raft 协议实现内部的元数据存储模块或依赖一些内置的数据库。目前 Kafka 去 Zookeeper 版本用的就是这个思路。

metadata02

这个方案的优缺点刚好与第一个相反。优点是部署和运维成本低,不会因为依赖第三方服务导致稳定性问题,也不会有数据不一致的问题。但是缺点是开发成本高,前期要投入大量的开发成本。

消息数据的存储

与元数据的存储相比,消息数据的存储要复杂一点。一般情况下,MQ 的存储主要是指消息数据的存储,分为存储结构、数据分段、数据存储格式和数据清理四个部分。

数据存储结构设计

我们先看数据存储目录结构设计。在消息队列中,与存储有关的主要是 Topic 和分区两个维度。用户可以将数据写入 Topic 或直接写入到分区。

不过如果写入 Topic,数据也是分发到多个分区去存储的。所以从实际数据存储的角度来看,Topic 和 Group 不承担数据存储功能,承担的是逻辑组织的功能,实际的数据存储是在分区维度完成的

messagedata01

从架构角度上看,数据的落盘也有两种思路:

  • 每个分区单独一个存储“文件”
  • 每个节点上所有分区的数据都存储在同一个“文件”

需要注意的是,这里的“文件”是一个虚指,即表示所有分区的数据是存储在一起,还是每个分区的数据分开存储的意思。在实际的存储中,这个“文件”通常以目录的形式存在,目录中会有多个分段文件。

先来看第一个思路,每个分区对应一个文件的形式去存储数据。具体实现时,每个分区上的数据顺序写到同一个磁盘文件,数据的存储是连续的。因为消息队列在大部分情况下的读写是有序的,所以这种机制在读写性能上的表现是最高的

但如果分区太多,会占用太多的系统 FD 资源,极端情况下有可能把节点的 FD 资源耗完,并且硬盘层面会出现大量的随机读写情况,导致写入的性能下降很多,另外管理起来也相对复杂。目前 Kafka 在存储数据的组织上用的就是这种思路。

messagedata02

具体的磁盘的组织结构一般有“目录+分区二级结构”和“目录+分区一级结构”地两种形式。不过从技术上来看,二者并没有太大的优劣区别。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
目录+分区二级结构:
├── topic1
│   ├── partrt0
│   ├── 1
│   └── 2
└── topic2
    ├── 0
    ├── 1

目录+分区一级结构:
├── topic1-0
├── topic1-1
├── topic1-2
├── topic2-0
├── topic2-1
└── topic2-2

再来看第二种思路,每个节点上所有分区的数据都存储在同一文件中,需要为每个分区维一个对应的索引文件,索引文件里会记录每条消息在 File 里面的位置信息,以便快速定位到具体的消息内容。

messagedata03

因为所有文件都在一份文件上,管理简单,也不会占用过多的系统 FD 资源,单机上的数据写入都是顺序的,写入的性能会很高。缺点是同一个分区的数据一般会在文件中的不同位置,或者不同的文件段中,无法利用到顺序读的优势,读取的性能会收到影响,但是随着 SSD 技术的发展,随机读写的性能也越来越高。最简单的体现就是固态越来越便宜咯。如果使用 SSD 或高性能 SSD,一定程度上可以缓解随机读写的性能损耗。

那么该如何选择呢?核心考虑是对读写的性能要求

  • 第一种思路,单个文件读和写都是顺序的,性能最高。但是当文件很多且都有读写的场景下,硬盘层面就会退化为随机读写,性能会严重下降;
  • 第二种思路,因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高。但是在消费数据的时候,因为多个分区数据存储在同一个文件中,同一个分区的数据在底层存储上是不连续的,硬盘层面会出现随机读的情况,导致读取的性能降低。

不过随机读写带来的性能问题,可以通过给底层配备高性能的硬件来缓解。所以当前比较多的 MQ 选用的是第二种方案,但是 Kafka 为了保证更高的吞吐性能,选用的是第一种方案。

但是不管是方案一还是方案二,在数据存储过程中,如果单个文件过大,在文件加载、写入和检索的时候,性能就会有问题,并且 MQ 有自动过期机制,如果单个文件过大,数据清理时会很麻烦,效率很低。所以,我们的消息数据都会分段存储。

消息数据的分段实现

数据分段的规则一般是根据大小来进行的,比如默认 1G 一个文件,同时会支持配置项调整分段数据的大小。当数据段达到了规定的大小后,就会创建一个新的文件来保存数据。如果进行了分段,消息数据可能分布在不同的文件中。所以我们在读取数据的时候,就需要先定义消息数据在哪个文件中。为了满足这个需求,技术上一般有根据偏移量定位或根据索引定位两种思路。

根据偏移量(Offset)来定位消息在哪个分段文件中,是指通过记录每个数据段文件的起始偏移量、中止偏移量、消息的偏移量信息,来快速定位消息在哪个文件。

当消息数据存储时,通常会用一个自增的数值型数据(比如 Log)来表示这条数据在分区或 commitlog 中的位置,这个值就是消息的偏移量。

messagedata04

在实际的编码过程中,记录文件的起始偏移量一般有两种思路:单独记录每个数据段的起始和结束偏移量,在文件名称中携带起始偏移量的信息。因为数据是顺序存储的,每个文件都记录了其对应的起始偏移量,那么下一个文件的起始偏移量就是上一个文件的结束偏移量。

如果用索引定位,会直接存储消息对应的文件信息,而不是通过偏移量来定位到具体的文件。具体是通过维护一个单独的索引文件,记录消息在哪个文件和文件的哪个位置。读取消息的时候,先根据消息 ID 找到存储的信息,然后找到对应的文件和位置,读取数据。RocketMQ 用的就是这个思路。

messagedata05

这两种方案所面临的的场景不一样。根据偏移量定位数据,通常用在每个分区各自存储一份文件的场景;根据索引定位数据,通常用在所有分区的数据存储在同一份文件的场景。因为前一种场景,每一分数据都属于同一个分区,那么通过位点来二分查找数据的效率是最高的。第二种场景,这一份数据属于多个不同分区,再使用二分查找就不是那么明智咯,可以选择使用哈希查找。

数据消息存储格式

消息数据存储格式一般包含消息写入文件的格式和消息内容的格式两个方面。

消息写入文件的格式指消息是以什么格式写入到文件中的,比如 JSON 字符串或二进制。从性能和空间冗余的角度来看,消息队列中的数据基本都是以二进制的格式写入到文件的。这部分二进制数据,我们不能直接用 vim/cat 等命令查看,需要用专门的工具读取,并解析对应的格式。

消息内容的格式是指写入到文件中的数据都包含哪些信息。对于一个成熟的消息队列来说,消息内容格式不仅关系功能维度的扩展,还牵涉性能维度的优化。

如果消息格式设计的不够精简,功能和性能都会大打折扣。比如冗余字段会增加分区的磁盘占用空间,使存储和网络开销变大,性能也会下降。如果缺少字段,则可能无法满足一些功能上的需求,导致无法实现某些功能,又或者是实现某些功能的成本较高。所以在数据的存储格式设计方面,内容的格式需要尽量完整且不要有太多冗余。

这么说可能会感觉比较抽象,我们分析一下 Kafka 的消息内容格式设计来直观的感受一下。

字段名含义
baseOffset用在 Batch 中,该批次消息的起始 offset
lastOffset用在 Batch 中,该批次消息的结束 offset
count表示该行记录包含了多少条信息
baseSequence用在 Batch 中,起始序号,用来支持幂等和事务
lastSequence用在 Batch 中,结束序号,用来支持幂等和事务
producerIdPID,用来支持幂等和事务
producerEpoch分区 leader 纪元,可以看作分区 leader 的版本号或更新次数
isTransactional是否事务信息
isControl-
position该行记录在本文件的偏移量
size该行记录的总大小
magic当前数据存储格式的版本,kafka 有 v0、v1 和 v2 三种格式
compresscodec压缩格式,NONE 表示不压缩
crcCRC 校验码,用来校验数据在传输过程中的准确性
isvalid表示数据是否可用,比如是否被删除
offset该行记录的在这个分区的偏移量
LogAppendTime数据写入到文件的时间
keysizekey 长度
valuesizepayload 长度
sequence序号,用来支持幂等和事务
headerKeys消息 header 的内容
key消息的 key
payload消息的内容

可以看到,Kafka 的消息内容包含了业务会感知到的消息的 Header、Key 和 Value,还包含了时间戳、偏移量、协议版本、数据长度和大小、校验码等基础信息,最后还包含了压缩、事务、幂等 Kafka 业务相关的信息。

需要注意的是,因为 kafka 支持 Batch 特性,所以消息格式中还包含 base 和 last 等 Batch 相关信息。

消息数据清理机制

前文提到过,消息队列的数据在持久化存储后,需要在一定策略后自动过期删除。那么数据过期机制是如何实现的呢?

消息队列中的数据最终都会删除,时间周期短的话几个小时、甚至几分钟,正常情况一天、三天、七天,长的话可能数个月,基本很少有场景需要再消息队列中存储一年的数据。

消息队列的数据过期机制一般有手动删除和自动删除两种形式,从实现上看主要有三种思路:

  • 消费完成执行 ACK 删除数据
  • 根据时间和保留大小删除
  • ACK 机制和过期机制相结合

消费完成执行 ACK 删除数据,技术上的实现思路一般是:当客户端完成消费数据后,回调客户端的 ACK 接口,告诉服务端数据已经消费成功,服务端就会标记删除该行数据,以确保消息不会被重复消费。ACK 的请求一般会有单条消息 ACK 和批量消息 ACK 两种形式。

messagedata06

因为消息队列的 ACK 一般是顺序的,如果前一条消息无法被正确处理并 ACK,就无法消费下一条数据,导致消费卡住。此时就需要死信队列的功能,把这条数据先写入到死信队列,等待后续的结果。然后 ACK 这条消息,确保消费正确进行。

这种方案,优点是不会出现重复消费,一条消息只会被消费一次。缺点是 ACK 成功后消息被删除,无法满足需要消息重放的场景。

根据时间和保留大小删除指消息在被消费后不会被删除,只会通过提交消费位点的形式标记消费进度。

实现思路一般是服务端提供偏移量提交的接口,当客户端消费成功数据后,客户端会回调偏移量接口,告诉服务端这个偏移量的数据已经消费成功了,让服务端吧偏移量记录起来。然后服务端会根据消息保留的策略,比如保留时间或保留大小来清理数据。一般通过一个常驻的异步线程来清理数据。

messagedata07

这个方案,一条消息可以重复消费多次。不管有没有被成功消费,消息都会根据配置的时间规则或大小规则进行删除。优点是消息可以多次重放,适用于需要多次进行重放的场景。缺点是在某些情况下(比如客户端使用不当)会出现大量的重复消费。

结合前两个方案,就有了 ACK 机制和过期机制相结合的方案。实现的核心逻辑和方案二比较类似,但保留了 ACK 的概念,不过 ACK 是相对于 Group 概念的。

当消息完成后,在 Group 维度 ACK 消息,此时消息不会被删除,只是这个 Group 也不会再重复消费到这个消息,而新的 Group 可以重新消费订阅这些数据。所以在 Group 维度避免了重复消费的情况,也可以运行重复订阅。

messagedata08

前面我们虽然反复提到“删除”,但数据实际怎么删除也有讲究。我们知道消息数据是顺序存储在文件中的,会有很多分段数据,一个文件可能会有很多行数据。那么在 ACK 或者数据删除的时候,一个文件中可能既存在可删除数据,也存在不可删除数据。如果我们每次都立即删除数据,需要不断执行“读取文件、找到记录、删除记录、写入文件”的过程,即使批量操作,降低频率,还是得不断地重复这个过程,会导致性能明显下降。

当前主流的思路是延时删除,以段数据为单位清理,降低频繁修改文件内容和频繁随机读写文件的操作。

messagedata09

只有该段里面的数据都允许删除后,才会把数据删除。而删除该段数据中的某条数据时,会先对数据进行标记删除,比如在内存或 Backlog 文件中记录待删除数据,然后在消费的时候感知这个标记,这样就不会重复消费这些数据。

总结

消息队列的存储分为元数据存储和消息数据存储两方面。

元数据的存储主要依赖第三方组件实现,比如 ZooKeeper、etcd 或者自研的简单元数据存储服务等等。在成熟的消息队列架构中,基于简化架构和提升稳定性的考虑,都会考虑在集群内部完成元数据的存储和管理。

消息数据的存储在功能层面包含数据存储结构设计、数据分段存储、数据存储格式、数据清理机制四个方面。

消息数据的存储主要包含 Topic 和分区两个维度。Topic 起逻辑组织作用,实际的数据存储是在分区维度完成的。所以在数据存储目录结构上,我们都以分区为最小粒度去设计,至于选择每个分区单独一个存储文件,还是将每个节点上所有分区的数据都存储在同一个文件,方案各有优劣,你可以根据实际情况去选择。

因为大文件存在性能和资源占用、数据清理成本等问题,一般情况下,我们都需要对数据文件进行分段处理,分段的策略一般都是按照文件大小进行的。

数据存储格式可以分为基础信息和业务信息两个维度,数据格式需要遵循极简原则,以达到性能和成本的最优。数据的过期策略一般有三种,ACK 删除、根据时间和保留大小删除数据、两者结合。目前业界的实现比较多样,从选择上来看,两者结合的方案更合理。