跳到主要内容
版本: Next

CDCSOURCE 整库同步

设计背景

目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。

Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等

原理

source 合并

source_merge

面对建立的数据库连接过多,Binlog 重复读取会造成源库的巨大压力,上文分享采用了 source 合并的优化,尝试合并同一作业中的 source,如果都是读的同一数据源,则会被合并成一个 source 节点。

​ Dinky 采用的是只构建一个 source,然后根据 schema、database、table 进行分流处理,分别 sink 到对应的表。

元数据映射

Dinky 是通过自身的数据源中心的元数据功能捕获源库的元数据信息,并同步构建 sink 阶段 datastream 或 tableAPI 所使用的 FlinkDDL。

meta_mapping

多种 sink 方式

Dinky 提供了各式各样的 sink 方式,通过修改语句参数可以实现不同的 sink 方式。Dinky 支持通过 DataStream 来扩展新的 sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。

sink

EXECUTE CDCSOURCE 基本使用

CDCSOURCE 语句用于将上游指定数据库的所有表的数据采用一个任务同步到下游系统。整库同步默认支持 Standalone、Yarn Session、Yarn Per job、K8s Session

说明

# 将下面 Dinky根目录下 整库同步依赖包放置 $FLINK_HOME/lib下
jar/dlink-client-base-${version}.jar
jar/dlink-common-${version}.jar
lib/dlink-client-${version}.jar

语法结构

EXECUTE CDCSOURCE jobname 
WITH ( key1=val1, key2=val2, ...)

With 参数说明

WITH 参数通常用于指定 CDCSOURCE 所需参数,语法为'key1'='value1', 'key2' = 'value2'的键值对。

配置项

配置项是否必须默认值说明
connector指定要使用的连接器,当前支持 mysql-cdc 及 oracle-cdc
hostname数据库服务器的 IP 地址或主机名
port数据库服务器的端口号
username连接到数据库服务器时要使用的数据库的用户名
password连接到数据库服务器时要使用的数据库的密码
scan.startup.modelatest-offset消费者的可选启动模式,有效枚举为“initial”和“latest-offset”
database-name如果table-name="test\.student,test\.score",此参数可选。
table-name支持正则,示例:"test\.student,test\.score"
source.*指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。
checkpoint单位 ms
parallelism任务并行度
sink.connector指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式
sink.sink.db目标数据源的库名,不指定时默认使用源数据源的库名
sink.table.prefix目标表的表名前缀,如 ODS 即为所有的表名前拼接 ODS
sink.table.suffix目标表的表名后缀
sink.table.upper目标表的表名全大写
sink.table.lower目标表的表名全小写
sink.*目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名
sink[N].*N代表为多目的地写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置.

示例

实时数据合并至一个 kafka topic

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='127.0.0.1:9092'
)

实时数据同步至对应 kafka topic

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.brokers'='127.0.0.1:9092'
)

实时数据 DataStream 入仓 Doris

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'datastream-doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.sink.enable-delete' = 'true'
)

实时数据 FlinkSQL 入仓 Doris

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.enable-delete' = 'true'
)

实时数据入湖 Hudi

EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'source.server-time-zone' = 'UTC',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='data_deal',
'table-name'='data_deal\.stu,data_deal\.stu_copy1',
'sink.connector'='hudi',
'sink.path'='hdfs://cluster1/tmp/flink/cdcdata/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='id',
'sink.hoodie.parquet.max.file.size'='268435456',
'sink.write.precombine.field'='update_time',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.db'='cdc_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true',
'sink.hive_sync.metastore.uris'='thrift://cdh.com:9083',
'sink.hive_sync.username'='flinkcdc'
)

同时将CDCSOURCE数据写入到Doirs和Kafka

EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink[0].connector' = 'doris',
'sink[0].fenodes' = '127.0.0.1:8030',
'sink[0].username' = 'root',
'sink[0].password' = 'dw123456',
'sink[0].sink.batch.size' = '1',
'sink[0].sink.max-retries' = '1',
'sink[0].sink.batch.interval' = '60000',
'sink[0].sink.db' = 'test',
'sink[0].table.prefix' = 'ODS_',
'sink[0].table.upper' = 'true',
'sink[0].table.identifier' = '${schemaName}.${tableName}',
'sink[0].sink.enable-delete' = 'true',
'sink[1].connector'='datastream-kafka',
'sink[1].topic'='dlinkcdc',
'sink[1].brokers'='127.0.0.1:9092'
)
说明
  • 按照示例格式书写,且一个 FlinkSQL 任务只能写一个 CDCSOURCE。
  • 配置项中的英文逗号前不能加空格,需要紧随右单引号。
  • 禁用全局变量、语句集、批模式。

目前已经支持 application ,需提前准备好相关jar包,或者 和 add jar语法并用。以 mysql cdc-2.3.0 - flink-1.14 为例,需要如下jar

  • flink-shaded-guava-18.0-13.0.jar
  • HikariCP-4.0.3.jar
  • druid-1.2.8.jar
  • dlink-metadata-mysql-0.7.0.jar
  • dlink-metadata-base-0.7.0.jar
  • jackson-datatype-jsr310-2.13.4.jar
  • flink-sql-connector-mysql-cdc-2.3.0.jar
  • dlink-client-1.14-0.7.0.jar cdcsource_example.png