百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

flink-cdc同步mysql数据到elasticsearch

nanshan 2025-03-07 22:24 12 浏览 0 评论

什么是CDC?

CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1. 环境准备

  • mysql
  • elasticsearch
  • flink on yarn
  • 说明:如果没有安装hadoop,那么可以不用yarn,直接用flink standalone环境吧。

2. 下载下列依赖包

下面两个地址下载flink的依赖包,放在lib目录下面。

  1. flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
  2. flink-sql-connector-mysql-cdc-1.4.0.jar

这里
flink-sql-connector-mysql-cdc,在这里只能下到最新版1.4:

可以自行
https://github.com/ververica/flink-cdc-connectors下载新版mvn clean install -DskipTests 自己编译。

这是我编译的最新版2.2,传上去发现太新了,如果重新换个版本,我得去gitee下载源码,不然github速度太慢了,然后用IDEA编译打包,又得下载一堆依赖。我投降,我直接去网上下载了个1.4的直接用了。

我下载的jar包,放在flink的lib目录下面:

flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
flink-sql-connector-mysql-cdc-1.4.0.jar

3. 启动flink-sql client

1) 先在yarn上面启动一个application,进入flink13.5目录,执行:

bin/yarn-session.sh -d -s 1 -jm 1024 -tm 2048 -qu root.flink-queue-nm flink-cdc

2) 进入flink sql命令行

bin/sql-client.sh embedded -s flink-cdc

4. 同步数据

这里有一张mysql表:

CREATE TABLE `product_view` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`server_id` int(11) NOT NULL,
`duration` int(11) NOT NULL,
`times` varchar(11) NOT NULL,
`time` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `time` (`time`),
KEY `user_product` (`user_id`,`product_id`) USING BTREE,
KEY `times` (`times`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 样本数据
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');

1) 创建数据表关联mysql

CREATE TABLE product_view_source (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.2',
'port' = '3306',
'username' = 'bigdata',
'password' = 'bigdata',
'database-name' = 'test',
'table-name' = 'product_view'
);

这样,我们在flink sql client操作这个表相当于操作mysql里面的对应表。

2) 创建数据表关联elasticsearch

CREATE TABLE product_view_sink(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.2:9200',
'index' = 'product_view_index',
'username' = 'elastic',
'password' = 'elastic'
);

这样,es里面的product_view_index这个索引会被自动创建,如果想指定一些属性,可以提前手动创建好索引,我们操作表product_view_sink,往里面插入数据,可以发现es中已经有数据了。

3) 同步数据

建立同步任务,可以使用sql如下:

insert into product_view_sink select * from product_view_source;

这个时候是可以退出flink sql-client的,然后进入flink web-ui,可以看到mysql表数据已经同步到elasticsearch中了,对mysql进行插入删除更新,elasticsearch都是同步更新的。

相关推荐

小白初学linux之无法修改系统分辨率

/*此文是做为自己的一个总结还有就是最好也可以给大家提供一些帮助。*/时间:2020年7月14日11:28:41我安装的是Ubuntu20.04LTS,昨天处理的是,grub的引导问题,因为是...

Ubuntu 如何启动、停止或重启服务

在本文中,我们向您介绍在Ubuntu中启动、停止和重启服务的方法。列出Ubuntu中的所有服务在开始之前,先获取计算机上所有服务的列表,因为我们需要知道服务名称来管理服务。service--...

Win11学院:如何在Windows 11上使用WSL安装Ubuntu

IT之家2月18日消息,科技媒体pureinfotech昨日(2月17日)发布博文,介绍了3中简便的方法,让你轻松在Windows11系统中,使用WindowsSubs...

Linux安装中文输入法-Google拼音输入法,搜狗输入法

主要步骤,选择适合自己的尝试:1)卸载之前没装好的搜狗输入法。@:~/Downloads$sudoapt-getremovefcitx*删除依赖库@:~/Downloads$sudoap...

Ubuntu 22.04 请谨慎使用搜狗输入法,可能是你当机原因

在Ubunutu下没有什么有名的输入法,也就听说搜狗输入法有Linux版本,所以特意到官网去找了下载。在Ubuntu新版本里,他仍然用的是fcitx框架的输入引擎,而不是默认的ibus,所以要先把i...

前钢后胶!徐工XMR403VT小型压路机有点意思

【第一工程机械网原创】在越来越注重施工品质,对项目管理越来越精细化的今天,施工方在施工设备选择上,也越来越讲究设备的配套分工,因此小型压路机的应用场景也越来越多。徐工XMR403VT小型压路机高度集...

图大明白 | 404错误为什么是Not Found?为什么是404?

“404错误”大家都不陌生吧?常规来讲它长这样或者长这样艺术一点的长这样404NotFound意思就是所请求的页面不存在或者已被删除被称为“互联网最后一个界面”有很多同学发出疑问:为什么是404?...

Nginx负载均衡安全配置说明2(nginx负载均衡部署)

上一节,我们对Nginx安全配置的几个知识点做了一个说明,例如限制IP访问、文件目录禁止访问限制、需要防止DOS攻击、请求方法的限制和限制文件上传的大小这个进行了一个分析说明,详细的文章请关注我的头条...

惊艳写真系列第403期,本期主人公—叶青

惊艳写真系列第403期,本期主人公—叶青制作不易,欢迎各位看官提供宝贵意见。如果您喜欢记得关注,么么哒。您的每一份点赞和关注都是对作者的最大认可(图片素材均来源于网络,如有侵权联系删除。)本篇是写惊艳...

先秦布币之尖足布、圆足布、方足布,今年圆足最高拍卖价16万一枚

在战国魏、韩地区诞生桥足平首布、锐角平首布之后,赵也诞生了尖足平首布,并且在尖足布的基础上,后来相继派生出了圆足布、三孔布,以及类圆足布和类方足布。一尖足布尖足布是从耸肩尖足空首布演变而来的,是黄河...

403 禁止访问错误的全面排查与解决方案

当遇到403Forbidden错误时,意味着服务器已接收并理解请求,但拒绝执行访问操作。以下从用户端、服务器端等多个维度,提供分步排查与解决方法。一、用户端基础排查1.检查URL准确性确认...

这才是2019年夏最高颜值的泳装(2019夏季泳装秀)

最近的天气是越来越热了,又到了暑期泳衣勇闯海滩的时刻了,打开ins,微博满满地都是各大博主晒的泳装照,明星们也纷纷跑到海边去度假了。虽然我们没有超模般地身材,但是到了海边我们也要成为人群中最亮眼的那颗...

朋友圈爆火!这组《衡中班主任的一天》漫画,感动了无数人!

很多人觉得做老师很轻松他们说有些老师一天一节课就下班了有双休,还有寒暑假,真让人羡慕呀······但事实真是这样吗?最近衡水中学的赵心扬同学画了一组漫画形象地还原了衡中班主任一天的生活那么衡中班主任一...

国家安全教育 | 一组漫画,带你走进国家安全!

当前,我国面临哪些安全威胁?下面带你来看一组漫画!①你要配合,注意保密。我绝不对别人讲。②这件事,千万别对别人讲。③咱单位的…喂!老k!你要当心,有风声了!④你的泄密行为已触犯了国家法律!①请你协助了...

400、403、404、405,访问网页时出现这些代码是什么意思?

今天小泽访问一个页面时,出现了403,很抱歉,您的访问请求被禁止的提示。相信经常用电脑访问网页的朋友都遇到过这种情况,有的网页提示错误代码403,有的提示404,那这些代码都代表了什么呢?有什么含义呢...

取消回复欢迎 发表评论: