注意事项:
MySQL驱动器jar包需要指定, 要和MySQL的版本兼容
logstash的同步不是实时同步的, 使用同步可以使用Canel, Flink CDC监听Binlog的方式, 可以参考博主的SpringBoot集成Flink-CDC
存量更新和存量更新配置根据自己的需求开启
相关位置改成自己, 比如mysql, sql存放位置
启动logstash的时候需要指定logstash-mysql-es.conf配置启动
我的配置存放logstash安装目录的config中, 进入bin目录, 执行cmd, 输入启命令为logstash -f …\config\logstash-mysql-es.conf
# 输入源 input { jdbc { # 数据库连接参数 jdbc_connection_string => "jdbc:mysql://192.168.132.10:3306/whitebrocade?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai" # mysql用户名 jdbc_user => "root" # mysql密码 jdbc_password => "12345678" # mysql驱动器jar包, 我这里用的是5.7的MySQL jdbc_driver_library => "E:/software/Maven/repository/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 开启分页 jdbc_paging_enabled => "true" # 最大页码 jdbc_page_size => "1000" # 这个type可以用来做多个输入源和多个输出源区分 这里举例所以只有一个 type => "user_DTS" # 用于同步的查询sql statement_filepath => "E:/software/Logstash/test_data/userDTS.sql" # 直接书写sql语句 # statement => "select * from user" # 加上jdbc时区, 要不然logstash的时间会不准确 jdbc_default_timezone => "Asia/Shanghai" # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false) lowercase_column_names => "false" # 处理中文乱码问题 codec => plain { charset => "UTF-8" } # 日志级别 sql_log_level => warn # 如果需要增量更新的话,则需要在input/jdbc下添加如下配置 # 如果要使用其它字段追踪, 而不是用时间开启这个配置 use_column_value => true # 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年 tracking_column_type => "timestamp" # 追踪的字段 这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写 因为不配置为false的话默认是true 查询结果字段默认会变成全小写 tracking_column => "updateDate" # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中; record_last_run => true # 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 这个就是增量数据同步的关键 last_run_metadata_path => "E:/software/Logstash/test_data/userDTS_last_value.txt" # 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录 clean_run => false # 设置监听间隔 各字段含义秒(由左至右)分、时、天、月、年 # 这里配置为每15秒扫描一次MySQL schedule => "*/15 * * * * *" } } # 过滤 filter { mutate { # 去掉没用字段, 注意了type不要去除掉, 这里需要根据type判断是否同步到ES remove_field => ["@version", "@timestamp"] } } # 输出源 output { # 判断类型是否位user_DTS(即是否为user表相关的, 如果是就同步到ES中) if[type] == "user_DTS"{ elasticsearch { # elasticsearch url hosts => ["http://localhost:9200"] # 用户名&密码 # user => "whitebrocade" # password => "whitebrocade" # 下面两个参数可以开启更新模式 action => "update" doc_as_upsert => true # 索引名 index => "user" # 文档id 设置成数据库的id document_id => "%{id}" } } stdout { # 以json格式输出到控制台, 用于调试使用, 正式运行时可以注释掉 codec => json_lines } }
PUT /user { "settings": { "number_of_shards": 3, "number_of_replicas": 2 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text" }, "createDate": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis" }, "updateDate": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis" } } } }