MQ006——剖析生产者 SDK

生产端:生产者客户端 SDK 有哪些设计要点??

前言

部分开发者在使用某个组件或框架的时候,都希望能够做到开箱即用,作为一款成熟的产品来说,也确实应该做到。那么,在使用的过程中是否会有疑问,这些框架的 SDK 底层是如何工作的呢,由哪些功能模块所组成的呢?消息队列的客户端主要包含生产、消费、集群管控三类功能。我们先用 MQ 中的生产者为例,来进行一个浅层次的设计分析。。从客户端 SDK 实现的角度来看,生产模块包含客户端基础功能和生产相关功能两部分,其中基础功能是客户端所有功能共有的。如下图所示:

producer-framework

基础功能是蓝色部分,包括请求连接管理、心跳检测、内容构建、序列化、重试、容错处理等等。生产功能是黄色部分,包括客户端寻址、分区选择、批量发送、生产错误处理、SSL、幂等和压缩等等。

客户端基础功能

连接管理

网络模块,讲过客户端和服务端之间基本都是通过各自语言的网络库,创建 TCP 长连接进行通信的。在大部分实现中,为了避免连接数膨胀,每个客户端实例和每台 Broker 只会维护一条 TCP 连接。

建立一条 TCP 连接是简单的,关键的是,什么情况下建立连接呢??一般来说有初始化创建连接和使用时创建连接两种方式。

  • 初始化创建连接:指在实例初始化时就创建到各个 Broker 的 TCP 连接,等待数据发送。好处是提前创建好可以避免发送的时候冷启动;缺点是需要提前创建好所有的连接,可能导致连接空跑,会消耗一定的资源。
  • 使用时创建连接:指在实例初始化时不建立连接,当需要发送数据时再建立。好处是发送时再连接,连接的使用率会较高;缺点是可能出现连接冷启动,会增加一点本次请求的耗时。

因为客户端会有空闲连接回收机制,创建连接的耗时一般较短,所以在实际的架构实现中,两种方式都会有用到,优劣区别并不是很明显。不过,从资源利用率的角度考虑,建议使用晚建立连接的方式

因为连接并不是任何时候都有数据,可能出现长时间连接空闲。所以连接都会搭配连接回收机制,连接建立后如果连接长时间空闲,就会被回收。连接回收的策略一般是判断这段时间内是否有发送数据的行为,如果没有就判断是空闲,然后执行回收。

因为单个 TCP 连接发送性能存在上限,就需要在客户端启动多个生产者,提高并发读写的能力。一般情况下,每个生产者会有一个唯一的 ID 或唯一标识来标识客户端,比如 ProduceID 或客户端的 IP+Port。

单个 TCP 的瓶颈和很多因素有关,比如网路带宽、网络延迟、客户端请求端的 socketbuff 的配置、TCP 窗口大小、发送速率导致本地数据反压堆积、服务端请求队列的堆积情况、收包和回包的速度等等。

接下来继续看看客户端和服务端之间的心跳检测。

心跳检测

心跳检测是客户端和服务端之间保活的一种机制,检测服务端或者客户端的一方不可用时,另一方可以及时回收资源,避免资源浪费。一般都是通过 ping-pong 的方式来发起探测。之前的内容有提到过,消息队列一般都是基于 TCP 协议通信的。所以客户端和服务端之间的心跳机制的实现,一般有基于 TCP 的 KeepAlive 保活机制和应用层主动探测两种形式。

producer-heart

基于 TCP 的 KeepAlive 保活机制:是 TCP/IP协议层内置的功能,需要手动打开 TCP 的 KeepAlive 功能。通过这种方案实现心跳检测,优点是简单,缺点是 KeepAlive 实现是在服务器侧,需要 Server 主动发送检测包,此时如果客户端异常,可能出现很多不可用的 TCP 连接。这种连接会占用服务器内存资源,导致服务器端的性能下降。

应用层主动探测:一般是 Client 向 Server 发起的,主要解决灵活性和 TCP KeepAlive 的缺陷。探测流程一般是客户端定时发送保活心跳,当服务端连续几次没收到请求,就断开连接。这样做的好处是,可以将压力分担到各个客户端,避免服务端的过载。

错误处理

从请求的角度,有些错误是重试可以恢复的,比如连接断开、Leader 切换、发送偶尔超时和服务端某些异常等;有些错误是不可恢复的,比如 Topic / 分区不存在、服务端 Broker 不存在、集群和 Broker 长时间无响应等。所以,在客户端的处理中,也会将错误分为可重试错误和不可重试错误两类。

producer-exception

因为网络环境、架构部署的复杂性,集群可能出现短暂网络抖动、Leader 切换等异常,可重试错误就是这类通过一次或多次重试可能恢复的异常;不可重试的错误,就是不管如何重试都无法恢复的异常。

虽然实现思路很直接、很简单,但在客服端 SDK 的实现过程中,错误处理是一个包含很多细节的工作,一般需要考虑下面几个常见的点:

  • 如何定义可恢复错误和不可恢复错误;
  • 完整的错误码的定义和枚举,如何定义一个好的错误码从而提高排查问题的效率;
  • 错误后重试的代码实现方式是否合理高效;
  • 判断哪些情况需要停止客户端,向上抛出异常,以免一些错误信息一直在 SDK 内空转,提高上层感知异常和排查异常的难度;
  • 日志信息打印 debug、info 以及 error 日志时,是否包含了完整的内容。

发生错误后,客户端一般会提供重试策略,接下来一起看看重试机制的实现,

重试机制

重试策略一般会支持重试次数和退避时间的概念。当消息失败,超过设置的退避时间后,会继续重试,当超过重试次数后,就会抛出消息或者将消息投递到配置好的重试队列中。

退避时间是可以配置的,比如 1s、10s 或者 60s 等。当出现错误时,就会按照退避策略进行退避,再尝试写入。一般情况下,重试是有次数上限的,当然如果想的话也也可以配置无限重试。

退避策略影响的是重试的成功率,因为网络抖动一般来说是 ms 级,某些严重的情况下可能会抖动十几秒。此时,如果退避策略设置的太短,在退避策略和重试次数用完后,可能消息还没生产成功;反过来,如果退避时间设置太长,可能导致客户端发送堵塞消息堆积。所以消息队列生产者的重试次数和退避策略的设置都是比较讲究的,需要结合业务的场景仔细设计。

另外,客户端为了满足安全传输、性能和功能方面的需求,客户端都会支持传输加密、压缩、事务、幂等等功能。

生产相关基础功能

客户端寻址机制

MQ 作为一个分布式系统,分区会分布在集群的不同节点上。所以从客户端的角度看,往服务端写入数据的时候,服务端有那么多台节点,请求该发送給台节点呢??

看见这个问题,可能大部分开发者都会觉得这并不是什么难题,类似我们发送 HTTP 请求,手动指定目标 Broker 的 IP 就行了。就是说在生产者写数据到 Broker 的时候,在代码里面手动指定分区对应的对端的 Broker 地址,然后将数据写到目标 Broker。

这个思路没问题,但是我们手动指定对端 Broker 地址的时候,怎么知道这个分区在这台 Broker 上的对应关系存在哪里呢??为了解决这个问题,就从而提出了 Metadata 寻址机制和服务端内部转发两个思路。

1.Metadata 寻址机制

服务端会提供一个获取全量的 Metadata 的接口,客户端在启动时,首先通过接口拿到集群所有的元数据信息,本地缓存这部分数据信息。然后,客户端发送数据的时候,会根据元数据的内容,得知服务端的地址是什么,要发送的分区在哪台节点上。最后根据这两部分信息,将数据发送到服务端。

producer-seek-metastore

消息队列的元数据是指 Topic、分区、Group、节点、配置等集群维度的信息。比如 Topic 有几个分区,分区的 Leader 和 Follwer 在哪些节点上,节点的 IP 和端口是什么,有哪些 Group 等等。

