Partition

Partition的目标是用来提升读写的请求吞吐的,每一个Partition保留的整个数据集的一部分,针对不同数据的请求会被路由到不同的Partition中

  • MongoDB,ES,SolrCloud中的Shard
  • HBase中的 Region
  • BigTable中的 Tablet
  • Cassandra和Riak中的 VNode
  • Couchbase中的 vBucket
    1 Partition & Replication
    分区和复制通常是结合使用的,每一条记录落在特定的分区中,但是分区中有若干个Node形成一个Cluster,提升可用性 (Fault-Tolerance)。
    Leader-Follower 架构的复制 加上 Partition 构成的整体架构如下图所示:
    可以将每一个Partition理解为一个Database,Partition中的Replica可以分布在不同的Server Node上。

推荐阅读:为了更好的阅读体验,请移步至飞书 Partition

2 Partitioning of KV Data
理想状态下:如果将一个完整的数据集分为10个子集,那么吞吐量理论上来说应该是划分前的十倍。如果划分的数据倾斜(Skew),在极端情况下,一台机器可能需要承担所有的数据请求(这种机器叫做HotSpot),此时读写瓶颈仍然收敛到单台机器的性能上,分区失去了意义。
最简单的分区策略是随机分配(Random),能够将数据均匀的分布到每一个Partition上,但是如果需要Read,则必须遍历所有的Partition,找到对应的Key之后才能读Value。事实上,有比随机分配更好的策略。
2.1 Partition By Key Range
将一段连续的Key划分到一个分区中,如果能够知道待查询key属于的范围,就能够确定key的分区;确定了key所在的分区之后,就可以直接向分区所在的Node发起请求。
直接按照字典序对Key进行分区的话,可能会造成数据倾斜,比如在下图的分区1中,包含的是a-b开头的key,而在分区12中包含的是t-z的key;假设a-b开头的单词大于t-z开头的单词,就会导致分区1中的数据量大于分区12中的数据量。因此,为了让数据更均匀的分布,需要让分区的边界能够根据Data进行自适应。
分区的边界设置通常由两种方法:

  • DBA手动设置;
  • 由DataBase自动设置(Reparation);
    应用实例:
    包括:HBase, RethinkDB, MongoDB, BigTable。

在每一个分区中,可以按序组织Key(SSTables、LSM-Trees),这样有利于范围查询。但是在某些场景下,存在一个弊端,比如:存储网络传感器数据,按照TimeStamp对Key进行排序,那么在追加写的时候,可能只会写当天的数据,那么保存当天数据的Partition就会成为HotSpot,而其他的Partition都是IDLE状态。
为了防止这种情况,可以考虑使用其他的Key作为“主键”,比如:按照传感器的ID进行分片,查询的时候,按照不同的传感器进行范围查询,然后聚合。
2.2 Partition By Key Hash

可以使用Hash函数对上面的String Key Range进行优化,使用Hash函数将一个String映射为一个无符号整型,然后将整数划分为不同的Partition。MongoDB,Cassandra 使用MD5作为哈希函数,Voldemort使用Fowler-Noll-Vo函数作为哈希函数。Java的hashcode和Rust的 Object#hash有一个问题:在不同的进程下返回的结果不同。
使用hash 做了散列之后,丢失了原本按照字典序划分的Partition的天然排序性,也即使用了Hash散列之后,Partition不再高效地支持范围查询了。在MongoDB中,所有的范围查询会被发送给所有Partition,Riak,CouchBase和Voldemort都不支持按照主键进行查询。
Cassandra中使用了一种复合主键 primary_key = (key1, key2, key3),第一项用于分片,其他几项用于索引来对数据进行排序(SSTable)。针对主键key1的范围查询是不支持的(因为分布在不同的shard上),但是如果指定了key1,就能定位到一个分片,然后根据其他列就能够支持范围查询。
2.3 Skewed Workloads & Relieve Hot Spots
极端情况下的数据不平衡
实际中的场景:某一个大V的动作或者评论,可能引起很多粉丝去访问同一个ID(Action或者User),这个时候即使对ID做了Hash处理,但因为同一个ID的哈希值是一样的,所以粉丝们大量的请求还是会打到同一个Partition上。
解决:使用一些先验。
如果我们事先知道一些key可能会非常hot,我们可以在Key的前后添加一些随机值,只需要两个decimal
Random number就能让数据分布到100多个分片上。(将同一个Key的数据分布到不同的Partition上。)
3 Partitioning And Secondary Indexes
讲的主要是ES模型的分片原理
次级索引通常不是唯一定义一条元素的标识,DDIA 中指出次级索引是关系型数据库的bread and butter,许多KV数据库 (HBase, Voldemort) 没有实现次级索引,因为这会极大地增加实现复杂度。但有一些NoSQL如(Riak)开始使用次级索引,因为次级索引更有利于数据建模。次级索引的存在是为了支持更复杂的查询请求,它也是Solr and ElasticSearch引擎存在的意义。
针对次级索引的分片可以分为两种类型:

  • 基于document分片
  • 基于term分片
    3.1 Partitioning Secondary Key By Document
    根据主键分片,次级索引在不同的Shard中是独立存在的,即每条次级索引只会关联到该Shard中的数据,而不会跨Shard关联。基于主键分片的次级索引的组织方式也称为:Lcoal Index

