首页 Order Logstash 正文

Logstash 同步MySQL到ES

金鹏头像 金鹏 Logstash 2025-04-08 21:04:50 0 76
导读:logstash-mysql-es.conf注意事项:    MySQL驱动器jar包需要指定,要和MySQL的版本兼容 &nb...

logstash-mysql-es.conf

注意事项:

    MySQL驱动器jar包需要指定, 要和MySQL的版本兼容

    logstash的同步不是实时同步的, 使用同步可以使用Canel, Flink CDC监听Binlog的方式, 可以参考博主的SpringBoot集成Flink-CDC

    存量更新和存量更新配置根据自己的需求开启

    相关位置改成自己, 比如mysql, sql存放位置

    启动logstash的时候需要指定logstash-mysql-es.conf配置启动

    我的配置存放logstash安装目录的config中, 进入bin目录, 执行cmd, 输入启命令为logstash -f …\config\logstash-mysql-es.conf


# 输入源
input {
   jdbc { 
		# 数据库连接参数
		jdbc_connection_string => "jdbc:mysql://192.168.132.10:3306/whitebrocade?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai"
		# mysql用户名
		jdbc_user => "root"
		# mysql密码
		jdbc_password => "12345678"
		
		# mysql驱动器jar包, 我这里用的是5.7的MySQL
		jdbc_driver_library => "E:/software/Maven/repository/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar"
		# 驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		# 开启分页
		jdbc_paging_enabled => "true"
		# 最大页码
		jdbc_page_size => "1000"
		
		# 这个type可以用来做多个输入源和多个输出源区分  这里举例所以只有一个
		type => "user_DTS" 
		# 用于同步的查询sql
		statement_filepath => "E:/software/Logstash/test_data/userDTS.sql"
		# 直接书写sql语句
		# statement => "select * from user"
		# 加上jdbc时区, 要不然logstash的时间会不准确
		jdbc_default_timezone => "Asia/Shanghai"
		# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
		lowercase_column_names => "false"
		 # 处理中文乱码问题
		codec => plain { charset => "UTF-8" }
		
		# 日志级别
		sql_log_level => warn
		
		# 如果需要增量更新的话,则需要在input/jdbc下添加如下配置
		# 如果要使用其它字段追踪, 而不是用时间开启这个配置
		use_column_value => true
		# 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年
		tracking_column_type => "timestamp"
		# 追踪的字段  这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写  因为不配置为false的话默认是true  查询结果字段默认会变成全小写
		tracking_column => "updateDate"
		# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
		record_last_run => true
		# 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这个就是增量数据同步的关键
		last_run_metadata_path => "E:/software/Logstash/test_data/userDTS_last_value.txt"
		# 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录
		clean_run => false
		# 设置监听间隔  各字段含义秒(由左至右)分、时、天、月、年
		# 这里配置为每15秒扫描一次MySQL
		schedule => "*/15 * * * * *"

    }
}

# 过滤
filter {
	mutate {
		# 去掉没用字段, 注意了type不要去除掉, 这里需要根据type判断是否同步到ES
		remove_field  => ["@version", "@timestamp"]
	}
}

# 输出源
output {
	# 判断类型是否位user_DTS(即是否为user表相关的, 如果是就同步到ES中)
    if[type] == "user_DTS"{
		elasticsearch {
			# elasticsearch url
			hosts => ["http://localhost:9200"]
			# 用户名&密码
			# user => "whitebrocade"
			# password => "whitebrocade"
			# 下面两个参数可以开启更新模式
			action => "update"
			doc_as_upsert => true
			# 索引名
			index => "user"
			# 文档id 设置成数据库的id
			document_id => "%{id}"
       }
    }
    stdout {
        # 以json格式输出到控制台, 用于调试使用, 正式运行时可以注释掉
        codec => json_lines
    }
}


ES创建user索引

Logstash同步的时候, 并不会创建user的索引, 所以这里我们得自己手动创建一下

推荐一个ES的客户端工具,es-clint, 下边使用这工具创建

PUT /user
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text"
      },
      "createDate": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
      },
      "updateDate": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
      }
    }
  }
}







本文地址:https://www.jinpeng.work/?id=232
若非特殊说明,文章均属本站原创,转载请注明原链接。
广告3

相关推荐

欢迎 发表评论:

  • 请填写验证码

日历

«    2025年4月    »
123456
78910111213
14151617181920
21222324252627
282930

控制面板

您好,欢迎到访网站!
  查看权限
广告2

退出请按Esc键