在 Metadata 寻址机制中,元数据信息主要包括 Topic 及其对应的分区信息和 Node 信息两部分。可以看一下 Kafka 的元数据信息结构;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
主题分区元数据:
{
    "test1": {
        "Topic": "test1",
        "Partitions": [
            {
                "ID": 0,
                "Error": {},
                "Leader": 101194,
                "Replicas": [
                    101194,
                    101193
                ],
                "Isrs": [
                    101194,
                    101193
                ]
            }
        ],
        "Error": {}
    }
}

节点元数据:
[
    {
        "ID": 101195,
        "Host": "9.130.62.0",
        "Port": 6097
    },
    {
        "ID": 101194,
        "Host": "9.130.62.1",
        "Port": 6096
    },
    {
        "ID": 101193,
        "Host": "9.130.62.2",
        "Port": 6095
    }
]

客户端一般通过定期全量更新 Metadata 信息和请求报错时更新元数据信息两种方式,来保证客户端的元数据信息是最新的。目前 Kafka 也是用的这个方案。

2. 服务端内部转发机制

另外一种服务端内部转发机制,客户端不需要经过寻址的过程,写入的时候是随机把数据写入到服务端任意一台 Broker。具体思路是服务端的每一台 Broker 会缓存所有节点的元数据信息,生产者将数据发送给 Broker 后,Broker 如果判断分区不在当前节点上,会先找到这个分区在哪个节点上,然后把数据转发到目标节点。

producer-seek-resend

这么做的好处是,分区寻址在服务端完成,客户端的实现成本比较低。但是生产流程多了一跳,耗时增加了。另外服务端因为转发多了一跳,会导致服务端的资源损耗多一倍,比如 CPU、内存、网卡,在大流量的场景下,这种损耗会导致集群负载变高,从而导致集群整体性能降低。所以这种方案不适合大流量、高吞吐的消息队列。

解决了请求要发送給哪个节点,下面就要思考消息数据要写入到哪个分区呢。

生产分区分配策略

我们知道,数据可以直接写入分区或者写入对应的 Topic。写入 Topic 时,最终数据还是要写入到某个分区。这个数据选择写入到哪个分区的过程,就是生产数据的分区分配过程。过程中的分配策略就是生产分区分配策略。

producer-partition

一般情况下,消息队列默认支持轮询、按 Key Hash、手动指定和自定义分区分配这四种分区分配策略。

轮询是所有消息队列的默认选项。消息通过轮询的方式依次写到各个分区中,这样可以保证每个分区的数据量是一样的,不会出现分区数据倾斜。

分区数据倾斜是指一个 Topic 的每个分区的存储的数据量不一样,有的分区数据量大,有的小,从而导致硬件的负载不均,集群性能出现问题。

既然能解决数据倾斜,那是不是使用轮询就是最优解了呢??答案是否定的,因为如果我们需要保证数据的写入是有序的,轮询就满足不了。因为在消费模型中,每个分区的消费是相互独立的,如果数据依次写入多个分区,在消费的时候就无法保持顺序。所以若要想数据有序,就需要保证 Topic 只有一个分区。这也是另外两种分配策略的思路。

按 Key Hash 是指根据消息的 Key 算出一个 Hash 值,然后与 Topic 分区数取余数,算出一个分区号,将数据写入到这个分区中。公式参考:

1
partitionSeq = hash(key) % partitionNum;

这种方案的好处是可以根据 Key 来保证数据的分区有序。比如某个用户的访问轨迹,以客户的 AppID 为 Key,按 Key Hash 存储,就可以确保客户维度的数据分区有序。缺点是分区数量不能变化,因为变化后 Hash 值就会变,导致消息乱序。并且因为每个 Key 的数据量不一样,容易导致数据倾斜。

手动指定很好理解,就是在生产数据的时候,手动指定数据写入哪个分区。这种方案的好处就是灵活,用户可以在代码逻辑中根据自己的需要,选择合适的分区,缺点是业务需要感知分区的数量和变化,代码实现相对复杂。

