MySQL real-time data synchronization

go-mysql-elasticsearch

This section is based on https://www.linuxhub.org/?p=4665; most of the content is the same, but some details in the original article cause a number of problems. I hit many errors along the way, and this write-up fills in those gaps.

Middleware: https://github.com/siddontang/go-mysql-elasticsearch
How it works: go-mysql-elasticsearch syncs data from MySQL to Elasticsearch. It behaves just like MySQL master/slave replication, except that the "slave" is Elasticsearch.
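In other words, each binlog row event becomes one document operation in Elasticsearch, keyed by the table's primary key. A minimal sketch of that mapping (es_op is a hypothetical helper; the index and type names are the ones configured later in river.toml, and the real tool batches operations through the bulk API):

```shell
# Sketch: the ES request a binlog row event turns into (doc id = primary key).
es_op() {
  local event="$1" id="$2"
  case "$event" in
    insert|update) echo "PUT /name_index/name_t/$id" ;;
    delete)        echo "DELETE /name_index/name_t/$id" ;;
    *)             echo "skip" ;;
  esac
}
es_op insert 100   # → PUT /name_index/name_t/100
es_op delete 101   # → DELETE /name_index/name_t/101
```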

CentOS 7 environment; about 4 GB of RAM is recommended.

MySQL installation and configuration

yum install mariadb mariadb-server -y
# Edit the configuration
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog_format="ROW"
server-id=100

systemctl enable mariadb
systemctl start mariadb
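The three my.cnf settings above (binlog enabled, ROW format, a server-id) are exactly what binlog-based replication needs, and a quick sanity check can be scripted. A sketch (check_mycnf is a hypothetical helper; point it at /etc/my.cnf on the server instead of the sample file):

```shell
# Sanity-check that a my.cnf has the three settings binlog replication needs.
check_mycnf() {
  grep -q '^log-bin='            "$1" &&
  grep -q '^binlog_format="ROW"' "$1" &&
  grep -q '^server-id='          "$1"
}

# Demonstration against a generated sample (use /etc/my.cnf on the server):
sample=$(mktemp)
printf 'log-bin=mysql-bin\nbinlog_format="ROW"\nserver-id=100\n' > "$sample"
check_mycnf "$sample" && echo "binlog settings OK"
rm -f "$sample"
```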

Change the password: run mysqladmin -u root -p password, press Enter and type the current password (just press Enter if it is empty), then enter the new password twice.

# Create the database and table
Database: pets_db
Table: cats
Columns: id, name

# Create the database pets_db
create database pets_db;

# Create the cats table
CREATE TABLE `pets_db`.`cats` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8;

# Insert data (AUTO_INCREMENT starts at 100, so these two rows get ids 100 and 101)
insert into pets_db.cats(name) values("波斯猫(Persian cat)是以阿富汗的土种长毛猫和土耳其的安哥拉长毛猫为基础,在英国经过100多年的选种繁殖,于1860年诞生的一个品种。");
insert into pets_db.cats(name) values("索马里猫,英文名Somali cat,俗名别名 索马利猫。原产地应该是在非洲。据说,索马里猫是1967年由纯种的阿比西尼亚猫突变产生 出来的长毛猫,经过有计划繁殖而形成的品种。");

# Create the user
CREATE USER elasticsearch IDENTIFIED BY '123456';
 
# Grant access (scope the grants to specific databases as needed; using root for everything also works)
GRANT SELECT,RELOAD,SUPER,SHOW DATABASES,REPLICATION CLIENT,REPLICATION SLAVE ON *.* TO elasticsearch@localhost IDENTIFIED BY '123456';
GRANT SELECT,RELOAD,SUPER,SHOW DATABASES,REPLICATION CLIENT,REPLICATION SLAVE ON *.* TO elasticsearch@127.0.0.1 IDENTIFIED BY '123456';
FLUSH PRIVILEGES;

Elasticsearch installation and configuration

# Java environment
yum install java -y

# java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)

# Install Elasticsearch
cd ~
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.rpm
yum localinstall elasticsearch-6.2.4.rpm  -y
rm -rf elasticsearch-6.2.4.rpm
# ik Chinese word-segmentation plugin (the version must match your Elasticsearch version; GitHub: https://github.com/medcl/elasticsearch-analysis-ik)
/usr/share/elasticsearch/bin/elasticsearch-plugin install \
https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.2.4/elasticsearch-analysis-ik-6.2.4.zip

# Edit the configuration
vim /etc/elasticsearch/elasticsearch.yml
cluster.name: WebWorm
node.name: node1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200

# Data directory; wrong ownership or permissions will keep Elasticsearch from starting (check the logs in /var/log/elasticsearch)
mkdir -p /data/elasticsearch
chown -R elasticsearch:elasticsearch /data/elasticsearch

systemctl enable elasticsearch
systemctl start elasticsearch
systemctl status elasticsearch

netstat -antlp |egrep "9200|9300"

# Remember to open the port in the firewall
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --reload

################################################################
# Go environment
mkdir -p /data/app/golang
cd ~
wget --no-check-certificate https://dl.google.com/go/go1.10.2.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.10.2.linux-amd64.tar.gz

echo "export GOROOT=/usr/local/go" >> /etc/profile
echo "export GOBIN=\$GOROOT/bin" >> /etc/profile
echo "export PATH=\$PATH:\$GOBIN" >> /etc/profile
echo "export GOPATH=/data/app/golang" >> /etc/profile
source /etc/profile

go version

# Dependency management tool: godep
# Fetch the source
go get github.com/tools/godep
# Source directory
ls /data/app/golang/src/github.com/tools/godep/


# The go-mysql-elasticsearch plugin
# Fetch the source (this step may complain "no go files in ..." and the like; ignore it)
go get github.com/siddontang/go-mysql-elasticsearch
# Source directory
ls /data/app/golang/src/github.com/siddontang/go-mysql-elasticsearch

# Build from source
cd /data/app/golang/src/github.com/siddontang/go-mysql-elasticsearch
make

# Lay out the go-mysql-elasticsearch application
mkdir /usr/local/mysql-elasticsearch       # application directory
cd /data/app/golang/src/github.com/siddontang/go-mysql-elasticsearch
cp -r bin /usr/local/mysql-elasticsearch/  # binaries
cp -r etc /usr/local/mysql-elasticsearch/  # configuration files
# The var directory (data files) is not shipped; it is created automatically on first start.

# Download the service management script
wget -O /etc/init.d/mysql-elasticsearch  https://raw.githubusercontent.com/linuxhub/script/master/shell/mysql-elasticsearch
chmod a+x /etc/init.d/mysql-elasticsearch 
chkconfig mysql-elasticsearch on

# Configure go-mysql-elasticsearch
vim /usr/local/mysql-elasticsearch/etc/river.toml

# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "elasticsearch"
my_pass = "123456"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info; set it so that syncing can resume
# from where it left off after a restart.
# TODO: support other storage, like etcd.
data_dir = "/usr/local/mysql-elasticsearch/var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"

# pseudo server id like a slave 
server_id = 1001

# mysql or mariadb
flavor = "mysql"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"

# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# minimal items to be inserted in one bulk
bulk_size = 128

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"

# Ignore table without primary key
skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "pets_db"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["cats"]

# Below is for special rule mapping

# Very simple example
# 
# desc t;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
# 
# The table `t` will be synced to ES index `test` and type `t`.
# Mapping for our database. Multiple [[rule]] sections are possible; comment out the ones you don't use, or the service will fail to start.
[[rule]]
schema = "pets_db"
table = "cats"
index = "name_index"
type = "name_t"

# Wildcard table rule, the wildcard table must be in source tables 
# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all tables must have same schema with above table `t`;
#[[rule]]
#schema = "test"
#table = "t_[0-9]{4}"
#index = "test"
#type = "t"

# Simple field rule 
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tags     | varchar(256) | YES  |     | NULL    |       |
# | keywords | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
#[[rule]]
#schema = "test"
#table = "tfield"
#index = "test"
#type = "tfield"

# Field mapping, commented out to match the tfield rule above; if left active
# it attaches to the cats rule, whose table has no tags/keywords columns,
# and can keep the service from starting.
#[rule.field]
# Map column `id` to ES field `es_id`
#id="es_id"
# Map column `tags` to ES field `es_tags` with array type
#tags="es_tags,list"
# Map column `keywords` to ES with array type
#keywords=",list"
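For reference, the field-mapping values follow a small grammar: the text before the comma is the target ES field name (empty keeps the column name), and a trailing "list" marks the field as an array. A sketch of that interpretation (map_field is a hypothetical helper, not the tool's actual code):

```shell
# Interpret a river.toml field-mapping value such as "es_tags,list".
map_field() {
  local col="$1" spec="$2"
  local es="${spec%%,*}" mod="${spec#*,}"
  [ -z "$es" ] && es="$col"                       # empty name: keep the column name
  if [ "$mod" = "list" ] && [ "$mod" != "$spec" ]; then
    echo "$es (array)"                            # ",list" suffix was present
  else
    echo "$es"
  fi
}
map_field id "es_id"            # → es_id
map_field tags "es_tags,list"   # → es_tags (array)
map_field keywords ",list"      # → keywords (array)
```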

# Filter rule 
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | c1    | int(11)      | YES  |     | 0       |       |
# | c2    | int(11)      | YES  |     | 0       |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
#
#[[rule]]
#schema = "test"
#table = "tfilter"
#index = "test"
#type = "tfilter"

# Only sync following columns
#filter = ["id", "name"]

# id rule
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tag      | varchar(256) | YES  |     | NULL    |       |
# | desc     | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
#[[rule]]
#schema = "test"
#table = "tid_[0-9]{4}"
#index = "test"
#type = "t"
# The ES doc's id will be `id`:`tag`
# Useful for merging multiple tables into one type when these tables share the same PK
#id = ["id", "tag"]




# Starting and stopping the service
Manual start (useful for troubleshooting): /usr/local/mysql-elasticsearch/bin/go-mysql-elasticsearch -config /usr/local/mysql-elasticsearch/etc/river.toml

service mysql-elasticsearch start
service mysql-elasticsearch status

Elasticsearch search tests

# Search for 波斯猫 (Persian cat)
curl -H "Content-type: application/json" -X POST http://localhost:9200/name_index/_search?pretty -d'{"query": { "match_phrase": { "name": "波斯猫" } } }'

# Search for 索马里猫 (Somali cat)
curl -H "Content-type: application/json" -X POST http://localhost:9200/name_index/_search?pretty -d'{"query": { "match_phrase": { "name": "索马里猫" } } }'

# Search for 猫 (cat)
curl -H "Content-type: application/json" -X POST http://localhost:9200/name_index/_search?pretty -d'{"query": { "match_phrase": { "name": "猫" } } }'
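The three requests above differ only in the search term, so the request body can be generated by a small helper (search_body is a hypothetical name):

```shell
# Build the match_phrase request body shared by the searches above.
search_body() {
  printf '{"query": { "match_phrase": { "name": "%s" } } }' "$1"
}
search_body "猫"   # → {"query": { "match_phrase": { "name": "猫" } } }
```

Usage, for example: curl -H "Content-type: application/json" -X POST http://localhost:9200/name_index/_search?pretty -d "$(search_body 猫)"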


# Update a row, then search again: replace the 波斯猫 entry with 狸花猫
# Log in to the database
mysql -uelasticsearch -p123456
update pets_db.cats set name="狸花猫的原产地是中国,属于自然猫,是在千百年中经过许多品种的自然淘汰而保留下来的品种。" where id=101;

curl -H "Content-type: application/json" -X POST http://localhost:9200/name_index/_search?pretty -d'{"query": { "match_phrase": { "name": "狸花猫" } } }'


# Delete a row, then search again: remove the 狸花猫 record
mysql -uelasticsearch -p123456
delete from pets_db.cats where id=101;

Testing complete: Elasticsearch and MySQL are syncing as expected.

Maxwell

https://github.com/zendesk/maxwell


xBifrost

xBifrost, a heterogeneous middleware for syncing MySQL data: https://www.oschina.net/p/xbifrost?utm_source=wechat&utm_medium=zaobao


References:
Syncing a MySQL database to Elasticsearch with go-mysql-elasticsearch: https://yq.aliyun.com/articles/705377?from=groupmessage&isappinstalled=0



MongoDB real-time data synchronization

mongo-connector

https://blog.csdn.net/xue632777974/article/details/78228209



Redis real-time data synchronization

I have not yet found suitable middleware for this.



SQL real-time data synchronization

https://www.elastic.co/cn/products/stack/elasticsearch-sql
https://blog.csdn.net/qq_21383435/article/details/79417527


General-purpose data migration tool

It can import, export, and transfer data between two Elasticsearch clusters.
https://github.com/medcl/esm-abandoned

wget https://github.com/medcl/esm-abandoned/releases/download/v0.4.1/linux64.tar.gz
tar zxvf linux64.tar.gz
cd bin/linux64

# Migrate directly from one ES to another
./esm -s http://[source IP]:9200 -x "[source index]" -y "[target index]" -d http://[target IP]:9200 -m elastic:[password] -n elastic:[password] -c 10000

# Source credentials
-m [user]:[password]

# Target credentials
-n [user]:[password]

# From the source ES to a dump file, then into the target ES
# Export data to a file
./esm -s http://[source IP]:9200 -x "[source index]" -m elastic:[password] -c 5000 -o=dump.bin

# Import data from the file
./esm -d http://[target IP]:9200 -y "[target index]" -c 5000 -b 5 --refresh -i=dump.bin

Multi-database support

kkbinlog

kkbinlog, a data-change listening and distribution tool (MySQL, MongoDB): https://www.oschina.net/p/kkbinlog?utm_source=wechat&utm_medium=zaobao

Author: Leo
License: unless otherwise noted, all articles on this site are licensed under CC BY-NC-SA 4.0. Please credit LeoLan's blog when reposting.