Redis 跨 IDC 多向同步实践

x-pipe https://github.com/ctripcorp/x-pipe

XPipe 解决什么问题

Redis 在携程内部得到了广泛的使用,根据客户端数据统计,整个携程全部 Redis 的读写请求在每秒 2000W,其中写请求约 100W,很多业务甚至会将 Redis 当成内存数据库使用。这样,就对 Redis 多数据中心提出了很大的需求,一是为了提升可用性,解决数据中心 DR(Disaster Recovery) 问题,二是提升访问性能,每个数据中心可以读取当前数据中心的数据,无需跨机房读数据,在这样的需求下,XPipe 应运而生 。

为了方便描述,后面用 DC 代表数据中心 (Data Center)。

系统详述

整体架构

整体架构图如下所示:

design
  • Console 用来管理多机房的元信息数据,同时提供用户界面,供用户进行配置和 DR 切换等操作。
  • Keeper 负责缓存 Redis 操作日志,并对跨机房传输进行压缩、加密等处理。
  • Meta Server 管理单机房内的所有 keeper 状态,并对异常状态进行纠正。

Redis 数据复制问题

多数据中心首先要解决的是数据复制问题,即数据如何从一个 DC 传输到另外一个 DC。我们决定采用伪 slave 的方案,即实现 Redis 协议,伪装成为 Redis slave,让 Redis master 推送数据至伪 slave。这个伪 slave,我们把它称为 keeper,如下图所示:


使用 keeper 带来的优势

  • 减少 master 全量同步
    如果异地机房 slave 直接连向 master,多个 slave 会导致 master 多次全量同步,而 keeper 可以缓存 rdb 和 replication log,异地机房的 slave 直接从 keeper 获取数据,增强 master 的稳定性。
  • 减少多数据中心网络流量
    在两个数据中心之间,数据只需要通过 keeper 传输一次,且 keeper 之间传输协议可以自定义,方便支持压缩 (目前暂未支持)。
  • 网络异常时减少全量同步
    keeper 将 Redis 日志数据缓存到磁盘,这样,可以缓存大量的日志数据 (Redis 将数据缓存到内存 ring buffer,容量有限),当数据中心之间的网络出现较长时间异常时仍然可以续传日志数据。
  • 安全性提升
    多个机房之间的数据传输往往需要通过公网进行,这样数据的安全性变得极为重要,keeper 之间的数据传输也可以加密 (暂未实现),提升安全性。

机房切换

切换流程

  • 检查是否可以进行 DR 切换
    类似于 2PC 协议,首先进行 prepare,保证流程能顺利进行。
  • 原主机房 master 禁止写入
    此步骤,保证在迁移的过程中,只有一个 master,解决在迁移过程中可能存在的数据丢失情况。
  • 提升新主机房 master
  • 其它机房向新主机房同步

同时提供回滚和重试功能。回滚功能可以回滚到初始的状态,重试功能可以在 DBA 人工介入的前提下,修复异常条件,继续进行切换。

高可用

XPipe 系统高可用

如果 keeper 挂掉,多数据中心之间的数据传输可能会中断,为了解决这个问题,keeper 有主备两个节点,备节点实时从主节点复制数据,当主节点挂掉后,备节点会被提升为主节点,代替主节点进行服务。

提升的操作需要通过第三方节点进行,我们把它称之为 MetaServer,主要负责 keeper 状态的转化以及机房内部元信息的存储。同时 MetaServer 也要做到高可用:每个 MetaServer 负责特定的 Redis 集群,当有 MetaServer 节点挂掉时,其负责的 Redis 集群将由其它节点接替;如果整个集群中有新的节点接入,则会自动进行一次负载均衡,将部分集群移交到此新节点。

Redis 自身高可用

Redis 也可能会挂,Redis 本身提供哨兵 (Sentinel) 机制保证集群的高可用。但是在 Redis4.0 版本之前,提升新的 master 后,其它节点连到此节点后都会进行全量同步,全量同步时,slave 会处于不可用状态;master 将会导出 rdb,降低 master 的可用性;同时由于集群中有大量数据 (RDB) 传输,将会导致整体系统的不稳定。

截止当前文章书写之时,4.0 仍然没有发布 release 版本,而且携程内部使用的 Redis 版本为 2.8.19,如果升到 4.0,版本跨度太大,基于此,我们在 Redis3.0.7 的版本基础上进行优化,实现了 psync2.0 协议,实现了增量同步。下面是 Redis 作者对协议的介绍:psync2.0

携程内部 Redis 地址链接

测试数据

test

延时测试

测试方案

测试方式如下图所示。从 client 发送数据至 master,并且 slave 通过 keyspace notification 的方式通知到 client,整个测试延时时间为 t1+t2+t3。

测试数据

首先我们测试 Redis master 直接复制到 slave 的延时,为 0.2ms。然后在 master 和 slave 之间增加一层 keeper,整体延时增加 0.1ms,到 0.3ms。

在携程生产环境进行了测试,生产环境两个机房之间的 ping RTT 约为 0.61ms,经过跨数据中心的两层 keeper 后,测试得到的平均延时约为 0.8ms,延时 99.9 线为 2ms。

跨公网部署及架构

详情参考 — 跨公网部署及架构

docker快速启动

详情参考 — docker quick start

深入了解

redis-shake数据同步&迁移工具

2019-02-28 39653 简介: redis-shake是阿里云Redis&MongoDB团队开源的用于redis数据同步的工具。

  redis-shake是阿里云Redis&MongoDB团队开源的用于redis数据同步的工具。下载地址:这里

基本功能

  redis-shake是我们基于redis-port基础上进行改进的一款产品。它支持解析恢复备份同步四个功能。以下主要介绍同步sync。

  • 恢复restore:将RDB文件恢复到目的redis数据库。
  • 备份dump:将源redis的全量数据通过RDB文件备份起来。
  • 解析decode:对RDB文件进行读取,并以json格式解析存储。
  • 同步sync:支持源redis和目的redis的数据同步,支持全量和增量数据的迁移,支持从云下到阿里云云上的同步,也支持云下到云下不同环境的同步,支持单节点、主从版、集群版之间的互相同步。需要注意的是,如果源端是集群版,可以启动一个RedisShake,从不同的db结点进行拉取,同时源端不能开启move slot功能;对于目的端,如果是集群版,写入可以是1个或者多个db结点。
  • 同步rump:支持源redis和目的redis的数据同步,仅支持全量的迁移。采用scan和restore命令进行迁移,支持不同云厂商不同redis版本的迁移。

基本原理

Screen_Shot_2019_03_29_at_6_23_58_PM

  redis-shake的基本原理就是模拟一个从节点加入源redis集群,首先进行全量拉取并回放,然后进行增量的拉取(通过psync命令)。如下图所示:
Screen_Shot_2019_03_29_at_6_23_58_PM

  如果源端是集群模式,只需要启动一个redis-shake进行拉取,同时不能开启源端的move slot操作。如果目的端是集群模式,可以写入到一个结点,然后再进行slot的迁移,当然也可以多对多写入。
  目前,redis-shake到目的端采用单链路实现,对于正常情况下,这不会成为瓶颈,但对于极端情况,qps比较大的时候,此部分性能可能成为瓶颈,后续我们可能会计划对此进行优化。另外,redis-shake到目的端的数据同步采用异步的方式,读写分离在2个线程操作,降低因为网络时延带来的同步性能下降。

高效性

   全量同步阶段并发执行,增量同步阶段异步执行,能够达到毫秒级别延迟(取决于网络延迟)。同时,我们还对大key同步进行分批拉取,优化同步性能。

监控

   用户可以通过我们提供的restful拉取metric来对redis-shake进行实时监控:curl 127.0.0.1:9320/metric