这种方式存在一个问题,如果仅根据颜色或者制造商进行查询,而不知道数据的主键,则需要对所有分片进行查询然后聚合,这种方式叫做: Scatter/Gather(可以使用Map-Reduce)。
应用实例:
MongoDB, Riak, Cassandra, ElasticSearch, SolrCloud, VoltDB.
3.2 Partitioning Secondary Key By Term
按照次级索引进行分片,比如:color按照字典序划分:

  • a-r在第一个分区
  • s-z在第二个分区
    这种组织次级索引分片的方式叫做term-based,term这个名词来源于全文索引,其中的term是text中的每一个单词。term-based的方式更适合用于进行某些有实际意义的范围查询,因为主键通常是没有语义信息的,比如:想寻找价格在 (x, y) 范围内的车辆,如果希望流量在不同节点中分布的更加均匀,则可以使用Hash对term进行处理。

Term-based 的组织方式有利于多读写少的场景,因为在读的时候,Client能够根据term定位到一个分片;然而在处理写请求的时候,可能需要涉及多个分片,这时候需要开启分布式事务,但大多数分布式数据库都不支持分布式事务。
在实际应用中,term-based 索引的更新通常都是异步地,如果对于一条索引的读请求距离上一次写请求很短,是有可能读到旧数据的。例如:Amazon-DynamoDB官方声明了他们global-index的更新会存在秒级延迟,如果基架出问题了则可能延迟更长的时间。

4 Rebalancing Partitions
Rebalancing (重分区 / 重平衡):指的是数据和请求从一个服务节点(Replicas)移动到另一个服务节点上的过程。

触发重分区的时机

  • 查询量上涨,所以需要更多的CPU来处理请求。
  • 数据大小上涨,需要更多的Disk 和 RAM存储空间。
  • 一个节点挂了,需要拉起另一个保存了数据副本的节点继续服务。
    重平衡的目标
    这三点基本在Implementation部分的设计文档中都实现了,负载均衡使用的是Hash,为了保证第三点,使用了多次平均算法。
  • 保证负载均衡。假设重平衡之后出现了HotSpot,极端情况下,所有请求都往HotSpot打,HotSpot会被请求撑爆,然后进行重平衡,又出现HotSpot……一台机器一台机器被打挂,导致整个Cluster不可用。
  • 在重平衡的过程中,仍然需要支持读写请求。
  • 尽可能少的移动数据,使用多次平均算法来保证尽可能少的移动数据。
    High-Level 思想是每次寻找包含Shard最多的节点Max和包含Shard最少的节点Min,然后从Max节点找一个Shard移动给Min节点。
    while (true) {
    int source = getGidWithMaximumShards(gID2ShardIDs);
    int target = getGidWithMinimumShards(gID2ShardIDs);
    if (source != 0 && gID2ShardIDs.get(source).size() - gID2ShardIDs.get(target).size() <= 1) {
    break;
    }
    Integer swapShard = gID2ShardIDs.get(source).stream().findAny().orElse(null);
    gID2ShardIDs.get(source).remove(swapShard);
    gID2ShardIDs.get(target).add(swapShard);
    }

