An introduction to state-based CRDTs
原文链接: https://www.bartoszsypytkowski.com/the-state-of-a-state-based-crdts/
作者: Bartosz Sypytkowski
(翻译)基于状态的CRDT简介
这是一个在以往的一些分享中已经谈论过的主题,但却从未写过。 这里我想写一点关于CRDT(冲突无关复制数据类型)的内容: 它要解决什么样的问题,并通过一些基本的代码实现(F#)来帮助你理解怎样去使用CRDT。
动机
CRDT是解决一个常见的分布式环境下数据同步问题的答案。 尽管人们熟知分布式领域中的此类问题,在过去已经进行了许多尝试来解决, 通常是去中心化的两阶段提交事物的变种。 但他们无一例外有以下的问题:
- 我们的一个假设是,所有的参与者总是在事务发生期间是可用的,而经常这个假设恰恰是不成立的(比如在边缘计算和移动应用的场景中)
- 多阶段提交和使用议会制的方案需要在参与者之间进行多次往返通信。这附带带来延迟和吞吐量的开销。而且,当机器增多或者通信距离增加时,这种方式有降低可扩展性的趋势。多数场景中,一旦跨越单个数据中心机房,使用分布式事务都是行不通的
- 这些方案通常确定一个单一的主节点来进行排他写操作,或者作为单一的数据源。在某些情况下,比如上面提到的场景中,都是不可接受的。
这并非是说先有的解决方案不好。 我们期望在一个更广阔的可能的解决方案的范围中, 找到解决特定条件下的问题的更佳方式。 这其中我期望呈现的一个问题是:
试想你需要建造一个全球范围的视频流服务。
每当用户上传一个视频文件的时候,
都需要复制到位于其他大陆的数据中心中,以便维持良好的延迟和吞吐量,
从而得到更好的用户体验。
不仅如此,
我们还希望把视频的播放次数展示给用户。
视频上传是一个主-从复制的很好的例子。 但是仅仅当添加一个播放数量的小小功能时,事情却变得复杂起来。 在整个地球许多并发用户、又是写操作多的特点下,使用诸如计数器的方式恐怕不是一个好主意, 因为对于热点视频可能会导致阻塞。 这个问题的本质允许我们降低强一致性来换取更高的可用性,因此并不需要事务。 但是如先前所述, 先有方案大多数在复制资源的写操作上又是采取排他的方式实现的。 正因如此,CRDT和多主场景有了发挥的空间。
使用场景和实现
除开这些简单的例子外,有许多我们在现在的工业应用中遇到的例子:
- 亚马逊使用CRDT来保持购物车的同步。他们也发布了名为Dynamo的数据库来允许他们的受众来使用CRDT
- Riak是这个领域最著名的解决方案。他们一个最知名的客户是Riot游戏公司(英雄联盟背后的公司),使用Riak来实现游戏中的聊天
- Rovio(愤怒的小鸟背后的公司)在他们的广告平台中使用冲突无关的计数器来确保展示量在离线的场景下也能正确工作
- SoundClound有他们自己的使用Go和Redis的最后写成功集合(Last-Write-Wins Set)实现,叫做
Roshi,他们用它来做观察者管理
- TomTom[1]使用CRDT来管理导航数据
- CREAustralia使用CRDT来做点击流分析
除此之外,还有一些其他的解决方案:
- AntdoteDB是另一个具有创意的最终一致性数据库。它有一些独特的功能,其中之一是其事务支持最终一致性环境
- Akka.DistributedData是一个Akka(以及Akka.NET)编程模型的插件,在Akka集群上暴露了多个CRDT类型
- Redislabs提供了CRDB作为他们企业化Redis解决方案的一部分
- Cassandra 和 ScyllaDB支持最终一致性的计数器
冲突无关意味着什么
冲突无关是一个比较模糊的描述,但它表达了一个简单的态度: 我们是操作在一个无需排他写入,可以检测并发更新并执行确定的、自动的冲突解决的数据结构之上。 这并不意味着冲突永远不会发生, 而是我们总是能根据这个数据结构维护的元数据本身来得到一个确定的输出。 这里的核心数据结构包括计数器、寄存器和集合, 从他们我们又可以组合出更多复杂的类型,例如字典(map)、图甚至JSON。
CRDT的类型
我们可以将CRDT分为两大类:基于状态的(convergent——聚合型)和基于操作的(commutative——交换型)。 不论谈到哪种类型, 均包含两个重要的部分:数据复制协议和状态转换算法。
在实践上,两种类型在实现上存在巨大差异, 他们的重心也关注在不同的地方。 由于我们需要一些额外的元数据来产生自动化的冲突解决方法, 基于状态的CRDT将其作为数据结构的一部分, 而基于操作的CRDT倾向于将其更多放到复制协议本身。 这里,我们将介绍其中一种简单的基于状态的方案。
聚合复制数据类型
基于状态的CRDT中,最最重要的操作是Merge(合并)
方法。
它所做的仅仅是拿到相关逻辑实体的两个副本,
将更新后的状态作为输出。
如果发生了任何冲突,
需要由merge操作来解决。
除此之外,merge操作必须满足下面三个性质,才能使我们使用时更加方便:
- 交换律[math]\displaystyle{ x \cdot y = y \cdot x }[/math]和结合律[math]\displaystyle{ (x \cdot y) \cdot z = x \cdot (y \cdot z) }[/math],以便我们即使乱序执行merge操作也可以正确得到相同的最终状态
- 幂等性[math]\displaystyle{ x \cdot x = x }[/math],以使我们无需关心数据复制层潜在的重复发送问题
这些性质并不容易得到保证,但是你将可以看到在符合条件的情况下,我们仅用以下两个基本的操作就可以走多远:
- 合并两个集合
- 取两个值中的最大值
有了这些操作在手,我们数据复制层唯一的要求就是保证所有的状态变更最终分发到所有的副本中。 这样问题变成: 为了应用merge操作,我们需要每次变化时携带完整的数据结构信息。 试想仅仅是因为某人添加了一个额外的元素,我们就需要发送包含1000个元素的整个集合到传输线路中。这个问题可以通过一种叫做增量状态CRDT(delta-state CRDTs)的方式解决,后续我将再另行讨论。
现在,让我们来看看一些基本的数据结构,以及如何将其组装成更高级的类型。 值得注意的是,这些示例是为向你展示问题的解决方案而有意简化为之。
计数器
计数器是第一种CRDT类型,我们现在来进行介绍。它提供在多个不同机器上并发进行读取、增加和减少(非必需)计数器值的基本能力,而无需加锁。
单纯增长(Growing-only)计数器
也称为GCounter。它是一种值只能增加的计数器。 它的其中一种应用场景是作为页面查看次数计数,我们在动机一节中曾提及。 简单来说,它是一个副本ID/部分计数值的映射。
为了计算整个计数器的值,我们需要将所有已知的副本中的计数器中的值进行累加。
如果我们期望增加计数器的值,只需要简单将当时运行的某个副本中对应的计数器值增加即可。 单个副本永远不能增加其他副本的值,这是至关重要的。
// 译者注:
// 这里问题简化为每一个副本实例一个值,因此GCounter实质是所有副本和对应值的Map
// 根据设定,只有副本自己可以修改自己值,因此合并冲突时最大的值,一定是最后修改(increment)的那个值
module GCounter =
type GCounter = Map<ReplicaId, int64>
let zero: GCounter = Map.empty
let value (c: GCounter) =
c |> Map.fold (fun acc _ v -> acc + v) 0L
let inc r (c: GCounter) =
match Map.tryFind r c with
| Some x -> Map.add r (x + 1) c
| None -> Map.add r 1 c
let merge (a: GCounter) (b: GCounter): GCounter =
a |> Map.fold (fun acc ka va ->
match Map.tryFind ka acc with
| Some vb -> Map.add ka (max va vb) acc
| None -> Map.add ka va acc) b
对于merge操作,我们简单将两个计数器中的key/value键值对进行合并。 当我们检测到两个计数器中包含一个副本的不同值时,我们简单取最大的值即可。 这是一个正确的行为, 因为我们知道计数器的值只能被增加。 也正因为如此,这种类型的CRDT是不支持减少操作的。
可增/减计数器
下面,我们来看看可增/减计数器,又称之为PNCounter,它同时支持增加和减少操作。 我觉得,它可以作为我们展示如何通过简单的CRDT构建高级CRDT的一个小例子。
这里关键的技巧在于,PNCounter是由两个GCounter所组成, 其中一个用来记录增加,而另外一个用来记录减少。 因此,减少操作仅仅是用增长计数器来记录减少次数的部分用途而已。 我们的输出值则是两者的差值:
module PNCounter =
type PNCounter = GCounter.GCounter * GCounter.GCounter
let zero: PNCounter = (GCounter.zero, GCounter.zero)
let value (inc, dec) = GCounter.value inc - GCounter.value dec
let inc replica (inc, dec) = (GCounter.inc replica inc, dec)
let dec replica (inc, dec) = (inc, GCounter.inc replica dec)
let merge (inc1, dec1) (inc2, dec2) =
(GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)
如你所见,合并操作又一次是显而易见的:分别将两个PNCounter中对应的GCounter执行merge即可。
除此之外还有另外的计数器类型,但我不打算在这里展开。 其中一个十分有趣的例子是有界计数器(Bounded Counters), 你可以为计数器提供任意的上下界来决定是否达到特定的阈值。
关于向量时钟的说明
我们已经提到了计数器的实现。在更进一步之前, 我想是时候聊一下向量时钟(vector clocks)和时间的概念了。
在许多系统中,使用时间戳是定义因果律(一种happend-before关系)的一种标准做法。 但在高频的分布式系统中使用时间戳是存在一些问题的;
- 操作系统,尤其是在不同机房的不同机器上,是可能产生时钟偏移(clock skew)的,在写频繁的场景下十分蛋疼。此外另一种异常也可能发生:闰秒bug或者甚至两个线程间出现不合法的时间值
- 尽管时间戳有可能可以给我们决定最后一次修改(我们将在一分钟内使用)提供必要信息,他们无法告诉我们在修改发生那一刻“世界的状态”。这意味着,我们无法得知一个更新是已经包含另一个,还是他们是并发发生的。
这就是向量时钟存在的价值。他们是一种形式上的逻辑时钟, 由每个副本对应的单调增长的值来表示。听起来很像上面说的GCounter。
我们为什么要在这里讨论他们呢?是因为向量时钟内部的实现看起来非常接近GCounter。 他们主要的区别在于向量时钟具备部分比较的能力。
不同于标准的比较操作,部分比较允许我们在无法决定两个值是更大、更小或者相等的关系时得到第四个可能的结果——一个”不明确“的结果。我们可以利用这一点来识别两个时钟上发生的并发更新。
这里我们是这样定义向量时钟的:
type Ord =
| Lt = -1 // lower
| Eq = 0 // equal
| Gt = 1 // greater
| Cc = 2 // concurrent
type VTime = GCounter.GCounter
module VClock =
let zero = GCounter.zero
let inc = GCounter.inc
let merge = GCounter.merge
let compare (a: VTime) (b: VTime): Ord =
let valOrDefault k map =
match Map.tryFind k map with
| Some v -> v
| None -> 0L
let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
(akeys + bkeys)
|> Seq.fold (fun prev k ->
let va = valOrDefault k a
let vb = valOrDefault k b
match prev with
| Ord.Eq when va > vb -> Ord.Gt
| Ord.Eq when va < vb -> Ord.Lt
| Ord.Lt when va > vb -> Ord.Cc
| Ord.Gt when va < vb -> Ord.Cc
| _ -> prev ) Ord.Eq
比较的算法看起来很长,但其实十分简单——我们一对对比较两者VTime中的条目(如果某个条目在另一方不存在,则计为0):
- 如果副本中对应的的所有值都是相等的,那么时钟也相等
- 如果左边所有的值都小于或者等于右边对应的值,则左边小于右边
- 如果左边所有的值都大于或者等于右边对应的值,则左边大于右边
- 任何同时包含大于/小于情况被视为并发更新
如果你对这个分布式系统的时钟更感兴趣,我推荐你去看Kavya Joshi的一个非常好的分享:在现实系统中记录时间
寄存器
集合
接下来是什么?
- ↑ 译者注:TomTom是是一家荷兰的地图厂商,包括个人导航设备、高精地图和自动驾驶业务等。