Logstash 同步 MySQL 数据到 Elastic Search

环境准备

  1. 下载 Logstash 并解压
  2. 下载 Elastic Search 并启动,参见 Elastic Search 启用
  3. 需要下载 MySQL Java Connector 到一个特定目录,路径后续会用到

启动

  • 需要提前启动 Elastic Search 和 MySQL

    • 因为 MySQL 服务一直挂在后台,所以其实可以不用管
    • 但 Elastic Search 需要提前启动
  • 其实可以直接启动,但是会报 pipelines.yml 找不到工作流的错误

    # 省略一些 INFO 日志,来到报错行
    ERROR: Pipelines YAML file is empty. Location: <path>/pipelines.yml
    
  • 所以下一步配置工作流

配置 pipelines.yml

- pipeline.id: test
  path.config: "config/mysql.conf"
  • 将给出的例子取消注释
    • id 随便取名,只适用于区分工作流的名字
    • 指明该 config 的位置
  • 注意,此处的 config 的相对路径以 logstash-<version> 为根目录,所以需要加上 config 路径
    • 即直接在 pipelines.yml 所在目录下新建 mysql.conf

配置 mysql.conf

  • 观察到同级目录下还有一个 logstash-sample.conf
    • 此为配置的模板文件,复制粘贴其格式即可
  • 配置自己所需要的 mysql.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  stdin {}
  jdbc {
    # 区别 jdbc 的类型,在下面 output 的时候做判断
    type => "user"
    # 数据库连接地址,数据库需要修改
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/<database>?characterEncoding=UTF-8"
    # 数据库账号密码
    jdbc_user => "root"
    jdbc_password => "password"
    # MySQL Connector 依赖包路径,相对路径没成功,所以这里笔者用的绝对路径
    jdbc_driver_library => "<path_to_connector>"
    # Driver Class 名字,跟普通 JDBC 相同
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # 跟 statement_filepath 二选一配置,说明在下方
    statement => "SELECT * FROM user"
    ###################### 以下配置选择性添加 #############################
    # 数据库重连尝试次数
    connection_retry_attempts => "3"
    # 判断数据库连接是否可用,默认 false
    jdbc_validate_connection => "true"
    # 数据库连接可用校验超时时间
    jdbc_validation_timeout => "3600"
    # 开启分页查询,默认 false
    jdbc_paging_enabled => "true"
    # 单次分页查询条数,默认 100000,若字段较多且更新频率较高,建议调低
    jdbc_page_size => "500"
    # 是否将字段名转换为小写,默认 true(如果有数据序列化、反序列化需求,建议改为 false)
    lowercase_column_names => false
    # SQL 日志级别,默认 info
    sql_log_level => warn
  }
}

output {
  if [type] == "user" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "user"
      # 数据唯一索引,建议使用数据库的主键
      # 此处 id 更改为该数据库表的主键名
      document_id => "%{id}"
    }
  }
}

  • 说明
    • 现根据 jdbc 配置连接数据库,所以 username, password, driver class, driver library 需要正确
    • 然后执行设置的 statement,或者 statement_filepath 里面的 SQL 语句
    • 将执行结果输出到 logstash 进行下一步 filter 清洗(此处没有设置 filter)
    • 最后根据 jdbc 设置的 type 选择性输出到某一数据库

再启动

  • 再启动即可看到数据导入的日志:
[2023-03-10T22:17:18,147][WARN ][logstash.inputs.jdbc     ][test][36d9eb008791f9c7d7369939a04c776115306dd4587ac6b11cef2da0d3147fda] (0.018373s) SELECT * FROM (SELECT * FROM user) AS `t1` LIMIT 500 OFFSET 0
  • 根据这些日志也可以反推出流程(特别是里面的子查询,很显眼)

检查数据

  • 如果不放心可以进 kibana 或者 发送一个请求 查看结果

  • # 建议 MySQL 的每一张表都独立成一个 index,所以需要替换
    # 对于上面的例子,替换成 user 即可
    curl GET http://localhost:9200/<index>/_search
    
  • 如果 total 里的条数和 MySQL 当中数据条数一致,就已经成功了