使用 INFINI Console 实现 Elasticsearch 的增量数据迁移

功能介绍 #在 INFINI Console 1.3.0 版本里,数据迁移功能增加了对增量迁移的支持。这篇文章将会介绍增量迁移的具体使用方法和实现原理。


【资料图】

场景介绍 #以常见的日志场景为例,假设 A 集群有一个用来记录线上 HTTP 请求记录的索引 request-logs,数据结构如下:

{"request_body": {...},"request_header": {...},"method": "POST","request_time": "2023-06-09 12:30:09+800" // 客户端记录请求的时间"@timestamp": "2023-06-09 12:30:11+800" // 请求写入ES的时间}我们希望完整导入这个索引的数据到另外一个集群 B 的 request-logs。要想确保导入的数据完整,我们首先需要考虑数据写入的延迟问题:

A 集群的数据写入可能会有延迟。日志往往是从不同的节点收集异步上传的,考虑到网络环境波动等情况,最终日志写入 Elasticserach 的时间会有差异。

写入 Elasticsearch 的数据并不会立刻对查询请求可见,Elasticsearch 会异步刷新写入的数据。

也就是说,我们假设每一条请求日志从采集到写入 ES 到最终可以被查询的延迟为d,每次进行增量迁移的时候,我们可以完整迁移的数据范围就是[当前时间 - 上次迁移的时间 - d, 当前时间 - d)。只要数据写入的延迟不超过d,我们就可以从集群 A 查询到完整的数据集写入集群 B。

集群 A 的数据有更新操作? #在上述的日志场景里,我们通常不会对写入的日志文档进行后续的更新操作,每一条文档写入后都是不可变的,我们只需要筛选@timestamp字段就可以找到需要迁移的数据了,而且每条数据只需要迁移一次就可以确保目标集群的数据一致。

如果源数据有更新,那我们应该如何进行增量迁移呢?通常情况下,每次更新操作我们都会记录文档更新的时间到update_time字段,这样我们就可以使用update_time字段来进行增量数据的迁移。

假设在第一次迁移的时候,索引 A 存有以下数据,我们在进行第一次迁移操作后,数据可以完整写入目标索引:

如果在第二次迁移之前,索引 A 中有一条旧的记录被更新,这个时候,迁移流程可以通过update_time字段检测到这一条数据,复制并覆盖目标集群的旧数据记录:

可以看到,即使源数据有更新,只要我们记录了每一条数据的更新时间,迁移过程最后写入集群 B 的数据依然是完整且一致的。

集群 A 的数据有删除操作? #如果源集群的数据有删除操作,基于上述的数据迁移逻辑,第二次迁移过程是无法判断已经迁移的数据是否被删除的:

如果我们希望迁移的数据完整,需要避免对源集群的数据进行删除操作。我们可以标记文档为deleted,但是不做实际删除操作,这样我们就可以通过文档删除(更新)操作的时间来进行完整的数据迁移:

使用 INFINI Console 来进行增量数据的迁移 #在 INFINI Console 里,我们可以使用 数据工具 - 数据迁移 功能来对数据进行增量迁移。作为示例,我们新建一个数据迁移任务,把.infini_requests_logging-000002索引的数据迁移到目标集群的request索引。

我们需要迁移到一个新创建的索引,在Initialize Configuration步骤,根据提示配置目标索引的mapping和setting信息。如果不需要特殊配置,可以使用Auto Optimize功能自动填充。

接下来我们需要配置数据写入的更新字段和写入的延迟。请求日志通过timestamp字段记录请求时间,通常延迟不会超过 1 分钟,我们在Migrate Setting步骤里配置相应的Incremental信息。如果历史数据量比较大或者增量任务的运行间隔较长,我们也可以配置Partition分区规则来拆分任务为更细的粒度,避免因单个任务长时间导出数据对 ES 产生过高的负载。

最后,在创建任务时,我们勾选Detect Incremental Data,然后设置任务每 15 分钟检测一次增量数据。

点击开始后,增量任务就会开始运行。第一次运行时会对历史数据进行全量迁移,后续每 15 分钟会自动检测新数据并迁移到目标索引。

总结 #除了数据一致性,迁移功能的设计也需要兼顾性能、稳定性、ES 版本兼容性等,INFINI Console 提供了一套操作简便的数据迁移解决方案,可以用于各种场景的数据迁移需求。

关键词: