FlinkSQL 实时数据同步

news/发布时间2024/5/3 9:35:36

准备工作

  • MySQL 数据库(version: 5.7.25),注意,MySQL 数据库版本必须大于 5.6,否则不支持。

  • 开启 MySQL 的 log-bin:

    [mysqld]
    # Binary Logging.
    log-bin=mysql-bin
    server-id=1
    
  • Flink (version : 1.15.4)

  • 添加 flink-connector-jdbc-1.15.4.jarflink-sql-connector-mysql-cdc-2.4.0.jar${FLINK_HOME}/lib

数据同步

准备待同步源端表和目标端表

创建源端表:player_scores

use cdc_test_source;CREATE TABLE `player_scores`  (`player` varchar(255),`team` varchar(255),`score` int(11),PRIMARY KEY (`player`)
);

准备目标端表

在目标端创建和源端同构的表:player_scores

use cdc_test_target;CREATE TABLE `player_scores`  (`player` varchar(255),`team` varchar(255),`score` int(11),PRIMARY KEY (`player`)
);
./bin/start-cluster.sh

启动 sql-client

./bin/sql-client.sh

数据同步任务创建

  • 创建源端表对应的逻辑表

    CREATE TABLE source_dest (`player` STRING,`team` STRING,`score` INT,PRIMARY KEY (`player`) NOT ENFORCED
    ) WITH ('connector' = 'mysql-cdc','hostname' = '10.4.45.207','port' = '3306','username' = 'username','password' = 'password','database-name' = 'cdc_test_source','table-name' = 'player_scores'
    );
    
  • 创建目标端表对应的逻辑表

    CREATE TABLE sink_dest (`player` STRING,`team` STRING,`score` INT,PRIMARY KEY (`player`) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://10.4.45.207:3306/cdc_test_target','username' = 'username','password' = 'password','table-name' = 'player_scores','sink.parallelism' = '1'
    );
    
  • 建立源端逻辑表和目标端逻辑表的连接

    INSERT INTOsink_dest (player, team, score)
    SELECTplayer,team,score
    FROMsource_dest;
    
  • 任务创建成功:

  • Flink Web 查看提交的任务:

  • 源端表中插入测试数据

    INSERT INTO `cdc_test_source`.`player_scores` (`player`, `team`, `score`) VALUES ('Kobe', 'Lakers', 3);
    
  • 查看目标端表是否自动完成同步

  • 源端表中进行更新、删除操作,查看目标端表是否自动完成同步

遇到的问题

执行 FlinkSQL 报错

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackhole
datagen
filesystem
jdbc
oracle-cdc
print
python-input-format

错误原因

缺少 flink-connector-jdbc 相关类文件。

解决方案

添加 flink-connector-jdbc-1.15.4.jarflink-sql-connector-mysql-cdc-2.4.0.jar 重启后解决。

MySQL 开启 bin-log 报错

[ERROR] You have enabled the binary log, but you haven’t provided the mandatory server-id. Please refer to the proper server start-up parameters documentation 
2016-09-03T03:17:51.815890Z 0 [ERROR] Aborting

报错原因

在设置 bin-log 日志的时候,没有设置 server_id 参数。server-id 参数用于在复制中,为主库和备库提供一个独立的 ID,以区分主库和备库;

开启二进制文件的时候,需要设置这个参数。

解决方案

修改 MySQL 配置文件 my.ini (windows) / my.cnf (linux)

[mysqld]
# Binary Logging.
log-bin=mysql-bin
server-id=1

重启 MySQL 服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ulsteruni.cn/article/22356615.html

如若内容造成侵权/违法违规/事实不符,请联系编程大学网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

关于虚拟机内存和JVM内存设置的思考

关于虚拟机内存和JVM内存设置的思考背景 最近有同事总问JVM的设置问题. 之前总结过不少. 但是感觉没法讲对方说服 当然了, 自己能力有限, 只能自说自话. 现在这个就是留存一个底稿. 希望能人能帮忙解释关于内存和CPU的观点 CPU的能力有上限. 一般情况下不建议让CPU处于高峰作业…

04-drf视图层详细

drf的request请求 这里的request请求是基于APIView的,也就是新的request 正常情况下,request请求分为:urlcoded、json、form-data,可以控制只接受哪一个请求导入模块 from rest_framework.parsers import JSONParser, MultiPartParser, FormParser模块 描述 请求JSONParser…

缓存数据“消失”之谜

吃一堑,长一智。吃一堑,长一智。“邪门!真是邪门!”自从踏入 Go 的领域之后,奇事怪事接连不断。很多看上去似乎没啥问题的代码,可就是有问题,可怎么也看不出问题所在。问题背景 事情是这样的:有两个流程和一个缓存数据: 流程一:接收 kafka 数据,解析模型数据,并存入…

Python调用微信OCR识别文字和坐标

原理 在看雪看到一篇文章:逆向调用QQ截图NT与WeChatOCR-软件逆向。里面说了怎么调用微信和QQ本地的OCR模型,还有很详细的分析过程。 我稍微看了下文章,多的也看不懂。大概流程是使用mmmojo.dll这个dll来与WeChatOCR.exe做通信的,也是用它来启动和关闭WeChatOCR.exe进程的。…

泰国股票盘搭建【TG:@Gangguhk】

功能最强大的股票配资系统 我们的股票配资系统是由拥有10年项目开发经验的资深技术人员,针对股票配资市场情况及股票投资者需要而精心研发,可同时运行于手机端、电脑端的多屏杠杆融资风控管理系统。功能包括自设配资额度、多级代理、交易管理、客户管理、警戒平仓、系统监控、…

openGauss AI4DB-数据库自治运维

AI4DB: 数据库自治运维 如上文所述,AI4DB主要用于对数据库进行自治运维和管理,从而帮助数据库运维人员减少运维工作量。在实现上,DBMind的AI4DB框架具有监控和服务化的性质,同时也提供即时AI工具包,提供开箱即用的AI运维功能(如索引推荐)。AI4DB的监控平台以开源的Prome…