校验

   如何校验同步的正确性?可以采用我们开源的redis-full-check,具体原理可以参考这篇博客

支持

  • 支持2.8-5.0版本的同步。
  • 支持codis。
  • 支持云下到云上,云上到云上,云上到云下(阿里云目前支持主从版),其他云到阿里云等链路,帮助用户灵活构建混合云场景。

开发中的功能

  1. 断点续传。支持断开后按offset恢复,降低因主备切换、网络抖动造成链路断开重新同步拉取全量的性能影响。

说明

多活支持的支持,需要依赖内核的能力,目前单单依赖通道层面无法解决该问题。redis内核层面需要提供gtid的概念,以及CRDT的解决冲突的方式才可以。

参数解释:

启动示例(以sync举例):
vinllen@ ~/redis-shake/bin$ ./redis-shake -type=sync -conf=../conf/redis-shake.conf
conf路径下的redis-shake.conf存放的是配置文件,其内容及部分中文注释如下

# this is the configuration of redis-shake.

# id
id = redis-shake
# log file,日志文件,不配置将打印到stdout
log_file =

# pprof port
system_profile = 9310
# restful port,查看metric端口
http_profile = 9320

# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU()
ncpu = 0

# parallel routines number used in RDB file syncing.
parallel = 4

# input RDB file. read from stdin, default is stdin ('/dev/stdin').
# used in `decode` and `restore`.
# 如果是decode或者restore,这个参数表示读取的rdb文件
input_rdb = local_dump

# output RDB file. default is stdout ('/dev/stdout').
# used in `decode` and `dump`.
# 如果是decode或者dump,这个参数表示输出的rdb
output_rdb = local_dump

# source redis configuration.
# used in `dump` and `sync`.
# ip:port
# 源redis地址
source.address = 127.0.0.1:20441
# password.
source.password_raw = kLNIl691OZctWST
# auth type, don't modify it
source.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0)
source.version = 6

# target redis configuration. used in `restore` and `sync`.
# used in `restore` and `sync`.
# ip:port
# 目的redis地址
target.address = 10.101.72.137:20551
# password.
target.password_raw = kLNIl691OZctWST
# auth type, don't modify it
target.auth_type = auth
# version number, default is 6 (6 for Redis Version <= 3.0.7, 7 for >=3.2.0)
target.version = 6
# all the data will come into this db. < 0 means disable.
# used in `restore` and `sync`.
target.db = -1

# use for expire key, set the time gap when source and target timestamp are not the same.
# 用于处理过期的键值,当迁移两端不一致的时候,目的端需要加上这个值
fake_time =

# force rewrite when destination restore has the key
# used in `restore` and `sync`.
# 当源目的有重复key,是否进行覆写
rewrite = true

# filter db or key or slot
# choose these db, e.g., 5, only choose db5. defalut is all.
# used in `restore` and `sync`.
# 支持过滤db,只让指定的db通过
filter.db =
# filter key with prefix string. multiple keys are separated by ';'.
# e.g., a;b;c
# default is all.
# used in `restore` and `sync`.
# 支持过滤key,只让指定的key通过,分号分隔
filter.key =
# filter given slot, multiple slots are separated by ';'.
# e.g., 1;2;3
# used in `sync`.
# 指定过滤slot,只让指定的slot通过
filter.slot =

# big key threshold, the default is 500 * 1024 * 1024. The field of the big key will be split in processing.
# 我们对大key有特殊的处理,此处需要指定大key的阈值
big_key_threshold = 524288000

# use psync command.
# used in `sync`.
# 默认使用sync命令,启用将会使用psync命令
psync = false

# enable metric
# used in `sync`.
# 是否启用metric
metric = true
# print in log
# 是否将metric打印到log中
metric.print_log = true

# heartbeat
# send heartbeat to this url
# used in `sync`.
# 心跳的url地址,redis-shake将会发送到这个地址
heartbeat.url = http://127.0.0.1:8000
# interval by seconds
# 心跳保活周期
heartbeat.interval = 3
# external info which will be included in heartbeat data.
# 在心跳报文中添加额外的信息
heartbeat.external = test external
# local network card to get ip address, e.g., "lo", "eth0", "en0"
# 获取ip的网卡
heartbeat.network_interface =

# sender information.
# sender flush buffer size of byte.
# used in `sync`.
# 发送缓存的字节长度,超过这个阈值将会强行刷缓存发送
sender.size = 104857600
# sender flush buffer size of oplog number.
# used in `sync`.
# 发送缓存的报文个数,超过这个阈值将会强行刷缓存发送
sender.count = 5000
# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also stored in this delay queue. this timestamp will be used to calculate the time delay when receiving ack from target redis.
# used in `sync`.
# 用于metric统计时延的队列
sender.delay_channel_size = 65535

# ----------------splitter----------------
# below variables are useless for current opensource version so don't set.

# replace hash tag.
# used in `sync`.
replace_hash_tag = false

# used in `restore` and `dump`.
extra = false

开源地址

redis-shake: https://github.com/aliyun/redis-shake

一、前言

跨 DC(数据中心)的数据同步是企业提升容灾实力的必备手段。随着携程业务向海外发展的速度越来越快,应用架构能够快速全球部署的能力也愈发重要。对于服务而言,我们可以尽量做到无状态的部署架构,来达到灵活拓展,快速部署的目的,比如 server-less。

然而,对于业务应用来讲,大量的业务逻辑是有状态的,仅仅做到服务的灵活拓展还不够,需要数据也拥有多站点共享的能力。

携程在一年前已经可以实现 Redis 单向跨区域同步,从上海将数据复制到美国加州,德国法兰克福,以及新加坡等众多海外站点,延迟稳定在 180ms 左右,支持单个 Redis 5MB/s 的传输带宽。

关于跨公网传输以及跨区域单向复制,详情参考这篇文章:携程Redis海外机房数据同步实践

但是,技术人的目标就是不断突破自己。在 Redis 出海同步一年之后,业务上有了新的需求:能否在每个站点都可以独立地写入和读取,所有数据中心之间互相同步,而跨区域复制的一致性等问题,也可以由底层存储来解决?

单向同步的 Redis 固然可以解决问题,然而,大量的海外数据需要先回流上海,再从上海同步至各个数据中心,这一来一去, 不仅给业务开发带来了额外的复杂性和代码的冗余性,也给数据本身的时效性以及跨区域传输的费用带来了问题(携程的数据回源需要走专线,而 Redis 同步通过公网传输,由携程的框架中间件 – XPipe 来解决跨公网传输的安全和不稳定性因素)。[1]

二、技术选型

所以,我们需要的是一个分布式的 Redis 存储,能够实现跨区域多向同步。面对大型分布式系统,不免要讨论 CAP 理论,在跨区域多活的场景下如何取舍?

显然 P(网络分区)是首要考虑因素。其次,跨区域部署就是为了提高可用性,而且对于常见的一致性协议,不管是 2PC、Paxos 还是 raft,在此场景下都要做跨区域同步更新,不仅会降低用户体验,在网络分区的时候还会影响可用性。

对于 Redis 这种毫秒级相应的数据库,应用希望能够在每个站点都可以“如丝般顺滑”地使用,因此 C 必定被排在最后。

那是不是 C 无法被满足了呢?事实并非如此,退而求其次,最终一致也是一种选择。经过调研,我们决定选用“强最终一致性”的理论模型来满足一致性的需求。[2]

关于“最终一致性”(Eventually Consistency) 和“强最终一致性”(Strong Eventually Consistency),大家可以参考 wiki 百科给出的释义:

(Strong Eventually Consistency) Whereas eventual consistency is only a liveness guarantee (updates will be observed eventually), strong eventual consistency (SEC) adds the safety guarantee that any two nodes that have received the same (unordered) set of updates will be in the same state. If, furthermore, the system is monotonic, the application will never suffer rollbacks.

三、问题

有了目标,自然是开始计划和设计,那么在开始之前有什么问题呢?

3.1 跨数据中心双向同步共同的问题

各种数据库在设计双向数据同步时,均会遇到的问题:

1)复制回源:A -> B -> A

数据从 A 复制到 B,B 收到数据后,再回源复制给 A 的问题。

2)环形复制:A -> B -> C -> A

我们可以通过标记来解决上一问题,然而系统引入更多节点之后,A 发送到 B 的数据,可能通过 C 再次回到 A。

3)数据一致性

网络传输具有延迟性和不稳定性,多节点的并发写入会造成数据不一致的问题。

4)同步时延(鉴于是跨国的数据同步,这一项我们先忽略)

3.2 Redis 的问题

1)Redis 原生的复制模型,是不能够支持 Multi-Master 的理论架构的。

开源版的 Redis 只能支持 Master-Slave 的架构,并不能支持 Redis 之间互相同步数据。

2)Redis 特殊的同步方式(全量同步+增量同步),给数据一致性带来了更多挑战。

Redis 全量同步和增量同步都基于 replicationId + offset 的方式来做,在引入多个节点互相同步之后,如何对齐互相之间全量同步和增量同步的 offset 是一个巨大的问题。

3)同时支持原生的 Master-Slave 系统

在新版系统上,同时兼容现存的 Master-Slave 架构,两种同步方式和策略的异同,也带来了新的挑战。

四、解决方案

由于篇幅的限制,这里只对数据一致性的解决方案做下介绍,对于分布式系统或者是双向同步感兴趣的同学,可以关注我们后续的技术文章和技术大会。

4.1 一致性的解决方案

CRDT(Conflict-Free Replicated Data Type)[3] 是各种基础数据结构最终一致算法的理论总结,能根据一定的规则自动合并,解决冲突,达到强最终一致的效果。

2012 年 CAP 理论提出者 Eric Brewer 撰文回顾 CAP[4]时也提到,C 和 A 并不是完全互斥,建议大家使用 CRDT 来保障一致性。自从被大神打了广告,各种分布式系统和应用均开始尝试 CRDT,redislabs[5]和 riak[6]已经实现多种数据结构,微软的 CosmosDB[7]也在 azure 上使用 CRDT 作为多活一致性的解决方案。

携程的框架部门最终也选择站在巨人的肩膀上,通过 CRDT 这种数据结构,来实现自己的 Redis 跨区域多向同步。

4.2 CRDT

CRDT 同步方式有两种:

1)state-based replication

发送端将自身的“全量状态”发送给接收端,接收端执行“merge”操作,来达到和发送端状态一致的结果。state-base replication 适用于不稳定的网络系统,通常会有多次重传。要求数据结构能够支持 associative(结合律)/commutative(交换律)/idempotent(幂等性)。

2)operation-based replication

发送端将状态的改变转换为“操作”发送给接收端,接收端执行“update”操作,来达到和发送端状态一直的结果。op-based replication 只要求数据结构满足 commutative 的特性,不要求 idempotent(大家可以想一想为什么)。op-based replication 在接收到 client 端的请求时,通常分为两步进行操作:

a. prepare 阶段:将 client 端操作转译为 CRDT 的操作;

b. effect 阶段:将转译后的操作 broadcast 到其他 server;

两者之间在实现上,界限比较模糊。一方面,state-based replication 可以通过发送 delta 减少网络流量,从而做到和 op-based replication 比较接近的效果;另一方面,op-based replication 可以通过发送 compact op-logs 将操作全集发送过去,来解决初始化的时候同步问题,从而达到类似于 state-based replication 的效果。

我们的系统需要借助两种同步方式,以适用于不同的场景中:

  1. state-based replication 通常是基于“全量状态”进行同步,这样的结果是造成的网络流量太大,且同步的效率低下。在同步机制已经建立的系统中,我们更倾向于使用 op-based replication,以达到节省流量和快速同步的目的。
  2. op-based replication 是基于 unbounded resource 的假设上进行论证的学术理念,在实践过程中,不可能有无限大的存储资源,将某个站点的全部数据缓存下来。

这样就带来一个问题,如果新加节点或者网络断开过久时,我们的存储资源不足以缓存所有值钱的操作,从而使得复制操作无法进行。此时,我们需要借助 state-based replication 进行多个站点之间,状态的 merge 操作。

4.3 CRDT 的数据结构

Redis 的 String 类型对应的操作有 SET, DEL, APPEND, INCRBY 等等,这里我们只讲一下 SET 操作(INCRBY 会是不同的数据类型)。

Register

先来讲一下,CRDT 理论中如何处理 Redis String 类型的同步问题。

Redis 的 String 类型对应于 CRDT 里面的 Register 数据结构,对应的具体实现有两种比较符合我们的应用场景:

  • MV(Multi-Value) Register:数据保留多份副本,客户端执行 GET 操作时,根据一定的规则返回值,这种类型比较适合 INCRBY 的整型数操作。
  • LWW(Last-Write-Wins) Register:数据只保留一份副本,以时间戳最大的那组数据为准,SET 操作中,我们使用这种类型。

还记得上文提到的两种不同的同步方式么,关于两种不同的同步方式,对于 LWW Register,实现方式会稍有不同。

Op-based LWW Register [8]
State-based LWW Register [8]

4.4 CRDT Register 在 Redis 中的落地

讲完了 CRDT 的传输类型和一个基本的数据结构,那么具体这样的理论是如何落地到 Redis 中间的呢?

在最终的实现中,我们采用了 OR-Set(Observed-Remove Set) + LWW(Last-Write-Wins) Register 来实现 Redis 中的 String 操作。

4.4.1 Redis K/V

以下是理论上的数据结构,并不是 redis 中真正的结构体,仅仅作为说明使用。

struct CRDT.Register {       string key;       string val;       TAG delete-tag;       int timestamp; }

1)key 既是 SET 操作中的 key

2)val 用来存储相应的 value

3)delete-tag 是逻辑删除的标记位,具体的理论来源是 OR(Observed-Remove) Set

4)timestamp 用于 LWW(Last Write Wins)机制,来解决并发冲突

由于目前我们的多写 Redis 还没有开源,这里我们拿 Java 程序举个栗子[9] 详细可以访问 github

LWW Register

processCommand 是这个 CRDT 框架的核心函数,基本定义了每一种类型,是如何进行 merge/update 等操作的。

在这里我们可以看到,每一个 command 过来时,会携带一个自身的时钟,由本地的程序进行判定,如果时钟符合偏序(partial ordered),就进行 merge 操作,并储存元素。

javapublic class LWWRegister extends AbstractCrdt<LWWRegister, LWWRegister.SetCommand> {
private T value;private StrictVectorClock clock;
public LWWRegister(String nodeId, String crdtId) { super(nodeId, crdtId, BehaviorProcessor.create()); this.clock = new StrictVectorClock(nodeId);}

protected Option<SetCommand<T>> processCommand(SetCommand<T> command) { if (clock.compareTo(command.getClock()) < 0) { clock = clock.merge(command.getClock()); doSet(command.getValue()); return Option.of(command); } return Option.none();}

public T get() { return value;}
public void set(T newValue) { if (! Objects.equals(value, newValue)) { doSet(newValue); commands.onNext(new SetCommand<>( crdtId, value, clock )); }}
private void doSet(T value) { this.value = value; clock = clock.increment();}
}

OR-SET