4.1 重平衡算法
4.1.1 Mod N
最容易想到的重平衡算法是使用Mod N算法,但是在这个场景下非常不推荐使用Mod N算法,因为Mod N算法在N改变的时候,大多数的Key都需要移动,带来的开销巨大。
所以一般都是将key的哈希值分配到一个范围中,将某一个范围分配给一个节点存储。

上图这种情况下,n从三增加到了四,需要移动两个节点,读者可以将k设置得更大,可以观察到这种 Mod N的算法,会在重平衡的时候移动很多数据。
对比之下,将key按照Hash值进行范围映射的方法,也是我们熟知的Consistency Hashing。

所以,经过以上分析,几乎不会有infra产品选择Mod N这种重平衡算法。
4.1.2 Fixed Partition Number
思路:在数据库创建的时候,就将数据库划分为N个Partition 并且之后不会再改变,当有新节点加入的时候,每一个老节点都会分配一定数量的Partition给它。当有一个节点failover的时候,它的Partition会被均匀的分配给仍然存活的Node。
应用案例:Riak, ElasticSearch, CouchBase, Voldemort都使用了这种重平衡方法。以及在我自己实现的KV Store中也是使用的这种方式,Partition的大小一开始就被设置为10,随着Server的进出,重新分配Partition 即可。

有什么问题?
Partition的数量不能设置的过大,也不能设置得过小。如果Partition数量设置得过小,未来随着数据增长,Partition也会随之膨胀。如果Partition数量设置得过大,那么ShardMaster 就需要维护更多的元数据信息,效率不是很高。
事实上,选择合适的PartitionNumber是一件十分困难的事情,因为数据量会根据业务的变化而变化(即:数据量是不固定的)。如果一个Partition承载了过多的数据,那么故障恢复和重平衡的成本将会很大(全量复制一个大Partition)。如果Partition设置得过小,有点不划算,类似用牛刀杀鸡的感觉。
4.1.3 Dynamic Partitioning
Fixed Partition Number对于使用Key Range (refer 2.1)进行分片的数据库是非常不友好的,因为某一个范围的Key只能被分配到特定的Partition上,在刚开始的时候0-100,000都被分配到Partition-1上,此时Partition-1满负载运行而其他Partition几乎没有流量。

思路:动态扩展和合并分区,当分区大小超过设定阈值的时候进行分裂,小于设定阈值的时候进行合并。(有点类似B树)。HBase 和 RethinkDB使用的是这种分区策略。
对于Fixed Partition Number出现的问题,HBase 和 MongoDB使用了一种叫做pre-splitting的策略对其进行划分,即将一部分key set分配到idle的机器上,但这种操作需要DBA对数据的分布有一定的先验。
4.1.4 Partition proportionally to Nodes
4.1.2的Partition 大小 正比于数据集大小
4.1.3的Partition 数量 正比于数据集的大小

Cassandra 和 Ketama 提出了另一种分区方式,让Partition的数量正比于Server Node的数量,即:每个Server Node上保存X个Partition,Server Node增加的时候Partition也增加,反之减少。
这个算法的神奇之处在于,如果Server Node不增加,那么Partition的大小会随着数据量的增加而增加,在DBA手动扩容之后,每个Partition的大小又会下降到平稳的水平。整体来看是结合了4.1.2和4.1.3的优势,并且能够使CPU和存储资源保持稳定的一种算法。
Cassandra中设定每个服务节点上拥有256个Partition,当一个新的服务节点加入后,会随机选择256个Partition进行分裂,然后由新节点管理分裂出来的Partition数据。这种方式要求Key 是按照Hash的方式进行分片的,整体的算法有点类似Consistency Hashing。
4.2 Automatic / Manual Rebalancing
关于手动重平衡或是自动重平衡的问题,在我们系统中使用的是Manual 扩容的方式,可能考虑到重平衡是一项比较危险的操作,因为有可能一台HotSpot的Delay时间突然增加,此时其他节点将其判定为Dead并开启重平衡,这时候网络上会因为有更多的数据流动而变得更加拥塞。(TCP拥塞避免就是这么来的)。
所以笔者认为Rebalancing的操作是一定需要人工卡点的,即使系统自动判断应该进行重平衡,也应该加入人工校验的流程,比如:不能在直播的高峰期对Partition进行重平衡,否则整个系统的稳定性和可用性都会受到冲击。
4.3 Requesting Routing
问题是重平衡之后,Client怎么知道我的请求应该往哪个Socket上发呢?
服务发现:不仅是DB系统,基本上任何Software Application后台都需要服务发现的功能,以此来保证系统的高可用。基本上大厂都会有自己的服务发现系统,比如TikTok的Consul,阿里的Dubbo (? Not quite sure).

三种服务发现的方式:

  • 允许Client访问任一Node(Round Robin),如果路由到的节点能处理请求则直接处理,否则将Client的请求进行转发。
  • 添加一层Routing Tier,来决定Client的请求应该发送给哪个Node
  • Client感知Partition信息,并且直接向对应的Node发起请求。

问题是Node之间的一致性 -> Distributed Consensus。

  • 一种方法是使用一个分布式协调服务来管理元数据信息,向Kafka,HBase,SolrCloud使用ZooKeeper。

  • 另一种方式是在Cassandra 和 Riak中使用的gossip protocal,每次检测到集群状态变化的时候就进行广播,然后Client按照方式1进行请求。

  • CouchDB使用了方式2来进行请求路由,Routing Tier的实现组件为Moxi。
    Conclusion
    Partition 的目标是让Data和查询请求在不同的机器上更加均匀的分布,避免HotSpot的出现。这对系统设计提出的要求是:

  • 选择合适的 Partition Schema:By Range, By Hashing.

    • By Range:
      当Key是按照某种顺序排序的时候,可以按照Key Range进行Partition,1-100映射到Partition-1,101-200 映射到Partition-2。但是By Range这种方式可能会产生HotSpot,假设高热主播都是注册较早的用户,他们都被划分在Partition-1中,当他们同时开启直播的时候,Partition-1 的请求量暴增,而其他的Server Node基本都处于Cold的状态,显然是不均衡的。
    • By Hashing:
      先将Key进行一次Hash,将Hash值按照Range进行分区,这样做虽然破坏了有序性,但是能够使数据更加均匀的分布。
      还可以使用以上两种分区方式并用的方式,按照某个Key Hashing进行分区,分区内按照另一个Key进行排序。
      比如在我们Global Live的系统中,通常是按照User-ID进行分片,然后每个分区内,Primary-Key是有序的。
  • Rebalancing Partition.

    • Mod N:一般不用。将Key Hashing分配给固定的节点。
    • Fixed Partition Number:一开始设置好分片的大小,在使用过程中不可改变。
    • Dynamic Partitioning:当分区大小扩张当一定程度的时候进行分裂,当分区大小小于一定值的时候进行合并。
    • Partition Proportionally to Nodes: 每个Node管理固定数量的Partition。
      Partition and Secondary Key
      分区和次级索引的关系
  • 按照主键分区,次级索引伴随主键进行分片;适合写多的次级索引。

  • 按照次级索引分区。适合读多的次级索引。
    Implemention
    笔者也按照NUS 5223的Syllabus实现了一个Sharded KV Store,能够支持分片在不同的Replicas之间移动,负载均衡算法使用的是普通Hash。设计文档可以参考:
    Sharded Transactional KV 设计文档

Some Question
看了一下Reference Title,抛出几个问题:
Java’s HashCode is not Safe for Distributed System. Why?
3% of Twitter’s Servers Dedicated to Justin Biber. Any Strategy to fix it?

Reference
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-general-nosql-design.html#bp-general-nosql-design-concepts
https://cassandra.apache.org/_/index.html
DDIA


Partition
https://garygky.github.io/2022/05/22/Partition/
Author
Kaiyuan Gan
Posted on
May 22, 2022
Licensed under