An introduction to state-based CRDTs
原文链接: https://www.bartoszsypytkowski.com/the-state-of-a-state-based-crdts/
作者: Bartosz Sypytkowski
(翻译)基于状态的CRDT简介
这是一个在以往的一些分享中已经谈论过的主题,但却从未写过。 这里我想写一点关于CRDT(冲突无关复制数据类型)的内容: 它要解决什么样的问题,并通过一些基本的代码实现(F#)来帮助你理解怎样去使用CRDT。
动机
CRDT是解决一个常见的分布式环境下数据同步问题的答案。 尽管人们熟知分布式领域中的此类问题,在过去已经进行了许多尝试来解决, 通常是去中心化的两阶段提交事物的变种。 但他们无一例外有以下的问题:
- 我们的一个假设是,所有的参与者总是在事务发生期间是可用的,而经常这个假设恰恰是不成立的(比如在边缘计算和移动应用的场景中)
- 多阶段提交和使用议会制的方案需要在参与者之间进行多次往返通信。这附带带来延迟和吞吐量的开销。而且,当机器增多或者通信距离增加时,这种方式有降低可扩展性的趋势。多数场景中,一旦跨越单个数据中心机房,使用分布式事务都是行不通的
- 这些方案通常确定一个单一的主节点来进行排他写操作,或者作为单一的数据源。在某些情况下,比如上面提到的场景中,都是不可接受的。
这并非是说先有的解决方案不好。 我们期望在一个更广阔的可能的解决方案的范围中, 找到解决特定条件下的问题的更佳方式。 这其中我期望呈现的一个问题是:
试想你需要建造一个全球范围的视频流服务。
每当用户上传一个视频文件的时候,
都需要复制到位于其他大陆的数据中心中,以便维持良好的延迟和吞吐量,
从而得到更好的用户体验。
不仅如此,
我们还希望把视频的播放次数展示给用户。
视频上传是一个主-从复制的很好的例子。 但是仅仅当添加一个播放数量的小小功能时,事情却变得复杂起来。 在整个地球许多并发用户、又是写操作多的特点下,使用诸如计数器的方式恐怕不是一个好主意, 因为对于热点视频可能会导致阻塞。 这个问题的本质允许我们降低强一致性来换取更高的可用性,因此并不需要事务。 但是如先前所述, 先有方案大多数在复制资源的写操作上又是采取排他的方式实现的。 正因如此,CRDT和多主场景有了发挥的空间。
使用场景和实现
除开这些简单的例子外,有许多我们在现在的工业应用中遇到的例子:
- 亚马逊使用CRDT来保持购物车的同步。他们也发布了名为Dynamo的数据库来允许他们的受众来使用CRDT
- Riak是这个领域最著名的解决方案。他们一个最知名的客户是Riot游戏公司(英雄联盟背后的公司),使用Riak来实现游戏中的聊天
- Rovio(愤怒的小鸟背后的公司)在他们的广告平台中使用冲突无关的计数器来确保展示量在离线的场景下也能正确工作
- SoundClound有他们自己的使用Go和Redis的最后写成功集合(Last-Write-Wins Set)实现,叫做
Roshi,他们用它来做观察者管理
- TomTom[1]使用CRDT来管理导航数据
- CREAustralia使用CRDT来做点击流分析
除此之外,还有一些其他的解决方案:
- AntdoteDB是另一个具有创意的最终一致性数据库。它有一些独特的功能,其中之一是其事务支持最终一致性环境
- Akka.DistributedData是一个Akka(以及Akka.NET)编程模型的插件,在Akka集群上暴露了多个CRDT类型
- Redislabs提供了CRDB作为他们企业化Redis解决方案的一部分
- Cassandra 和 ScyllaDB支持最终一致性的计数器
冲突无关意味着什么
冲突无关是一个比较模糊的描述,但它表达了一个简单的态度: 我们是操作在一个无需排他写入,可以检测并发更新并执行确定的、自动的冲突解决的数据结构之上。 这并不意味着冲突永远不会发生, 而是我们总是能根据这个数据结构维护的元数据本身来得到一个确定的输出。 这里的核心数据结构包括计数器、寄存器和集合, 从他们我们又可以组合出更多复杂的类型,例如字典(map)、图甚至JSON。
CRDT的类型
我们可以将CRDT分为两大类:基于状态的(convergent——聚合型)和基于操作的(commutative——交换型)。 不论谈到哪种类型, 均包含两个重要的部分:数据复制协议和状态转换算法。
在实践上,两种类型在实现上存在巨大差异, 他们的重心也关注在不同的地方。 由于我们需要一些额外的元数据来产生自动化的冲突解决方法, 基于状态的CRDT将其作为数据结构的一部分, 而基于操作的CRDT倾向于将其更多放到复制协议本身。 这里,我们将介绍其中一种简单的基于状态的方案。
聚合复制数据类型
基于状态的CRDT中,最最重要的操作是Merge(合并)
方法。
它所做的仅仅是拿到相关逻辑实体的两个副本,
将更新后的状态作为输出。
如果发生了任何冲突,
需要由merge操作来解决。
除此之外,merge操作必须满足下面三个性质,才能使我们使用时更加方便:
- 交换律[math]\displaystyle{ x \cdot y = y \cdot x }[/math]和结合律[math]\displaystyle{ (x \cdot y) \cdot z = x \cdot (y \cdot z) }[/math],以便我们即使乱序执行merge操作也可以正确得到相同的最终状态
- 幂等性[math]\displaystyle{ x \cdot x = x }[/math],以使我们无需关心数据复制层潜在的重复发送问题
这些性质并不容易得到保证,但是你将可以看到在符合条件的情况下,我们仅用以下两个基本的操作就可以走多远:
- 合并两个集合
- 取两个值中的最大值
有了这些操作在手,我们数据复制层唯一的要求就是保证所有的状态变更最终分发到所有的副本中。 这样问题变成: 为了应用merge操作,我们需要每次变化时携带完整的数据结构信息。 试想仅仅是因为某人添加了一个额外的元素,我们就需要发送包含1000个元素的整个集合到传输线路中。这个问题可以通过一种叫做增量状态CRDT(delta-state CRDTs)的方式解决,后续我将再另行讨论。
现在,让我们来看看一些基本的数据结构,以及如何将其组装成更高级的类型。 值得注意的是,这些示例是为向你展示问题的解决方案而有意简化为之。
计数器
计数器是第一种CRDT类型,我们现在来进行介绍。它提供在多个不同机器上并发进行读取、增加和减少(非必需)计数器值的基本能力,而无需加锁。
单纯增长(Growing-only)计数器
也称为GCounter。它是一种值只能增加的计数器。 它的其中一种应用场景是作为页面查看次数计数,我们在动机一节中曾提及。 简单来说,它是一个副本ID/部分计数值的映射。
为了计算整个计数器的值,我们需要将所有已知的副本中的计数器中的值进行累加。
如果我们期望增加计数器的值,只需要简单将当时运行的某个副本中对应的计数器值增加即可。 单个副本永远不能增加其他副本的值,这是至关重要的。
// 译者注:
// 这里问题简化为每一个副本实例一个值,因此GCounter实质是所有副本和对应值的Map
// 根据设定,只有副本自己可以修改自己值,因此合并冲突时最大的值,一定是最后修改(increment)的那个值
module GCounter =
type GCounter = Map<ReplicaId, int64>
let zero: GCounter = Map.empty
let value (c: GCounter) =
c |> Map.fold (fun acc _ v -> acc + v) 0L
let inc r (c: GCounter) =
match Map.tryFind r c with
| Some x -> Map.add r (x + 1) c
| None -> Map.add r 1 c
let merge (a: GCounter) (b: GCounter): GCounter =
a |> Map.fold (fun acc ka va ->
match Map.tryFind ka acc with
| Some vb -> Map.add ka (max va vb) acc
| None -> Map.add ka va acc) b
对于merge操作,我们简单将两个计数器中的key/value键值对进行合并。 当我们检测到两个计数器中包含一个副本的不同值时,我们简单取最大的值即可。 这是一个正确的行为, 因为我们知道计数器的值只能被增加。 也正因为如此,这种类型的CRDT是不支持减少操作的。
可增/减计数器
下面,我们来看看可增/减计数器,又称之为PNCounter,它同时支持增加和减少操作。 我觉得,它可以作为我们展示如何通过简单的CRDT构建高级CRDT的一个小例子。
这里关键的技巧在于,PNCounter是由两个GCounter所组成, 其中一个用来记录增加,而另外一个用来记录减少。 因此,减少操作仅仅是用增长计数器来记录减少次数的部分用途而已。 我们的输出值则是两者的差值:
module PNCounter =
type PNCounter = GCounter.GCounter * GCounter.GCounter
let zero: PNCounter = (GCounter.zero, GCounter.zero)
let value (inc, dec) = GCounter.value inc - GCounter.value dec
let inc replica (inc, dec) = (GCounter.inc replica inc, dec)
let dec replica (inc, dec) = (inc, GCounter.inc replica dec)
let merge (inc1, dec1) (inc2, dec2) =
(GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)
如你所见,合并操作又一次是显而易见的:分别将两个PNCounter中对应的GCounter执行merge即可。
除此之外还有另外的计数器类型,但我不打算在这里展开。 其中一个十分有趣的例子是有界计数器(Bounded Counters), 你可以为计数器提供任意的上下界来决定是否达到特定的阈值。
关于向量时钟的说明
我们已经提到了计数器的实现。在更进一步之前, 我想是时候聊一下向量时钟(vector clocks)和时间的概念了。
在许多系统中,使用时间戳是定义因果律(一种happend-before关系)的一种标准做法。 但在高频的分布式系统中使用时间戳是存在一些问题的;
- 操作系统,尤其是在不同机房的不同机器上,是可能产生时钟偏移(clock skew)的,在写频繁的场景下十分蛋疼。此外另一种异常也可能发生:闰秒bug或者甚至两个线程间出现不合法的时间值
- 尽管时间戳有可能可以给我们决定最后一次修改(我们将在一分钟内使用)提供必要信息,他们无法告诉我们在修改发生那一刻“世界的状态”。这意味着,我们无法得知一个更新是已经包含另一个,还是他们是并发发生的。
这就是向量时钟存在的价值。他们是一种形式上的逻辑时钟, 由每个副本对应的单调增长的值来表示。听起来很像上面说的GCounter。
我们为什么要在这里讨论他们呢?是因为向量时钟内部的实现看起来非常接近GCounter。 他们主要的区别在于向量时钟具备部分比较的能力。
不同于标准的比较操作,部分比较允许我们在无法决定两个值是更大、更小或者相等的关系时得到第四个可能的结果——一个”不明确“的结果。我们可以利用这一点来识别两个时钟上发生的并发更新。
这里我们是这样定义向量时钟的:
type Ord =
| Lt = -1 // lower
| Eq = 0 // equal
| Gt = 1 // greater
| Cc = 2 // concurrent
type VTime = GCounter.GCounter
module VClock =
let zero = GCounter.zero
let inc = GCounter.inc
let merge = GCounter.merge
let compare (a: VTime) (b: VTime): Ord =
let valOrDefault k map =
match Map.tryFind k map with
| Some v -> v
| None -> 0L
let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
(akeys + bkeys)
|> Seq.fold (fun prev k ->
let va = valOrDefault k a
let vb = valOrDefault k b
match prev with
| Ord.Eq when va > vb -> Ord.Gt
| Ord.Eq when va < vb -> Ord.Lt
| Ord.Lt when va > vb -> Ord.Cc
| Ord.Gt when va < vb -> Ord.Cc
| _ -> prev ) Ord.Eq
比较的算法看起来很长,但其实十分简单——我们一对对比较两者VTime中的条目(如果某个条目在另一方不存在,则计为0):
- 如果副本中对应的的所有值都是相等的,那么时钟也相等
- 如果左边所有的值都小于或者等于右边对应的值,则左边小于右边
- 如果左边所有的值都大于或者等于右边对应的值,则左边大于右边
- 任何同时包含大于/小于情况被视为并发更新
如果你对这个分布式系统的时钟更感兴趣,我推荐你去看Kavya Joshi的一个非常好的分享:在现实系统中记录时间
寄存器
CRDT的下一个数据类型就是寄存器了。你可以理解为储值的格子,可以存储任意定义的类型并提供CRDT语义。 记住,我们仍然需要遵循交换律/结合律/幂等性的约束。 正因如此,我们必须加入一些额外的元数据来使我们可以在冲突发生时解决任意的冲突问题。
最后写优先寄存器(Last Write Wins Register)
我们前面已经提过,最明显的解决冲突的办法其实就是使用时间戳。这也正是我们实现LWWReg时所使用的。
module LWWReg =
type LWWReg<'a> = 'a * DateTime
let zero: LWWReg<'a> = (Unchecked.defaultof<'a>, DateTime.MinValue)
let value (v, _) = v
let set c2 v2 (v1, c1) = if c1 < c2 then (v2, c2) else (v1, c1)
let merge (v1, c1) (v2, c2) = if c1 < c2 then (v2, c2) else (v1, c1)
很明显,我们的set和merge操作简简单单比较了两个寄存器的时间戳,并选择时间戳更高的值。
如同前面的案例一样,我们可以通过组合LWW寄存器来实现更高级的操作。
集合
了解完计数器和寄存器之后,是时候来讨论一下集合了。 这里最自然符合设定的选项时不同的Set的变体——只因为Set满足前面提到的交换律/结合律/幂等性。 然后,我们可以利用它来定义映射(map),图甚至有序的线性序列(比如在协作文本编辑的场景中有用)。
只增集合(Growing-only Set)
如同计数器的例子一样,只增集合(又称为GSet)是集合中最基本的例子。 它可以用在比如说投票系统中,我们期望判断一个用户是否参与了投票, 同时仍然保持用户选票匿名(这个例子中投票的总结果可以是GCounter本身)。
module GSet =
type GSet<'a when 'a: comparison> = Set<'a>
let zero: GSet<'a> = Set.empty
let value (s: GSet<'a>) = s
let add v (s: GSet<'a>) = Set.add v s
let merge (a: GSet<'a>) (b: GSet<'a>) = a + b
这并不是在没事整活。这是一个标准的集合!:)这里唯一的不同在于,我们约束我们自己不去做任何的删除操作。 这样做的原因是,我们merge时只是做了标准的合并,如果我们从任意一个副本中删除任意元素, 一旦它跟其他副本(这个副本还没有删除过)merge后,删除的元素又会被自动放回到合并结果中。
这里还有一课:因为我们从集合中删除了一个元素,我们会损失一些信息。 这恰恰是我们通常在CRDT中无法承受的,因此我们经常需要附加一些额外的元数据, 即使它们看起来好像跟结果值没有显式的联系。
两阶段集合(2-Phase Set)
接下来是两阶段集合。如同PNCounter的例子一样, 我们可以把两个GSet简单结合起来——一个为添加元素,一个为删除元素(通常会叫做墓碑tombstone)。 添加/删除元素和merge操作也跟PNCounter/GCounter十分像。
module PSet =
type PSet<'a when 'a: comparison> = GSet.GSet<'a> * GSet.GSet<'a>
let zero: PSet<'a> = (GSet.zero, GSet.zero)
// (add, rem) is a single PSet instance
let value (add, rem) = add - rem
let add v (add, rem) = (GSet.add v add, rem)
let rem v (add, rem) = (add, GSet.add v rem)
let merge (add1, rem1) (add2, rem2) =
(GSet.merge add1 add2, GSet.merge rem1 rem2)
上述的实现存在几个问题:
- 墓碑机制(逻辑删除)的一个常见情况是删除的集合事实上会无限增长下去,因此为了保持数据一致性最终的值仅只占用实际元数据一部分的大小。也有另外的算法——墓碑清理(tombstone pruning)——来缓解这一问题
- 虽然我们的确可以删除已经添加的元素,但如果我们尝试添加一个已经删除的元素是会出现问题。因为我们不具备任何从底层GSet中删除数据的语义,一旦删除之后,元素就永远呆在墓碑之下了。这将使得元素从最终结果中被删除。所以说大哥,没法重新添加元素哟。再次强调,我们需要额外的元数据来允许我们追溯因果关系从而决定发生的是添加还是删除操作。
监视删除集合(Observed Remove Set)
如果你已经看到这里,恭喜你!我们终于要来实现一个半高级的场景了:一个监视删除集合(也叫ORSet), 它将允许我们自由添加和删除元素,在从不同地区的副本merge时仍然可以合并。
它的原理是什么呢?我们将以可添加/删除的集合来展示我们的ORSet,但是这次我们将使用映射(Map)而不是集合(Set)。 map中的key将是我们的元素,而value则会是一个(部分)可比较的时间戳用来标记最后一次添加/删除操作发生的时间。
具体的ORSet定义依赖于所使用的时间戳和冲突解决算法:
- 你可以使用DateTime作为时间戳并在冲突时优先使用最新的值。这也附带给我们最后写优先的语义(如LWWReg一般)而不是对于集合中特定的元素。这会极大简化事情,但我们会做的比这更好:)
- 另一个途径是使用向量时钟,在文章前面已经进行了定义。这将允许我们检测两个副本什么时候同时添加/删除了同一个元素而不需要关心其他参与方是否做了同样的事情。当检测到这种情况的时候,我们需要知道任意情况下冲突解决算法的结果是什么。最常见的做法是通常优先选择添加而不是删除。这也被称之为添加优先监视删除集合(Add-Wins Observed Remove Set,简称AWORSet)。我们这就来实现它。
module ORSet =
type ORSet<'a when 'a: comparison> = Map<'a, VTime> * Map<'a, VTime>
let zero: ORSet<'a> = (Map.empty, Map.empty)
为了得到结果集合,我们的value方法需要对添加集合进行遍历,跟删除集合中的记录进行比较,并删除所有时间戳小于删除记录的值(意味着如果两个操作并发发生,我们保留结果)。
let value (add, rem) =
rem |> Map.fold(fun acc k vr ->
match Map.tryFind k acc with
| Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
| _ -> acc) add
如PNCounter一样我们的添加/删除操作需要在一个确定的副本r的上下文环境下工作——这是使用向量时钟作为时间戳的结果。 这里,我们简单将元素添加到对应的底层map中并增加其时间戳。
let add r e (add, rem) =
// 译者注:
// Case 1: If e is found in the add map (Some v)
// Case 2: If e is found in the rem map (Some v)
// Case 3: If e is not found in either map
match Map.tryFind e add, Map.tryFind e rem with
| Some v, _ -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
| _, Some v -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
| _, _ -> (Map.add e (VClock.inc r VClock.zero) add, rem)
let remove r e (add, rem) =
match Map.tryFind e add, Map.tryFind e rem with
| Some v, _ -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
| _, Some v -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
| _, _ -> (add, Map.add e (VClock.inc r VClock.zero) rem)
这里有一点需要指出,你可能已经注意到我们使用了Map.remove。在这个场景下是安全的,因为我们同时添加了值——时间戳的键值对到相反的map中,仍然将数据的信息保存在对象中。
最复杂的则是merge操作了。我们先从简单地挤压添加/删除集合开始(万一时间戳冲突,我们仍然简单合并到一起)。然后我们所需要的是通过从添加集合中删除所有时间错小于对应删除集合的元素的方式来合并添加/删除集合(这已经覆盖到并发更新的场景,我们已经在一开始的时候决定采取添加优先于删除的方式)。 从删除集合中,我们只需要把时间错小于或者等于添加集合中对应元素的值删掉。从而保持数据的精简。
let merge (add1, rem1) (add2, rem2) =
let mergeKeys a b =
b |> Map.fold (fun acc k vb ->
match Map.tryFind k acc with
| Some va -> Map.add k (VClock.merge va vb) acc
| None -> Map.add k vb acc ) a
let addk = mergeKeys add1 add2
let remk = mergeKeys rem1 rem2
let add = remk |> Map.fold (fun acc k vr ->
match Map.tryFind k acc with
| Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
| _ -> acc ) addk
let rem = addk |> Map.fold (fun acc k va ->
match Map.tryFind k acc with
| Some vr when VClock.compare va vr = Ord.Lt -> acc
| _ -> Map.remove k acc ) remk
(add, rem)
你可以从这里看出,我们使用一个向量时钟(也是一个map)的map, 它是一个次优的解决方案。有这些方法可以来改进:
- 最简单的方式是使用L4Z或者GZip算法压缩二进制的ORSet负载
- 更高级的方式是使用点状向量修改现有的实现
这些我们聊待下次再表。
接下来是什么?
- ↑ 译者注:TomTom是是一家荷兰的地图厂商,包括个人导航设备、高精地图和自动驾驶业务等。