Kafka单表离线同步至MaxCompute

本文以将Kafka单表离线同步至MaxCompute为例,为您介绍Kafka的分钟、小时、天增量数据定时调度写入MaxCompute小时、天分区表的配置详情。

注意事项

  • Kafka的版本需要大于等于0.10.2小于等于2.2.x,且Kafka启用了记录时间戳,并且记录带有正确的业务时间戳。

  • 增量数据开始同步后,如果仍有时间戳小于等于起始时间的记录写入Kafka Topic的话,这些数据可能被漏读,所以当Kafka Topic中数据写入出现延迟或者时间戳乱序时,要注意对离线同步任务造成的数据漏读风险。

  • Kafka侧参数同步结束策略原则上只有满足以下条件可以选择1分钟读取不到新数据,否则存在数据漏读风险。

    • Kafka Topic中部分或全部分区存在长时间(10分钟以上)无数据写入情况。

    • 每个周期实例启动后,不会有时间戳小于结束时间参数的记录写入Kafka Topic。

前提条件

使用限制

暂不支持将源端数据同步至MaxCompute外部表。

操作步骤

说明

本文以数据开发(Data Studio)(新版)界面操作为例,演示离线同步任务配置。

一、创建离线同步节点

  1. 进入DataWorks工作空间列表页,在顶部切换至目标地域,找到已创建的工作空间,单击操作列的快速进入 > Data Studio,进入Data Studio。

  2. 在左侧导航栏单击image,然后单击项目目录右侧的image,选择新建节点 > 数据集成 > 离线同步,自定义离线同步任务节点名称后,单击确认

二、网络与资源配置

  1. 网络与资源配置步骤,选择同步任务所使用的数据来源资源组数据去向。您可以为该任务分配任务资源占用CU数。

    • 数据来源选择已添加的Kafka数据源。

    • 数据去向选择已添加的MaxCompute数据源。

    • 资源组选择已与KafkaMaxCompute数据源连通的资源组。您可以为该任务分配任务资源占用CU数。

  2. 数据来源数据去向卡片中单击测试连通性

    image

  3. 确保数据来源与数据去向均连通成功后,单击下一步

三、配置数据来源与去向

配置数据来源(Kafka)参数

本文档将Kafka数据单表离线同步至MaxCompute,数据来源为Kafka文件,配置要点如下。

image

说明

通用的Kafka数据来源的配置项介绍可查看Kafka Reader文档,以下为本次实践的配置参考。

配置项

配置要点

主题

选择待同步的Kafka Topic。如果您使用的是标准模式的DataWorks,需要在对应开发和生产环境的Kafka集群中有同名的Topic,此处的主题即选择此同名的Topic即可。

说明

如果:

  • 开发环境的Topic不存在:则此处配置离线同步节点的主题时,下拉框中无法搜索到待同步的Topic。

  • 生产环境的Topic不存在:则离线同步任务配置完,提交发布后,在生产环境周期调度运行时会因为没法找到待同步的表而导致任务失败。

消费群组ID

根据业务需要填写,确保在Kafka集群侧唯一,便于在Kafka集群侧统计和监控消费情况。

读取起始位点起始时间读取结束位点结束时间

读取起始点位读取结束位点选择指定时间起始时间结束时间分别设置为调度参数${startTime}${endTime}

这几个参数明确了后续同步数据时从哪个数据开始同步,同步到哪个数据同步任务结束,本实践的配置表明${startTime}时间的数据开始同步,一直到${endTime}时间的数据结束。${startTime}${endTime}在同步任务实际运行时会根据调度配置做参数替换。

时区

可置空或选择默认使用DataWorks所在地域的服务器时区。

说明

如果您此前有联系阿里云技术支持修改过调度时区,这里可选择您修改后的时区。

键类型值类型编码

根据Kafka Topic记录实际情况选择。

同步结束策略

同步结束策略如果满足以下条件可以选择1分钟读取不到新数据,否则选择到达指定结束位点

  • Kafka Topic中部分或全部分区存在长时间(10分钟以上)无数据写入情况。

  • 每个周期实例启动后,不会有时间戳小于结束时间参数的记录写入Kafka Topic。

