Kafka Cross-Cluster Synchronization and ZooKeeper Data Migration

This approach addresses Kafka cross-cluster synchronization and the creation of Kafka cluster mirrors, primarily using Kafka's built-in MirrorMaker tool.

A Kafka mirror is a replica of an existing Kafka cluster. MirrorMaker creates a mirror from a source Kafka cluster to a target Kafka cluster by consuming data from the source cluster with a Kafka consumer and re-publishing it to the target cluster with an embedded Kafka producer.

I. How to Create a Mirror

Creating a mirror with MirrorMaker is fairly simple: once the target Kafka cluster is set up, you only need to start the mirror-maker program. One or more consumer config files and one producer config file are required; whitelist and blacklist are optional. Point the consumer config at the source cluster's ZooKeeper, and point the producer config at the target cluster's ZooKeeper (or broker.list).

kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

For example, suppose you need to mirror cluster S and the target cluster T is already built; a simple procedure is as follows:

1. Create the consumer config file: sourceClusterConsumer.config

zk.connect=szk0:2181,szk1:2181,szk2:2181
groupid=test-mirror-consumer-group

2. Create the producer config file: targetClusterProducer.config

zk.connect=tzk0:2181,tzk1:2181

3. Create the start script: start.sh

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

4. Run the script

Run start.sh and use the log output to check how it is doing; the synchronized data will show up under log.dir on the target Kafka cluster.

II. MirrorMaker Parameters

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --help

Running the command above prints a description of each parameter:

1. Whitelist and blacklist

mirror-maker accepts a whitelist and a blacklist that specify exactly which topics to mirror. Both use standard Java regular expressions; for convenience, a comma (',') is compiled into the Java regex alternation ('|').
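For example (the topic names topicA and topicB are purely illustrative), mirroring just two topics instead of everything would look like this:

# Mirror only topicA and topicB; the comma in the whitelist is treated as a regex '|'
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist="topicA,topicB"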

2. Producer timeout

For high throughput, you should use the asynchronous embedded producer and set it to blocking mode (queue.enqueueTimeout.ms=-1). This guarantees that messages will not be lost. Otherwise, the asynchronous producer's default enqueueTimeout is 0, so if the producer's internal queue fills up, messages are dropped and QueueFullExceptions are thrown. With a blocking producer, a full internal queue makes the producer wait, which effectively throttles the embedded consumer's consumption rate. You can enable the producer's trace logging to watch the remaining capacity of its internal queue at any time. If the producer's internal queue stays full for long stretches, it means that re-publishing messages to the target Kafka cluster or writing them to disk is the bottleneck for mirror-maker.

For details on configuring the Kafka producer in sync or async mode, see $KAFKA_HOME/config/producer.properties, and pay attention to the producer.type and queue.enqueueTimeout.ms settings.
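As a minimal sketch (these are the old-style producer property names used in this section; verify them against your Kafka version before relying on them), the relevant lines could be appended to the producer config like this:

# Assumed property names: async producer in blocking mode so messages are never dropped
cat >> targetClusterProducer.config <<'EOF'
producer.type=async
queue.enqueueTimeout.ms=-1
EOF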

3. Producer retries

If you use broker.list in the producer configuration, you can set the number of retries for failed publish attempts. The retry parameter is only honored when broker.list is used, because a retry can select a different broker.
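For illustration only: broker.list comes from this section, the host:port pairs are hypothetical, and message.send.max.retries is the 0.8-era old-producer retry setting, so treat both the names and the format as assumptions to verify for your Kafka version.

# Assumed settings: publish to the target brokers directly and retry failed sends,
# since a retry can pick a different broker (the broker.list format varies by version)
cat >> targetClusterProducer.config <<'EOF'
broker.list=tbroker0:9092,tbroker1:9092
message.send.max.retries=3
EOF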

4. Number of producers

Setting --num.producers lets mirror maker use a pool of producers to increase throughput. On the broker that receives the data, each producer connection is served by a single thread, so even with multiple consumption streams, throughput can be capped while that single thread processes the producer's requests.
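For example (reusing the config file names from start.sh above), the start script could be changed to run a pool of four producers:

# Variant of start.sh: a pool of 4 producers and 2 consumption streams
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --num.producers 4 --producer.config targetClusterProducer.config --whitelist=".*"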

5. Number of consumption streams

Use --num.streams to specify the number of consumer threads. Note that if you start multiple mirror maker processes, you should look at how the source cluster's partitions are distributed among them. If the number of consumption streams per mirror maker process is too large, some streams will not own any partitions and will sit idle, a consequence of the consumer's rebalancing algorithm.

6. Shallow iteration and producer compression

We recommend enabling shallow iteration in the mirror maker's consumer. This means the consumer does not decompress compressed message-sets; it passes the fetched message-set data straight through to the producer.

If you enable shallow iteration, you must disable compression in the mirror maker's producer; otherwise the message-sets will be compressed twice.

7. Consumer and source cluster socket buffer sizes

Mirroring is often used across data centers, so you may want to tune certain configuration options for inter-cluster communication latency and specific hardware bottlenecks. In general, set a high value for the mirror-maker consumer's socket.buffersize and for the source brokers' socket.send.buffer. In addition, the mirror-maker consumer's fetch.size should be set higher than socket.buffersize. Note that the socket buffer size is an OS-level networking parameter; if you enable trace-level logging you can check the actual receive buffer size and decide whether the OS networking layer also needs tuning.
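As a rough sketch using only the property names mentioned above (old-style names with arbitrary values; check the equivalents for your Kafka version), the consumer side of a cross-datacenter mirror could be tuned like this:

# Assumed tuning for a high-latency link: a large socket buffer, with fetch.size larger still;
# the matching socket.send.buffer is set in the source brokers' server.properties
cat >> sourceClusterConsumer.config <<'EOF'
socket.buffersize=2097152
fetch.size=4194304
EOF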

III. How to Check MirrorMaker's Progress

The consumer offset checker tool can be used to gauge how well the mirror is keeping up with the source cluster. For example:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic
KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0
  Consumer offset = 561154288
                  = 561,154,288 (0.52G)
         Log size = 2231392259
                  = 2,231,392,259 (2.08G)
     Consumer lag = 1670237971
                  = 1,670,237,971 (1.56G)
BROKER INFO
0 -> 127.0.0.1:9092

Note that --zkconnect must point to the source cluster's ZooKeeper. Also, if --topic is not specified, the tool prints information for all topics under the given consumer group.

For migrating ZooKeeper data, one approach is to exploit the fact that the ensemble stays available as long as a majority of its nodes are up; the other is to copy the metadata files directly to the new cluster.

In some special scenarios the first approach is not the best choice. For example, the ZooKeeper ensemble in use at our company has, for historical reasons, accumulated a complex mix of use cases and a very high connection load; during holidays and promotional events the connection count can unexpectedly max out, and the resulting load can bring down the whole ensemble. In that case the second approach, migrating the data directly, is the one to use.

The new backup cluster has to be built from scratch, and once it is set up two points need attention:

Before the new cluster is started, its data directories do not exist;

After the original cluster's metadata files are placed in the new cluster's data directories, the new cluster has to be restarted for the migrated data to be loaded.

So we first start the cluster so the data directories are created and loaded, then stop it and delete every file under the backup cluster's data directories, including the transaction logs, the snapshots, and the two epoch files.

Manual procedure (using the minos tool)

Whether you use a cluster-management tool or operate the cluster by hand, the underlying commands executed are the same, so there is no need to get hung up on this. In this article all ZooKeeper cluster operations are performed with the open-source minos tool; if you are interested, see https://github.com/XiaoMi/minos.git

1. Start the new cluster (assume the backup cluster is named backuptst)

The nodes of the new backup cluster are: server01, server02, server03, server04, server05

Prepare all of the new cluster's servers, then deploy and start ZooKeeper on them; the ZooKeeper processes create the data directories automatically when they start.

ZooKeeper deployment and startup are not covered in detail here; I simply use the minos tool. With the cluster configuration file written, the following command deploys and starts the new cluster in one step.


./deploy bootstrap zookeeper backuptst 

2. Stop the new cluster

This prepares for migrating the data onto each node of the cluster.


./deploy stop zookeeper backuptst

3. On each node of the new cluster, delete the metadata files under the data directory

Note:

If the cluster's current epoch files are not deleted, the cluster will fail to start. The reason is that on startup each node reads the current epoch value recorded in its epoch files and compares it against the transaction zxids when loading the metadata from disk; the epoch in the migrated original data will not match the new cluster's own epoch, so the cluster cannot restart cleanly. Therefore delete the epoch files on every node of the new backup cluster, and when migrating the data simply bring the original cluster's two epoch files into that data directory as well.


cd <your-zookeeper-data-directory>/version-2/

rm -f *

4. Copy the metadata files from the data directory of the original cluster's leader node to the backup cluster

The original cluster's leader is chosen largely because its data is the most complete relative to the other nodes.

  • Determine the IP of the original cluster's leader node (see the sketch after this list).
  • Back up the following files from that node's data directory:
    • the latest log transaction-log file;
    • the latest snapshot file;
    • the acceptedEpoch file;
    • the currentEpoch file.
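To find the leader, one option is the srvr four-letter command (the host names below are placeholders for the original cluster's nodes, and four-letter words must be allowed on those servers):

# Print each node's role and note which one reports "Mode: leader"
for h in origin-zk01 origin-zk02 origin-zk03; do
  echo -n "$h: "
  echo srvr | nc "$h" 2181 | grep Mode
done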

5. Distribute the backed-up data to the corresponding data directory on each node of the new cluster

Distribute the four files above, one by one, into the corresponding data directory on every node of the new cluster.

Example:

Here I copied the four files from the original leader's data directory to a local directory, zk_meta_dir, ahead of time; the four files are:


ls /zk_meta_dir/

    log.25bcc3ab96

    snapshot.25bcc3ab95

    acceptedEpoch

    currentEpoch

Now distribute these four files to each node of the new backup cluster:


scp /zk_meta_dir/* work@server01:/home/work/zookeeper/backuptst/zookeeper/version-2/

scp /zk_meta_dir/* work@server02:/home/work/zookeeper/backuptst/zookeeper/version-2/

scp /zk_meta_dir/* work@server03:/home/work/zookeeper/backuptst/zookeeper/version-2/

scp /zk_meta_dir/* work@server04:/home/work/zookeeper/backuptst/zookeeper/version-2/

scp /zk_meta_dir/* work@server05:/home/work/zookeeper/backuptst/zookeeper/version-2/

6. Restart the new cluster


./deploy start zookeeper backuptst

At this point you can log in to one of the ZooKeeper nodes with zkCli.sh to check whether the data has been restored, or dump cluster information with the mntr four-letter command and compare it against the original cluster.
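For example (server01 is one of the backup cluster's nodes named earlier; $ZOOKEEPER_HOME is a placeholder for your installation path):

# Connect interactively and inspect the restored znodes
$ZOOKEEPER_HOME/bin/zkCli.sh -server server01:2181

# Dump server statistics with the mntr four-letter command and compare with the original cluster
echo mntr | nc server01 2181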

Kafka MirrorMaker is a stand-alone tool for copying data between two Apache Kafka® clusters. It is little more than a Kafka consumer and producer hooked together. Data will be read from topics in the origin cluster and written to a topic with the same name in the destination cluster.

Important

Confluent Replicator is a more complete solution that handles topic configuration as well as data and integrates with Kafka Connect and Control Center to improve availability, scalability and ease of use.

This topic provides examples of how to migrate from an existing datacenter that is using MirrorMaker to Replicator. In these examples, messages are replicated from a specific point in time, not from the beginning. This is helpful if you have a large number of legacy messages that you do not want to migrate.

Assume there are two datacenters, DC1 (Active) and DC2 (Passive), that are each running an Apache Kafka® cluster. There is a single topic in DC1 and it has been replicated to DC2 with the same topic name. The topic name is inventory.

Example 1: Same Number of Partitions in DC1 and DC2

In this example, you migrate from MirrorMaker to Replicator and keep the same number of partitions for inventory in DC1 and DC2. Prerequisites:

  • Confluent Platform 5.0.0 or later is installed.
  • You must have the same number of partitions for inventory in DC1 and DC2 to use this method.
  • The src.consumer.group.id in Replicator must match group.id in MirrorMaker.
  1. Stop the running MirrorMaker instance in DC1, where <mm pid> is the MirrorMaker process ID:

     kill <mm pid>

  2. Configure and start Replicator. In this example, Replicator is run as an executable from the command line or from a Docker image.
    1. Add these values to <path-to-confluent>/etc/kafka-connect-replicator/replicator_consumer.properties. Replace localhost:9082 with the bootstrap.servers of DC1, the source cluster:

       bootstrap.servers=localhost:9082
       topic.preserve.partitions=true

    2. Add this value to <path-to-confluent>/etc/kafka-connect-replicator/replicator_producer.properties. Replace localhost:9092 with the bootstrap.servers of DC2, the destination cluster:

       bootstrap.servers=localhost:9092

    3. Ensure the replication factors are set to 2 or 3 for production, if they are not already:

       echo "confluent.topic.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "offset.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "config.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "status.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties

    4. Start Replicator:

       <path-to-confluent>/bin/replicator --cluster.id <new-cluster-id> \
         --producer.config replicator_producer.properties \
         --consumer.config replicator_consumer.properties \
         --replication.config ./etc/kafka-connect-replicator/quickstart-replicator.properties

Replicator will use the committed offsets by MirrorMaker from DC1 and start replicating messages from DC1 to DC2 based on these offsets.
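The prerequisite above says that Replicator's src.consumer.group.id must match MirrorMaker's group.id. As a hedged sketch (the group id shown is the one from the MirrorMaker example earlier in this article, and the exact file these connector-style properties belong in should be checked against the Replicator configuration reference), that could be appended to the replication config like this:

# Assumed: reuse MirrorMaker's consumer group so Replicator resumes from its committed offsets,
# and restrict replication to the inventory topic
echo "src.consumer.group.id=test-mirror-consumer-group" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
echo "topic.whitelist=inventory" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties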

Example 2: Different Number of Partitions in DC1 and DC2

In this example, you migrate from MirrorMaker to Replicator and have a different number of partitions for inventory in DC1 and DC2. Prerequisites:

  • Confluent Platform 5.0.0 or later is installed.
  • The src.consumer.group.id in Replicator must match group.id in MirrorMaker.
  1. Stop the running MirrorMaker instance from DC1.
  2. Configure and start Replicator. In this example, Replicator is run as an executable from the command line or from a Docker image.
    1. Add these values to <path-to-confluent>/etc/kafka-connect-replicator/replicator_consumer.properties. Replace localhost:9082 with the bootstrap.servers of DC1, the source cluster:

       bootstrap.servers=localhost:9082
       topic.preserve.partitions=false

    2. Add this value to <path-to-confluent>/etc/kafka-connect-replicator/replicator_producer.properties. Replace localhost:9092 with the bootstrap.servers of DC2, the destination cluster:

       bootstrap.servers=localhost:9092

    3. Ensure the replication factors are set to 2 or 3 for production, if they are not already:

       echo "confluent.topic.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "offset.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "config.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties
       echo "status.storage.replication.factor=3" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties

    4. Start Replicator:

       <path-to-confluent>/bin/replicator --cluster.id <new-cluster-id> \
         --producer.config replicator_producer.properties \
         --consumer.config replicator_consumer.properties \
         --replication.config ./etc/kafka-connect-replicator/quickstart-replicator.properties

Replicator will use the committed offsets by MirrorMaker from DC1 and start replicating messages from DC1 to DC2 based on these offsets.

Replicator Overview

Replicator

Confluent Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying the messages, Replicator will create topics as needed preserving the topic configuration in the source cluster. This includes preserving the number of partitions, the replication factor, and any configuration overrides specified for individual topics. Replicator is implemented as a connector.
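Because Replicator is implemented as a connector, it can also be submitted to a Kafka Connect worker over the REST API. The following is a minimal hedged sketch rather than a complete configuration: the worker URL, connector name, broker addresses, and topic are illustrative, and the full option list is in the Replicator configuration reference.

# Register a Replicator connector that copies the "inventory" topic from a source
# cluster to the cluster backing this Connect worker (all endpoints are placeholders)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "replicator-dc1-to-dc2",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "dc1-broker1:9082",
    "dest.kafka.bootstrap.servers": "dc2-broker1:9092",
    "topic.whitelist": "inventory"
  }
}'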

Features

Replicator supports the following features:

  • Topic selection using whitelists, blacklists, and regular expressions.
  • Dynamic topic creation in the destination cluster with matching partition counts, replication factors, and topic configuration overrides.
  • Automatic resizing of topics when new partitions are added in the source cluster.
  • Automatic reconfiguration of topics when topic configuration changes in the source cluster.
  • Timestamp Preservation, Using Provenance Headers to Prevent Duplicates or Cyclic Message Repetition, and Consumer Offset Translation (supported on Confluent Platform 5.0.1 and later).
  • You can migrate from MirrorMaker to Replicator on existing datacenters (Confluent Platform 5.0.0 and later). Migration from MirrorMaker to Replicator is not supported in earlier versions of Confluent Platform (pre 5.0.0).
  • At least once delivery, meaning the Replicator connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Multi-Datacenter Use Cases

Replicator can be deployed across clusters and in multiple datacenters. Multi-datacenter deployments enable use-cases such as:

  • Active-active geo-localized deployments: allows users to access a near-by datacenter to optimize their architecture for low latency and high performance
  • Active-passive disaster recovery (DR) deployments: in the event of a partial or complete datacenter disaster, allow failing over applications to use Confluent Platform in a different datacenter.
  • Centralized analytics: Aggregate data from multiple Apache Kafka® clusters into one location for organization-wide analytics
  • Cloud migration: Use Kafka to synchronize data between on-prem applications and cloud deployments

Replication of events in Kafka topics from one cluster to another is the foundation of Confluent’s multi datacenter architecture.

Replication can be done with Confluent Replicator or using the open source Kafka MirrorMaker. Replicator can be used for replication of topic data as well as migrating schemas in Schema Registry.

This documentation focuses on Replicator, including architecture, quick start tutorial, how to configure and run Replicator in different contexts, tuning and monitoring, cross-cluster failover, and more. A section on how to migrate from MirrorMaker to Replicator is also included.

Some of the general thinking on deployment strategies can also apply to MirrorMaker, but if you are primarily interested in MirrorMaker, see Mirroring data between clusters in the Kafka documentation.

Architecture

The diagram below shows the Replicator architecture. Replicator uses the Kafka Connect APIs and Workers to provide high availability, load-balancing and centralized management.

Replicator Architecture

Tip

You can deploy Replicator near the destination cluster or the origin cluster, and it will work either way. However, a best practice is to deploy Replicator closer to the destination cluster for reliability and performance over networks. Therefore, if the destination cluster is Confluent Cloud, we recommend that you deploy Replicator on an instance in the same region as your Confluent Cloud cluster. However, if the origin cluster does not permit external connections, you may deploy Replicator in the origin cluster. (See also Migrate Topics on Confluent Cloud Clusters.)

Example Deployment

In a typical multi-datacenter deployment, data from two geographically distributed Kafka clusters located in separate datacenters is aggregated in a separate cluster located in another datacenter. The origin of the copied data is referred to as the “source” cluster while the target of the copied data is referred to as the “destination.”

Each source cluster requires a separate instance of Replicator. For convenience you can run them in the same Connect cluster, located in the aggregate datacenter.

Replication to an Aggregate Cluster

Guidelines for Getting Started

Follow these guidelines to configure a multi-datacenter deployment using Replicator:

  1. Use the Replicator quick start to set up replication between two Kafka clusters.
  2. Learn how to install and configure Replicator and other Confluent Platform components in multi datacenter environments.
  3. Before running Replicator in production, make sure you read the monitoring and tuning guide.
  4. For a practical guide to designing and configuring multiple Kafka clusters to be resilient in case of a disaster scenario, see the Disaster Recovery white paper. This white paper provides a plan for failover, failback, and ultimately successful recovery.

Demos and Examples

After completing the Replicator quick start, explore these hands-on working examples of Replicator in multi-datacenter deployments, for which you can download the demo from GitHub and run yourself. Refer to the list below to determine the Replicator examples that correspond to your deployment scenario.

  1. Kafka on-premises to Kafka on-premises
    • Replicator Demo on Docker: fully-automated example of an active-active multi-datacenter design with two instances of Replicator copying data bidirectionally between the datacenters
    • Schema translation: showcases the transfer of schemas stored in Schema Registry from one cluster to another using Replicator
    • Confluent Platform demo: deploy a Kafka streaming ETL, along with Replicator to replicate data
  2. Kafka on-premises to Confluent Cloud
  3. Kafka in GKE to Confluent Cloud
  4. Confluent Cloud to Confluent Cloud

Topic Renaming

By default, the replicator is configured to use the same topic name in both the source and destination clusters. This works fine if you are only replicating from a single cluster. When copying data from multiple clusters to a single destination (i.e. the aggregate use case), you should use a separate topic for each source cluster in case there are any configuration differences between the topics in the source clusters.
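One way to keep a separate destination topic per source cluster is Replicator's topic.rename.format property; as a hedged example (the ".dc1" suffix is just an illustrative naming choice):

# Suffix every replicated topic with its source datacenter, e.g. inventory -> inventory.dc1
echo 'topic.rename.format=${topic}.dc1' >> ./etc/kafka-connect-replicator/quickstart-replicator.properties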

It is possible to use the same Kafka cluster as the source and destination as long as you ensure that the replicated topic name is different. This is not a recommended pattern since generally you should prefer Kafka’s built-in replication within the same cluster, but it may be useful in some cases (e.g. testing).

Starting with Confluent Platform 5.0, Replicator protects against circular replication through the use of provenance headers. This guarantees that if two Replicator instances are configured to run, one replicating from DC1 to DC2 and the second instance configured to replicate from DC2 to DC1, Replicator will ensure that messages replicated to DC2 are not replicated back to DC1, and vice versa. As a result, Replicator safely runs in each direction.
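If your Replicator version does not enable provenance headers by default, they can be switched on explicitly; a hedged example (check the default behavior for your version):

# Tag replicated records with provenance headers so they are not replicated back to their origin
echo "provenance.header.enable=true" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties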

Although Replicator can enable applications in different datacenters to access topics with the same names, you should design client applications with a topic naming strategy that takes into consideration a number of factors.

If you plan to have the same topic name span datacenters, be aware that in this configuration:

  • Producers do not wait for commit acknowledgment from the remote cluster, and Replicator asynchronously copies the data between datacenters after it has been committed locally.
  • If there are producers in each datacenter writing to topics of the same name, there is no “global ordering”. This means there are no message ordering guarantees for data that originated from producers in different datacenters.
  • If there are consumer groups in each datacenter with the same group ID reading from topics of the same name, in steady state, they will be reprocessing the same messages in each datacenter.

In some cases, you may not want to use the same topic name in each datacenter. For example, in cases where:

  • Replicator is running a version less than 5.0.1
  • Kafka brokers are running a version prior to Kafka 0.11 that does not yet support message headers
  • Kafka brokers are running Kafka version 0.11 or later but have less than the minimum required log.message.format.version=2.0 for using headers
  • Client applications are not designed to handle topics with the same name across datacenters

In these cases, refer to the appendix on “Topic Naming Strategies to Prevent Cyclic Repetition” in the Disaster Recovery white paper.

Periodic Metadata Updates

The replicator periodically checks topics in the source cluster to tell whether there are any new topics which need to be replicated, and whether there are any configuration changes (e.g. increases in the number of partitions). The frequency of this checking is controlled with the metadata.max.age.ms setting in the connector configuration. The default is set to 2 minutes, which is intended to provide reasonable responsiveness to configuration changes while ensuring that the connector does not add any unnecessary load on the source cluster. You can lower this setting to detect changes quicker, but it’s probably not advisable as long as topic creation/reconfiguration is relatively rare (as is most common).
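For instance (a hypothetical one-line change; 60000 ms halves the 2-minute default described above):

# Poll source-cluster metadata every 60 seconds instead of every 2 minutes
echo "metadata.max.age.ms=60000" >> ./etc/kafka-connect-replicator/quickstart-replicator.properties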
