添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

重置debezium connector的offset

背景

服务在开发及上线后经常会碰到这样的情况:修改connector配置、刷新历史数据等,此时就需要重新执行snapshot,在此记录下目前在实践的几种方法

准备数据

  • 以MySQL为例,创建测试表,包含100条数据



  • 创建connector
curl --location --request POST 'localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ 
    "name": "test_connector3", 
    "config": { 
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "snapshot.delay.ms": 1000, 
        "snapshot.fetch.size": 1000, 
        "database.hostname": "172.24.115.224", 
        "database.server.name": "binlog-app", 
        "database.port": "3306", 
        "database.user": "yuan", 
        "database.password": "yuan", 
        "table.include.list": "app.test_connector3", 
        "include.schema.changes": "false", 
        "decimal.handling.mode": "double", 
        "database.history.kafka.bootstrap.servers": "172.24.115.224:9092", 
        "database.history.kafka.topic": "app.history.schema", 
        "transforms.retopic.topic.regex": "(.*?)$", 
        "transforms": "retopic,rename,snapshotasinsert", 
        "transforms.rename.type": "org.apache.kafka.connect.transforms.RegexRouter", 
        "transforms.rename.replacement": "$1-$2-$3", 
        "transforms.snapshotasinsert.type": "io.debezium.connector.mysql.transforms.ReadToInsertEvent", 
        "transforms.retopic.topic.replacement": "$1", 
        "transforms.retopic.key.enforce.uniqueness": "false", 
        "transforms.retopic.type": "io.debezium.transforms.ByLogicalTableRouter", 
        "transforms.rename.regex": "([\\w-]*).([\\w-]*).([\\w-]*)" 
 
  • 验证Kafka,已经生成了100条数据



解决方案

方案1

将指定connector的offset置为NULL,kafka-connect-service服务的offset信息保存在kafka.connect.service.offset里,该信息是由offset.storage.topic配置

  • 获取指定connector(test_connector3)在各个partition里对应的offset信息
 kafkacat -b localhost:9092 -C -t kafka.connect.service.offset -E -e -f 'Partition(%p) %k %s\n'  | grep test_connector3 
 

注意:需要参数-e(Exit successfully when last message received),否则可能导致pipe的过程中grep获取不到所有的数据



由上可知:key: ["test_connector3",{"server":"binlog-app"}], partition:19

  • 删除connector
DELETE /connectors/{name} 

注意:不能使用PUT /connectors/{name}/pause、PUT /connectors/{name}/resume接口,resume的时候并不会重新读取offset

  • 删除connector offset
 echo '["test_connector3",{"server":"binlog-app"}]|' | \ 
> kafkacat -P -Z -b localhost:9092 -t kafka.connect.service.offset -K \| -p 19 
 

注意:kafkacat -P 会解析文件以生成数据,此处需要pipe,或者将文件作为参数,不能将string作为参数

  • 重新创建connector
POST /connectors 
  • 验证Kafka数据已经写入,并且snapshot重新执行,Kafka里消息总量为200





方案2

这是一种比较取巧的方式,不是将offset置为初始值,而是在当前的基础上,重新同步所有的历史数据

  • 执行SQL
update