高级配置

保持默认即可。

配置数据去向(MaxCompute)参数

本文档将Kafka数据单表离线同步至MaxCompute,数据去向为表,配置要点如下。

image

说明

下表中未说明参数保持默认即可。

配置项

配置要点

数据源

默认显示上一步选择的MaxCompute数据源。如果您使用的是标准模式的DataWorks工作空间,会分别显示开发和生产项目的名称。

选择待同步的MaxCompute表。如果您使用的是标准类型的DataWorks工作空间,请确保在MaxCompute的开发环境和生产环境中存在同名且表结构一致的MaxCompute表。

您也可以单击意见生成目标表结构,系统将自动创建表接收数据,支持手动调整建表语句。

说明

如果:

  • 开发环境不存在待同步的MaxCompute表,则选择此处配置离线同步节点的去向表的下拉框中无法搜到待同步表。

  • 生产环境不存在待同步的MaxCompute表,同步任务提交发布后,数据同步任务调度运行时将会由于无法找到待同步表而导致同步任务运行失败。

  • 开发环境和生产环境的表结构不一致,同步任务提交发布后,同步任务实际调度运行时的列对应关系,可能与此处离线同步节点配置的列对应关系不一致,最终导致数据写入不正确。

分区信息

如果表为分区表,您可以填入分区列的取值。

  • 取值可以是固定值,如ds=20220101

  • 取值可以是调度系统参数,如ds=${partition},当任务运行时,会自动替换调度系统参数。

四、配置字段映射

选择数据来源和数据去向后,需要指定读取端和写入端列的映射关系。您可以选择同名映射同行映射取消映射手动编辑映射关系

  • Kafka侧字段中默认的6个字段。

    字段名

    含义

    __key__

    Kafka记录的Key。

    __value__

    Kafka记录的Value。

    __partition__

    Kafka记录所在分区号,分区号为从0开始的整数。

    __headers__

    Kafka记录的dHeaders。

    __offset__

    Kafka记录在所在分区的偏移量,偏移量为从0开始的整数。

    __timestamp__

    Kafka记录的13位整数毫秒时间戳。

  • Kafka侧字段可自定义配置JSON解析,可以通过.(获取子字段)[](获取数组元素)两种语法,获取Kafka记录JSON格式的value字段内容。

    重要

    如果JSON字段名中带有"."字符,由于会引发字段定义语法歧义,无法通过字段定义获取字段值。

    Kafka某条JSON格式的记录value的数据示例如下。

    {
          "a": {
          "a1": "hello"
          },
          "b": "world",
          "c":[
                "xxxxxxx",
                "yyyyyyy"
                ],
          "d":[
                {
                      "AA":"this",
                      "BB":"is_data"
                },
                {
                      "AA":"that",
                      "BB":"is_also_data"
                }
            ],
         "a.b": "unreachable"
    }
    • 如果同步a1的数据“hello”,Kafka侧字段增加a.a1

    • 如果同步b的数据“world”,Kafka侧字段增加b

    • 如果同步c的数据“yyyyyyy”,Kafka侧字段增加c[1]

    • 如果同步AA的数据“this”,Kafka侧字段增加d[0].AA

    • Kafka侧字段定义增加a.b无法同步数据"unreachable"

  • 允许源头表字段或目标表字段存在不参与映射的字段,源头表不参与映射的字段同步实例不会读取,目标端不参与映射的字段将写入NULL。

  • 不允许一个源头表字段映射到多个目标表字段,也不允许一个目标表字段映射到多个目标表字段

五、配置通道控制

离线同步任务支持设置任务期望最大并发数脏数据策略等。本教程脏数据策略配置为不容忍脏数据,其他配置保持默认。更多信息,请参见通过向导模式配置离线同步任务

六、调试配置并运行

  1. 单击离线同步节点编辑页面右侧的调试配置,设置调试运行使用的资源组脚本参数,然后单击顶部工具栏的运行,测试同步链路是否成功运行。

  2. 您可以在左侧导航栏单击image,然后单击个人目录右侧的image,创建一个后缀为.sql的文件,执行如下SQL查询数据去向表中的数据是否符合预期。

    说明
    SELECT * FROM <MaxCompute侧目标表名> WHERE pt=<指定分区> LIMIT 20;

七、调度配置与发布

单击离线同步任务右侧的调度配置,设置周期运行所需的调度配置参数后,单击顶部工具栏的发布,进入发布面板,根据页面提示完成发布

在上述配置数据来源与数据去向时,使用了三个调度参数:${startTime}${endTime}${partition},在此处调度配置中需根据实际同步需求指定这三个调度参数的替换策略,以下为几个典型场景的配置示例。

典型场景

推荐配置

场景示例说明

同步任务每5分钟调度一次

  • startTime=$[yyyymmddhh24mi-8/24/60]00

  • endTime=$[yyyymmddhh24mi-3/24/60]00

  • partition=$[yyyymmddhh24mi-8/24/60]

如果同步任务在2022-11-22 10:00被调度启动,则:

  • 会同步Kafka Topic中时间戳范围在2022-11-22 09:52(含)到2022-11-22 09:57(不含)的记录。

  • 同步的Kafka数据写入MaxCompute202211220952分区中。

  • endTime设置比实例调度时间($[yyyymmddhh24mi])早三分钟是为了确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中,避免漏读。

同步任务每小时调度一次

  • startTime=$[yyyymmddhh24-1/24]0000

  • endTime=$[yyyymmddhh24]0000

  • partition=$[yyyymmddhh24]

说明
  • 同步任务每2小时调度一次时,startTime=$[yyyymmddhh24-2/24]0000,另外调度参数保持不变。

  • 同步任务每3小时调度一次时,startTime=$[yyyymmddhh24-3/24]0000,另外调度参数保持不变。

  • 以此类推其他以小时为调度周期的场景下,调度参数的配置结果。

如果同步任务在2022-11-22 10:05被调度启动,则:

  • 会同步Kafka Topic中时间戳范围在2022-11-22 9:00(含)到2022-11-22 10:00(不含)的记录。

  • 同步的Kafka数据写入MaxCompute2022112210分区中。

同步任务每天调度一次

  • startTime=$[yyyymmdd-1]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任务在2022-11-22 00:05被调度启动,则:

  • 会同步Kafka Topic中时间戳范围在2022-11-21 00:00(含)到2022-11-22 00:00(不含)的记录。

  • 同步的Kafka数据写入MaxCompute20221121分区中。

同步任务每周调度一次

  • startTime=$[yyyymmdd-7]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任务在2022-11-22 00:05被调度启动,则:

  • 会同步Kafka Topic中时间戳范围在2022-11-15 00:00(含)到2022-11-22 00:00(不含)的记录。

  • 同步的Kafka数据写入MaxCompute20221121分区中。

同步任务每月调度一次

  • startTime=$[add_months(yyyymmdd,-1)]000000

  • endTime=$[yyyymmdd]000000

  • partition=$[yyyymmdd-1]

如果同步任务在2022-11-22 00:05被调度启动,则:

  • 会同步Kafka Topic中时间戳范围在2022-10-22 00:00(含)到2022-11-22 00:00(不含)的记录。

  • 同步的Kafka数据写入MaxCompute20221121分区中。

根据希望的调度间隔,设置调度周期。

典型场景

推荐配置

场景示例说明

同步任务每5分钟调度一次

  • 调度周期:分钟

  • 开始时间:00:00

  • 时间间隔:05分钟

  • 结束时间:23:59

同步任务每小时调度一次

  • 调度周期:小时

  • 开始时间:00:15

  • 时间间隔:1小时

  • 结束时间:23:59

开始时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

同步任务每天调度一次

  • 调度周期:天

  • 定时调度时间:00:15

定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

同步任务每周调度一次

  • 调度周期:周

  • 指定时间:星期一

  • 定时调度时间:00:15

定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

同步任务每月调度一次

  • 调度周期:月

  • 指定时间:每月1

  • 定时调度时间:00:15

定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

重要

如果出现实例启动后,仍有时间戳小于等于起始时间的记录写入Kafka Topic,则这些数据可能被漏读,所以当Kafka Topic中数据写入出现延迟或者时间戳乱序时,要注意对离线同步任务造成的数据漏读风险。