准备工作
-
MySQL 数据库(version:
5.7.25
),注意,MySQL 数据库版本必须大于 5.6,否则不支持。 -
开启 MySQL 的 log-bin:
[mysqld] # Binary Logging. log-bin=mysql-bin server-id=1
-
Flink (version :
1.15.4
) -
添加
flink-connector-jdbc-1.15.4.jar
和flink-sql-connector-mysql-cdc-2.4.0.jar
到${FLINK_HOME}/lib
。
数据同步
准备待同步源端表和目标端表
创建源端表:player_scores
use cdc_test_source;CREATE TABLE `player_scores` (`player` varchar(255),`team` varchar(255),`score` int(11),PRIMARY KEY (`player`)
);
准备目标端表
在目标端创建和源端同构的表:player_scores
use cdc_test_target;CREATE TABLE `player_scores` (`player` varchar(255),`team` varchar(255),`score` int(11),PRIMARY KEY (`player`)
);
启动 Flink
./bin/start-cluster.sh
启动 sql-client
./bin/sql-client.sh
数据同步任务创建
-
创建源端表对应的逻辑表
CREATE TABLE source_dest (`player` STRING,`team` STRING,`score` INT,PRIMARY KEY (`player`) NOT ENFORCED ) WITH ('connector' = 'mysql-cdc','hostname' = '10.4.45.207','port' = '3306','username' = 'username','password' = 'password','database-name' = 'cdc_test_source','table-name' = 'player_scores' );
-
创建目标端表对应的逻辑表
CREATE TABLE sink_dest (`player` STRING,`team` STRING,`score` INT,PRIMARY KEY (`player`) NOT ENFORCED ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://10.4.45.207:3306/cdc_test_target','username' = 'username','password' = 'password','table-name' = 'player_scores','sink.parallelism' = '1' );
-
建立源端逻辑表和目标端逻辑表的连接
INSERT INTOsink_dest (player, team, score) SELECTplayer,team,score FROMsource_dest;
-
任务创建成功:
-
Flink Web 查看提交的任务:
-
源端表中插入测试数据
INSERT INTO `cdc_test_source`.`player_scores` (`player`, `team`, `score`) VALUES ('Kobe', 'Lakers', 3);
-
查看目标端表是否自动完成同步
-
源端表中进行更新、删除操作,查看目标端表是否自动完成同步
遇到的问题
执行 FlinkSQL 报错
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackhole
datagen
filesystem
jdbc
oracle-cdc
print
python-input-format
错误原因
缺少 flink-connector-jdbc 相关类文件。
解决方案
添加 flink-connector-jdbc-1.15.4.jar
和 flink-sql-connector-mysql-cdc-2.4.0.jar
重启后解决。
MySQL 开启 bin-log 报错
[ERROR] You have enabled the binary log, but you haven’t provided the mandatory server-id. Please refer to the proper server start-up parameters documentation
2016-09-03T03:17:51.815890Z 0 [ERROR] Aborting
报错原因
在设置 bin-log 日志的时候,没有设置 server_id 参数。server-id 参数用于在复制中,为主库和备库提供一个独立的 ID,以区分主库和备库;
开启二进制文件的时候,需要设置这个参数。
解决方案
修改 MySQL 配置文件 my.ini (windows) / my.cnf (linux)
[mysqld]
# Binary Logging.
log-bin=mysql-bin
server-id=1
重启 MySQL 服务。