除了这三种默认策略,消息队列也支持自定义分区分配策略,让用户灵活使用。通常会在内核提供 interface 机制,用户如果需要指定自定义分区的分区分配策略,可以实现对应的接口,然后配置分区分配策略。比如 Kafka 可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口实现自定义分区策略。

批量语义

为了提高写入性能,有的生产者客户端会提供批量(Batch)写入的语义。客户端支持批量写入数据的前提是,需要在协议层支持批量的语义。否则就只能在业务中自定义将多条消息组成一条消息。

批量发送的实现思路一般是在客户端内存中维护一个队列,数据写入的时候,先将其写入到这个内存队列,然后通过某个策略从内存队列读取数据,发送到服务端。

producer-batch

批量发送数据的策略和存储模块的刷盘策略很像,都是根据数据条数或时间聚合后,汇总发送到服务端,一般是满足时间或者条数的条件后触发发送操作,也会有立即发送的配置项。

Kafka 是按照时间的策略批量发送的,提供了 linger.ms、max.request.size、batch.size 三个参数,来控制数据批量发送。

linger.ms:设置消息延迟发送的时间,这样可以等待更多的消息组成 Batch 发送。默认为 0 表示立即发送。

max.request.size:生产者能够发送的请求包大小上限,默认为 1MB。

batch.size:生产者会尝试将业务发送到相同的 Partition 的消息合包后再进行发送,它设置了合包的大小上限。

为了支持对于性能和可靠性有不同需求的业务场景,客户端一般会支持多种数据发送方式。

数据发送方式

消息队列一般也会提供同步发送、异步发送和发送即忘三种形式。

同步和异步更多是语言语法的实现,同步发送主要解决数据发送的即时性和顺序性,一步发送主要考虑性能。下面,我们来重点看一下发送即忘(这个不太好理解)。

发送即忘指消息发送后不关心请求返回的结果,立即发送下一跳。这种方式因为不用关心发送结果,发送性能会提升很多。缺点是当数据发送失败时无法感知,可能会有数据丢失的情况,所以通常适用在发送不重要的日志等场景。Kafka 提供了 ack = 0 来支持这种模式。

讲完了发送相关的功能设计,接下来我们看一下管控操作在客户端中的实现方式。

集群管控操作

集群管控操作一般是用来完成资源的创建、查询、修改和删除等集群管理动作。资源主要包括主题、分区、配置以及消费分组等等。

命令行工具是最基础的支持方式。如下图所示,它的底层主要通过包装客户端 SDK 和服务端的相关功能接口进行交互。程序编码上一般由命令行、参数包装和底层 SDK 调用三部分组成。主要流程是接收参数、处理参数和调用 SDK 等相关操作。

producer-cluster

有的消息队列也会支持 HTTP 接口形式的管控操作。好处是因为 HTTP 协议的通用性,业务可以从各个环节发起管控的调用,不是强制使用 admin SDK。另外客户端封装 HTTP 接口实现命令行工具的成本也比较低。

总结

消息队列生产者客户端的设计,主要关注下面三个部分:

  • 网络模块的开发和管理。这部分是为了完成和服务端的通信,比如请求和返回的构建、心跳检测、错误处理和重试机制等;
  • 根据服务端提供的各个接口的协议结构,构建请求,完成序列化和反序列化后,通过网络模块发起请求并获得返回;
  • 在前面两步的基础上,添加各个业务层面的功能,比如生产、消费、事务、幂等和 SSL 这类。

producer-summary

客户端和服务端交互的过程中,一般要经过元数据寻址,以正确找到分区所在的 Broker。如果我们想避免客户端寻址,只能在服务端内进行转发,但有性能和资源的损耗。所以在主打吞吐的消息队列组件中,转发的方案用的很少。

从生产者的的角度来看,需要重点关注分区分配策略、批量语义和发送方式三个方面。请求内容构建和序列化属于协议设计的内容,主要取决于协议的具体设计和序列化 / 反序列化框架的选择。