1. Synchronizing data
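Elasticsearch is often used alongside a relational database to provide search for the business, and the officially provided Logstash middleware can handle the data synchronization.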
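Linking Elasticsearch to a relational database requires the following conditions:
When a record changes, the whole document is overwritten, rather than only certain specific fields.
Each record has an update-time field that marks when it was updated, so that the change can be captured.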
The relevant configuration is as follows:
2. Fixing delete operations that do not sync
When Logstash uses the ES plugin to push MySQL data, only insert and update operations are synchronized; delete operations are not propagated to Elasticsearch.
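The reason deletes go unnoticed can be shown with a small simulation. This is only a sketch: it uses an in-memory SQLite table in place of MySQL and a plain dict in place of an Elasticsearch index, and the table `items`, its columns, and the helper names are all hypothetical. The poller selects rows whose update time is newer than the last value it saw, so a row that no longer exists can never be selected.

```python
import sqlite3


def poll_changes(conn, last_seen):
    """Return rows whose updated_at is newer than last_seen, oldest first."""
    cur = conn.execute(
        "SELECT id, title, updated_at FROM items "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_seen,),
    )
    return cur.fetchall()


def sync_once(conn, index, last_seen):
    """Copy changed rows into the index (a dict keyed by id); return the new watermark."""
    for row_id, title, updated_at in poll_changes(conn, last_seen):
        # The whole document is overwritten, not individual fields.
        index[row_id] = {"title": title, "updated_at": updated_at}
        last_seen = max(last_seen, updated_at)
    return last_seen


# Hypothetical stand-in for the MySQL table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (id INTEGER PRIMARY KEY, title TEXT, updated_at INTEGER)"
)
conn.executemany("INSERT INTO items VALUES (?, ?, ?)", [(1, "a", 10), (2, "b", 20)])

index = {}
watermark = sync_once(conn, index, 0)  # picks up both inserts

conn.execute("UPDATE items SET title = 'a2', updated_at = 30 WHERE id = 1")
conn.execute("DELETE FROM items WHERE id = 2")
watermark = sync_once(conn, index, watermark)  # sees the update, never the delete

print(sorted(index))  # id 2 is still indexed although its row is gone
```

After the second pass the document with id 2 remains in the index, which is exactly the stale state that has to be handled separately.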
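Solution: use soft deletes. Add an is_deleted field to mark a record as deleted; queries can then filter on this field to exclude the affected documents. A scheduled task later removes the data from both MySQL and Elasticsearch.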
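As a rough illustration of the query-time filtering and the later cleanup, the request bodies could be built as below. This is a sketch under assumptions: it presumes the indexed documents carry a boolean `is_deleted` field, the helper names are hypothetical, and the bodies would still have to be sent with whatever Elasticsearch client is in use (the purge body is meant for a delete-by-query request).

```python
def exclude_soft_deleted(query):
    """Wrap a query clause so that soft-deleted documents are filtered out."""
    return {
        "query": {
            "bool": {
                "must": [query],
                # Assumption: documents carry a boolean is_deleted field.
                "must_not": [{"term": {"is_deleted": True}}],
            }
        }
    }


def purge_soft_deleted_body():
    """Body for a delete-by-query request that removes soft-deleted documents."""
    return {"query": {"term": {"is_deleted": True}}}


search_body = exclude_soft_deleted({"match": {"title": "logstash"}})
purge_body = purge_soft_deleted_body()
```

Putting the flag in `must_not` keeps it out of relevance scoring, so it only decides whether a document is excluded.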
Soft-delete handling requires configuring the logstash.conf file, for example:
input {
  jdbc {
    # jdbc_driver_library => "mysql-connector-java-8.0.27.jar"
    type => "jdbc"
    jdbc_connection_string => "jdbc:mysql://cos-mysql:3306/cosimcloud?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "xxx"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    # The soft-delete column is selected under the alias "deleted"
    statement => "SELECT fid AS id, fis_deleted as deleted FROM t1"
    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "600"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Scheduled run (cron syntax): every minute
    schedule => "* * * * *"
    # Incremental update based on a column.
    # Note: tracking_column only takes effect with use_column_value => true,
    # and the statement must reference :sql_last_value to be incremental.
    tracking_column => "fupdated_at"
    tracking_column_type => "timestamp"
    lowercase_column_names => false
  }
}
filter {
  if [deleted] {
    # Soft-deleted row: ask the output to delete the document
    mutate {
      add_field => {
        "[@metadata][elasticsearch_action]" => "delete"
      }
    }
    mutate {
      remove_field => [ "deleted","@version","@timestamp" ]
    }
  } else {
    # Live row: index (insert or overwrite) the document
    mutate {
      add_field => {
        "[@metadata][elasticsearch_action]" => "index"
      }
    }
    mutate {
      remove_field => [ "deleted","@version","@timestamp" ]
    }
  }
}
output {
  elasticsearch {
    hosts => "es01:9200"
    user => "elastic"
    password => "elastic"
    ecs_compatibility => disabled
    manage_template => true
    template_overwrite => true
    template => "/usr/share/logstash/template/logstash-template.json"
    template_name => "logstash-mysql"
    index => "logstash-fmu"
    pipeline => "fmu-tag"
    # Action chosen in the filter stage: "delete" or "index"
    action => "%{[@metadata][elasticsearch_action]}"
    document_id => "%{id}"
  }
}
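The following needs to be configured:
In the statement of the input jdbc block, select the soft-delete field deleted.
In filter, test deleted and use mutate to process the data.
In output, set the action field of elasticsearch.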
Configuring action prevents the template from generating the index (to be resolved).
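The filter logic described above can be pictured as a small function. This is only an illustrative sketch in Python, not how Logstash is implemented: `route_event` is a hypothetical name, and it assumes `deleted` arrives as a boolean. It picks the action from `deleted`, strips the helper fields, and returns the id that the output would use as `document_id`.

```python
HELPER_FIELDS = ("deleted", "@version", "@timestamp")


def route_event(event):
    """Mimic the filter stage: choose the action, then drop helper fields."""
    # Assumption: "deleted" is a boolean; a missing value is treated as not deleted.
    action = "delete" if bool(event.get("deleted")) else "index"
    doc = {key: value for key, value in event.items() if key not in HELPER_FIELDS}
    return action, doc["id"], doc


deleted_row = route_event({"id": 7, "deleted": True, "@version": "1"})
live_row = route_event({"id": 8, "deleted": False, "@version": "1"})
```

Both branches remove the same fields, so the only difference between a live row and a soft-deleted one is the action handed to the output.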
3. Related documentation