而 Observed-Remove SET 相较 LWW-Register 就复杂一些,主要涉及两个概念,一个是正常的原生储存的地方,在 Set<Element<E>> elements 中。而另一个重要的概念,是 tombstone(墓地),用来存放会删除的元素 Set<Element<E>> tombstone OR-SET 的 tombstone,就可以解决并发删除的问题,而 LWW-Register 则可以解决并发添加的问题。

java ublic class ORSet extends AbstractSet implements Crdt<ORSet, ORSet.ORSetCommand> /*, ObservableSet */ {private final String crdtId;private final Set<Element<E>> elements = new HashSet<>();private final Set<Element<E>> tombstone = new HashSet<>();private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();
public ORSet(String crdtId) { this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");}
@Overridepublic String getCrdtId() { return crdtId;}
@Overridepublic void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) { commands.subscribe(subscriber);}
@Overridepublic void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) { Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> { final Option<ORSetCommand<E>> newCommand = processCommand(command); newCommand.peek(commands::onNext);
});}
private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) { if (command instanceof AddCommand) { return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none(); } else if (command instanceof RemoveCommand) { return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none(); } return Option.none();}
@Overridepublic int size() { return doElements().size();}
@Overridepublic Iterator<E> iterator() { return new ORSetIterator();}
@Overridepublic boolean add(E value) { final boolean contained = doContains(value); prepareAdd(value); return !contained;}
private static <U> Predicate<Element<U>> matches(U value) { return element -> Objects.equals(value, element.getValue());}
private synchronized boolean doContains(E value) { return elements.parallelStream().anyMatch(matches(value));}
private synchronized Set<E> doElements() { return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());}
private synchronized void prepareAdd(E value) { final Element<E> element = new Element<>(value, UUID.randomUUID()); commands.onNext(new AddCommand<>(getCrdtId(), element)); doAdd(element);}
private synchronized boolean doAdd(Element<E> element) { return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));}
private synchronized void prepareRemove(E value) { final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet()); commands.onNext(new RemoveCommand<>(getCrdtId(), removes)); doRemove(removes);}
private synchronized boolean doRemove(Collection<Element<E>> removes) { return elements.removeAll(removes) | tombstone.addAll(removes);}
}

4.4.2 用户视角

正常同步的场景

Data Type: Strings

Use Case: Common SETs Conflict Resolution: None

并发冲突的场景

Data Type: Strings

Use Case: Concurrent SETs

Conflict Resolution: Last Write Wins (LWW)

五、未完待续

由于篇幅限制,我们详细介绍了 CRDT 同步的理论基础以及一个 Redis 的 K/V 数据结构在 CRDT 中是如何展现出来的。对 CRDT 或分布式数据库感兴趣的同学,请关注携程的公众号,也可以支持一下我们目前已经开源的的 redis 同步产品 – XPipe (https://github.com/ctripcorp/x-pipe)。

附录

[1] 携程Redis海外机房数据同步实践

[2] CRDT——解决最终一致问题的利器

[3] CRDT: https://hal.inria.fr/inria-00609399/document

[4] Eric Brewer: https://www.infoq.com/articles/cap-twelve-years-later-how-the-rules-have-changed

[5] redislabs, Developing Applications with Geo-replicated CRDBs on Redis Enterprise Software(RS): https://redislabs.com/redis-enterprise-documentation/developing/crdbs/

[6] riak: https://docs.basho.com/riak/kv/2.0.0/developing/data-types/

[7] cosmosDB: https://docs.microsoft.com/en-us/azure/cosmos-db/multi-region-writers

[8] A comprehensive study of Convergent and Commutative Replicated Data Types(https://links.jianshu.com/go?to=http%3A%2F%2Fhal.upmc.fr%2Ffile%2Findex%2Fdocid%2F555588%2Ffilename%2Ftechreport.pdf)

[9] CRDT Java: https://github.com/netopyr/wurmloch-crdt

Leave a Reply

Your email address will not be published. Required fields are marked *