MySQL数据实时增量同步到Elasticsearch
nanshan 2025-07-08 21:43 4 浏览 0 评论
Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的数据量性能就明显的下降很多,本文是使用Go实现的go-mysql-transfer中间件来实时监控Mysql的Binlog日志,然后同步到Elasticsearch,从实时性、性能效果都不错。
一、go-mysql-transfer
go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具。能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指定格式的消息,发送到接收端。在数据库和接收端之间形成一个高性能、低延迟的增量数据(Binlog)同步管道, 具有如下特点:
1、不依赖其它组件,一键部署
2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用
3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑
4、支持监控告警,集成Prometheus客户端
5、高可用集群部署
6、数据同步失败重试
7、全量数据初始化
详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解
项目开源地址:go-mysql-transfer
二、配置
# app.yml
target: elasticsearch #目标类型
#elasticsearch连接配置
es_addrs: 127.0.0.1:9200 #连接地址,多个用逗号分隔
es_version: 7 # Elasticsearch版本,支持6和7、默认为7
#es_password: # 用户名
#es_version: # 密码
三、数据转换规则
相关配置如下:
rule:
-
schema: eseap #数据库名称
table: t_user #表名称
#order_by_column: id #排序字段,存量数据同步时不能为空
#column_lower_case: true #列名称转为小写,默认为false
#column_upper_case:false#列名称转为大写,默认为false
column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
# 包含的列,多值逗号分隔,如:id,name,age,area_id 为空时表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id 默认为空
#default_column_values: area_name=合肥 #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss
#Elasticsearch相关
es_index: user_index #Index名称,可以为空,默认使用表(Table)名称
#es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导
# -
# column: REMARK #数据库列名称
# field: remark #映射后的ES字段名称
# type: text #ES字段类型
# analyzer: ik_smart #ES分词器,type为text此项有意义
# #format: #日期格式,type为date此项有意义
# -
# column: USER_NAME #数据库列名称
# field: account #映射后的ES字段名称
# type: keyword #ES字段类型
示例一
t_user表,数据如下:
自动创建的Mapping,如下:
同步到Elasticsearch的数据如下:
示例二
t_user表,同实例一
使用如下配置:
rule:
-
schema: eseap #数据库名称
table: t_user #表名称
order_by_column: id #排序字段,存量数据同步时不能为空
column_lower_case: true #列名称转为小写,默认为false
#column_upper_case:false#列名称转为大写,默认为false
#column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
# 包含的列,多值逗号分隔,如:id,name,age,area_id 为空时表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id 默认为空
default_column_values: area_name=合肥 #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss
#Elasticsearch相关
es_index: user_index #Index名称,可以为空,默认使用表(Table)名称
es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导
-
column: REMARK #数据库列名称
field: remark #映射后的ES字段名称
type: text #ES字段类型
analyzer: ik_smart #ES分词器,type为text此项有意义
#format: #日期格式,type为date此项有意义
-
column: USER_NAME #数据库列名称
field: account #映射后的ES字段名称
type: keyword #ES字段类型
es_mappings 定义索引的mappings(映射关系),不定义es_mappings则使用列类型自动创建索引的mappings(映射关系)。
自动创建的Mapping,如下:
同步到Elasticsearch的数据如下:
四、Lua脚本
使用Lua脚本可以实现更复杂的数据处理逻辑,go-mysql-transfer支持Lua5.1语法。
示例一
t_user表,数据如下:
引入Lua脚本:
#规则配置
rule:
-
schema: eseap #数据库名称
table: t_user #表名称
order_by_column: id #排序字段,存量数据同步时不能为空
lua_file_path: lua/t_user_es.lua #lua脚本文件
es_index: user_index #Elasticsearch Index名称,可以为空,默认使用表(Table)名称
es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导
-
field: id #映射后的ES字段名称
type: keyword #ES字段类型
-
field: userName #映射后的ES字段名称
type: keyword #ES字段类型
-
field: password #映射后的ES字段名称
type: keyword #ES字段类型
-
field: createTime #映射后的ES字段名称
type: date #ES字段类型
format: yyyy-MM-dd HH:mm:ss #日期格式,type为date此项有意义
-
field: remark #映射后的ES字段名称
type: text #ES字段类型
analyzer: ik_smart #ES分词器,type为text此项有意义
-
field: source #映射后的ES字段名称
type: keyword #ES字段类型
es_mappings 定义索引的mappings(映射关系),不定义es_mappings则根据字段的值自动创建mappings(映射关系)。根据es_mappings 生成的mappings如下:
user_index索引mappings
Lua脚本:
local ops = require("esOps") --加载elasticsearch操作模块
local row = ops.rawRow() --当前数据库的一行数据,table类型,key为列名称
local action = ops.rawAction() --当前数据库事件,包括:insert、update、delete
local id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
local remark = row["REMARK"] --获取REMARK列的值
local result = {} -- 定义一个table,作为结果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 数据来源
if action == "insert" then -- 只监听新增事件
ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串
end
同步到Elasticsearch的数据如下:
示例二
t_user表,同实例一
引入Lua脚本:
schema: eseap #数据库名称
table: t_user #表名称
lua_file_path: lua/t_user_es2.lua #lua脚本文件
未明确定义index名称、mappings,es会根据值自动创建一个名为t_user的index。
使用如下脚本:
local ops = require("esOps") --加载elasticsearch操作模块
local row = ops.rawRow() --当前数据库的一行数据,table类型,key为列名称
local action = ops.rawAction() --当前数据库事件,包括:insert、update、delete
local id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
local result = {} -- 定义一个table,作为结果集
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["remark"] = remark
result["source"] = "binlog" -- 数据来源
if action == "insert" then -- 只监听新增事件
ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串
end
同步到Elasticsearch的数据如下:
esOps模块提供的方法如下:
- INSERT: 插入操作,如:ops.INSERT(index,id,result)。参数index为索引名称,字符串类型;参数index为要插入数据的主键;参数result为要插入的数据,可以为table类型或者json字符串
- UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。参数index为索引名称,字符串类型;参数index为要修改数据的主键;参数result为要修改的数据,可以为table类型或者json字符串
- DELETE: 删除操作,如:ops.DELETE(index,id)。参数index为索引名称,字符串类型;参数id为要删除的数据主键,类型不限;
文章来源:
https://www.jianshu.com/p/5a9b6c4f318c
相关推荐
- 删库之后不要着急跑路,教你神不知鬼不觉找回数据
-
在工作中,我们误删数据或者数据库,我们一定需要跑路吗?我看未必,程序员一定要学会自救,神不知鬼不觉的将数据找回。在mysql数据库中,我们知道binlog日志记录了我们对数据库的所有操作,所以...
- 数据库告警不可用,增删改受阻(数据库限制删除)
-
前言:昨晚,突然出现服务不可用告警,查看日志上线报文入库到数据库很慢并受阻,出现数据不同步问题。排查问题查看发现服务都是在执行update、insert这些DML命令的时候,报的数据库执行超时。经过一...
- Binlog实现MySQL复制,5个关键步骤,务必掌握!
-
复制是MySQL最重要的功能之一,MySQL集群的高可用、负载均衡和读写分离都是基于复制来实现的。Binlog就是实现主从复制的关键,主数据库将修改操作记录到Binlog中,从数据库通过解...
- MySQL数据实时增量同步到Elasticsearch
-
Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的...
- MySQL 数据库恢复:如何执行时间点恢复(PITR)以挽救受损数据?
-
天津鸿萌科贸发展有限公司从事数据安全服务二十余年,致力于为各领域客户提供专业的数据恢复、数据备份、数据取证、数据迁移、网络安全、数据清除等解决方案,并针对企业面临的数据安全风险,提供专业的相关数据安全...
- 阿里面试:MySQL Binlog有哪些格式?底层原理?优缺点?
-
binlog的格式也有三种:STATEMENT、ROW、MIXED,下面我详解binlog三种模式@mikechenStatement模式Statement模式:是基于SQL语句的复制(statem...
- 快速带你读懂MySQL的binlog写入机制
-
深入讲解MySQL中的重要日志binlog的写入机制以及影响IO性能的关键配置,并且介绍了如何利用binlog去恢复数据,保证MySQL的可靠性。Q:binlog写入时机binlog的写入逻辑并...
- MySQL 误删除数据恢复全攻略:基于 Binlog 的实战指南
-
在MySQL的世界里,二进制日志(Binlog)就是我们的"时光机"。它默默记录着数据库的每一个重要变更,就像一位忠实的史官,为我们在数据灾难中提供最后的救命稻草。本文将带您深入掌握如...
- 一文了解MySQL Binlog(一文了解肝脏有益和有害的食物)
-
MySQL的Binlog日志是一种二进制格式的日志,Binlog记录所有的DDL和DML语句(除了数据查询语句SELECT、SHOW等),以Event的形式记录,同时记录语句执行时...
- 数据丢失?别慌!MySQL备份恢复攻略
-
想象一下,某个晴朗的午后,你正享受着咖啡,突然接到紧急电话:你的网站或APP彻底挂了!系统崩溃,界面全白。虽然心头一紧,但你或许还能安慰自己:系统崩溃只是暂停服务,数据还在,修复修复就好了。然而,如果...
- Mysql中的bin log、redo log、undo log的区别
-
最近在整理面试题,在看mvcc的时候看到了undolog,今天索性把这三个log都记录一遍。MySQL的逻辑架构说之前先说一下MySQL的基本架构,MySQL主要分为两层:Server层和存储引...
- binlog日志定时清理(binlog清理规则)
-
binlog日志binlog是MySQL数据库的一种日志文件,用于记录所有对数据的修改操作。binlog全称为binarylog,它以二进制格式记录MySQL服务器上所有的修改操作,包括对哪个数据库...
- 茶水间炸锅了!菜鸟误删用户表,运维老张的MySQL救命三招!
-
(公司茶水间,运维老张、开发小王和新人小李围着咖啡机)小李:(紧张兮兮)张哥!我...我好像把测试库的用户表删了!下午演示咋办啊?老张:(淡定喝咖啡)慌啥?昨晚的备份是吃干饭的?走,教你恢复!一、基础...
- 解决运维痛点,提高运维安全性-雷池 SafeLine WAF新功能身份认证
-
雷池介绍使用雷池SafeLineWAF已经两年多了,在1.5.x版本时就已经开始测试使用,并在推出LTS版本后转入LTS分支。近期雷池SafeLineWAF重点更新了身份认证功能,并提供了SS...
- 【Docker 新手入门指南】第十五章:常见故障排除
-
一、前期准备:收集关键信息在排查问题前,建议先获取以下系统数据,便于精准定位故障:1.系统基础信息#查看Docker版本(确认是否为最新稳定版)dockerversion#查看...
你 发表评论:
欢迎- 一周热门
-
-
极空间如何无损移机,新Z4 Pro又有哪些升级?极空间Z4 Pro深度体验
-
UOS服务器操作系统防火墙设置(uos20关闭防火墙)
-
如何修复用户配置文件服务在 WINDOWS 上登录失败的问题
-
手机如何设置与显示准确时间的详细指南
-
如何在安装前及安装后修改黑群晖的Mac地址和Sn系列号
-
日本海上自卫队的军衔制度(日本海上自卫队的军衔制度是什么)
-
爱折腾的特斯拉车主必看!手把手教你TESLAMATE的备份和恢复
-
10个免费文件中转服务站,分享文件简单方便,你知道几个?
-
NAS:DS video/DS file/DS photo等群晖移动端APP远程访问的教程
-
FANUC 0i-TF数据备份方法(fanuc系统备份教程)
-
- 最近发表
- 标签列表
-
- linux 查询端口号 (58)
- docker映射容器目录到宿主机 (66)
- 杀端口 (60)
- yum更换阿里源 (62)
- internet explorer 增强的安全配置已启用 (65)
- linux自动挂载 (56)
- 禁用selinux (55)
- sysv-rc-conf (69)
- ubuntu防火墙状态查看 (64)
- windows server 2022激活密钥 (56)
- 无法与服务器建立安全连接是什么意思 (74)
- 443/80端口被占用怎么解决 (56)
- ping无法访问目标主机怎么解决 (58)
- fdatasync (59)
- 405 not allowed (56)
- 免备案虚拟主机zxhost (55)
- linux根据pid查看进程 (60)
- dhcp工具 (62)
- mysql 1045 (57)
- 宝塔远程工具 (56)
- ssh服务器拒绝了密码 请再试一次 (56)
- ubuntu卸载docker (56)
- linux查看nginx状态 (63)
- tomcat 乱码 (76)
- 2008r2激活序列号 (65)