添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

一、同步数据

Elasticsearch经常与关系型数据库协作,为业务提供搜索功能,而官方提供了 Logstash 中间件用于 数据同步

Elasticsearch关联关系型数据库 需要满足以下几个条件:

  • Elasticsearch 中的 _id字段 必须设置为 MySQL 中的 "id" 字段,用于保证es中间件与关系型数据库建立映射关系。
  • 如果MySQL更新了某些数据,那么这些操作会导致数据同步。但是,这个同步操作会覆盖掉ES中的整个 document ,而不是某些特定字段。
  • 当向 MySQL 中插入或更新数据时,建议设置 update-time字段 作为更新的标识,以便可以捕获这个更新。
  • 相关配置如下:

  • MySQL:8.0.24
  • Elasticsearch:7.15.2
  • Logstash:7.15.2:提供了ElasticSearch插件用于推送数据到ES中
  • 二、解决删除操作无法同步

    当Logstash使用ES插件推送MySQL数据时,只有 insert update 操作会进行同步,但是 delete 操作不会传播到Elasticsearch数据库中。

    解决方案:

  • 通过软删除处理。软删除的表一般会包含 is_deleted字段 ,可以在查询时对该字段进行过滤将相关文档排除在外。然后,通过定时任务从 MySQL 和 Elasticsearch 中删除该数据。
  • 通过业务处理删除。在删除时,设置事务将删除 同时传播到 MySQL、Elasticsearch中 。但是,事务错误时可能会导致数据不一致。
  • 软删除处理需要配置 logstash.conf 文件,例如:

    input {
    	jdbc {
    		# jdbc_driver_library => "mysql-connector-java-8.0.27.jar"
    		type => "jdbc"
    		jdbc_connection_string => "jdbc:mysql://cos-mysql:3306/cosimcloud?characterEncoding=UTF-8&autoReconnect=true"
    		jdbc_user => "root"
    		jdbc_password => "xxx"
    		jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    		statement => "SELECT fid AS id, fis_deleted as deleted FROM t1"
    		connection_retry_attempts => "3"
    		jdbc_validate_connection => "true"
    		jdbc_validation_timeout => "600"
    		jdbc_paging_enabled => "true"
    		jdbc_page_size => "5000"
    		# 设置定时任务
    		schedule => "* * * * *"
    		# 基于字段进行增量更新
    		tracking_column => "fupdated_at"
    		tracking_column_type => "timestamp"
    		lowercase_column_names => false
    filter {
    	if [deleted] {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "delete"
            mutate {
                remove_field => [ "deleted","@version","@timestamp" ]
        } else {
            mutate {
                add_field => {
                    "[@metadata][elasticsearch_action]" => "index"
            mutate {
                remove_field => [ "deleted","@version","@timestamp" ]
    output {
    	elasticsearch {
    		hosts => "es01:9200"
    		user => "elastic"
    		password => "elastic"
    		ecs_compatibility => disabled
    		manage_template => true
    		template_overwrite => true
    		template => "/usr/share/logstash/template/logstash-template.json"
    		template_name => "logstash-mysql"
    		index => "logstash-fmu"
    		pipeline => "fmu-tag"
    		action => "%{[@metadata][elasticsearch_action]}"
    		document_id => "%{id}"
    

    需要配置:

  • 在input-jdbc-statement 中,需要查询软删除字段deleted
  • 在filter中,判断deleted并且使用mutate处理数据
  • output中elasticsearch的action字段
  • 配置action导致template无法生成索引(待处理)
  • 三、相关文档

    如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

    分类:
    后端
    标签: