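Resetting a Debezium connector's offset
Background
During development and after go-live, a service often runs into situations such as changing the connector configuration or refreshing historical data. Both require running the snapshot again. This note records the approaches currently used in practice.
Preparing the data
- Using MySQL as an example, create a test table containing 100 rows
- Create the connector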
curl --location --request POST 'localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{
"name": "test_connector3",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.delay.ms": 1000,
"snapshot.fetch.size": 1000,
"database.hostname": "172.24.115.224",
"database.server.name": "binlog-app",
"database.port": "3306",
"database.user": "yuan",
"database.password": "yuan",
"table.include.list": "app.test_connector3",
"include.schema.changes": "false",
"decimal.handling.mode": "double",
"database.history.kafka.bootstrap.servers": "172.24.115.224:9092",
"database.history.kafka.topic": "app.history.schema",
"transforms.retopic.topic.regex": "(.*?)$",
"transforms": "retopic,rename,snapshotasinsert",
"transforms.rename.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename.replacement": "$1-$2-$3",
"transforms.snapshotasinsert.type": "io.debezium.connector.mysql.transforms.ReadToInsertEvent",
"transforms.retopic.topic.replacement": "$1",
"transforms.retopic.key.enforce.uniqueness": "false",
"transforms.retopic.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.rename.regex": "([\\w-]*).([\\w-]*).([\\w-]*)"
}
}'
- Verify in Kafka that 100 messages have been produced
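One way to check the message count from the command line is sketched below. It assumes kafkacat is installed and the broker is reachable at localhost:9092. The helper name count_topic_messages is made up for this note, and the topic name has to be replaced with the topic the connector actually writes to after the transforms above (kafkacat -L lists the topics on the broker).

```shell
#!/usr/bin/env bash
# Sketch: count the messages currently in a topic by consuming it from the
# beginning and printing one line (the offset) per message.
# count_topic_messages is a hypothetical helper, not part of kafkacat.
count_topic_messages() {
  local broker="$1" topic="$2"
  # -C consume, -o beginning start at the earliest offset,
  # -e exit once the end of the topic is reached, -q suppress informational
  # output, -f '%o\n' print only the message offset, one per line.
  kafkacat -b "$broker" -C -t "$topic" -o beginning -e -q -f '%o\n' | wc -l | tr -d ' '
}

# Usage (replace <topic> with the real topic name):
#   count_topic_messages localhost:9092 <topic>
```

Running the same helper again after a reset gives a quick before/after comparison of the totals.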
Solutions
Option 1
Set the offset of the given connector to NULL. The kafka-connect-service service stores its offsets in the topic kafka.connect.service.offset; the topic name is set by the offset.storage.topic configuration.
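If the name of the offset topic is not known, it can be read from the Connect worker configuration. The following is a minimal sketch, assuming a worker that is started from a properties file; the path in the usage line and the helper name offset_topic_from_config are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: print the value of offset.storage.topic from a Kafka Connect
# worker properties file. offset_topic_from_config is a hypothetical helper.
offset_topic_from_config() {
  local config_file="$1"
  # Match "offset.storage.topic = value", skip commented-out lines,
  # and print the value with surrounding whitespace trimmed.
  sed -n -E 's/^[[:space:]]*offset\.storage\.topic[[:space:]]*=[[:space:]]*(.*[^[:space:]])[[:space:]]*$/\1/p' "$config_file" | tail -n 1
}

# Usage (the path is an assumption; use your worker's config file):
#   offset_topic_from_config /path/to/connect-distributed.properties
```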
- Get the offset records of the connector (test_connector3) together with the partition each one is stored in
kafkacat -b localhost:9092 -C -t kafka.connect.service.offset -E -e -f 'Partition(%p) %k %s\n' | grep test_connector3
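Note: the -e flag (exit successfully when the last message is received) is required; without it kafkacat keeps waiting for new messages, and grep may not receive all of the data through the pipe.
From the output above: key: ["test_connector3",{"server":"binlog-app"}], partition: 19
- Delete the connector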
DELETE /connectors/{name}
Note: PUT /connectors/{name}/pause followed by PUT /connectors/{name}/resume is not a substitute, because the connector does not re-read its offset on resume.
- Delete the connector offset
echo '["test_connector3",{"server":"binlog-app"}]|' | \
kafkacat -P -Z -b localhost:9092 -t kafka.connect.service.offset -K \| -p 19
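Note: kafkacat -P builds messages from stdin or from files, so the record has to be piped in or supplied as a file argument; a string literal cannot be passed as an argument.
- Recreate the connector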
POST /connectors
- Verify that data has been written to Kafka and that the snapshot ran again; the total number of messages in Kafka is now 200
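The steps above can be strung together in a small script. What follows is a sketch under stated assumptions, not a tested tool: it assumes kafkacat and curl are available, the Connect REST API listens on localhost:8083, the offset topic is kafka.connect.service.offset, and the connector's original create request body was saved to a file (connector.json is a hypothetical file name). All function names are invented for this note.

```shell
#!/usr/bin/env bash
# Sketch: find the offset record, delete the connector, write a tombstone
# for its offset key, then recreate the connector.
# All function names below are hypothetical helpers.

BROKER="${BROKER:-localhost:9092}"
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
OFFSET_TOPIC="${OFFSET_TOPIC:-kafka.connect.service.offset}"

# Reads lines shaped like 'Partition(19) ["name",{...}] <value>' on stdin and
# prints "<partition> <key>" for each. Assumes the key contains no spaces.
parse_offset_lines() {
  sed -n -E 's/^Partition\(([0-9]+)\) (\[[^ ]*\]) .*$/\1 \2/p'
}

# Prints "<partition> <key>" of the latest offset record for one connector.
find_offset_record() {
  local name="$1"
  kafkacat -b "$BROKER" -C -t "$OFFSET_TOPIC" -e -q -f 'Partition(%p) %k %s\n' \
    | grep -F "[\"$name\"," | parse_offset_lines | tail -n 1
}

delete_connector() {
  curl --silent --fail --request DELETE "$CONNECT_URL/connectors/$1"
}

# Produces a record with the given key and a NULL value (-Z) to the same
# partition, which marks the stored offset as deleted.
tombstone_offset() {
  local partition="$1" key="$2"
  printf '%s|\n' "$key" \
    | kafkacat -P -Z -b "$BROKER" -t "$OFFSET_TOPIC" -K '|' -p "$partition"
}

create_connector() {
  curl --silent --fail --request POST "$CONNECT_URL/connectors" \
    --header 'Content-Type: application/json' --data-binary "@$1"
}

reset_connector_offset() {
  local name="$1" config_file="$2" record partition key
  record="$(find_offset_record "$name")"
  [ -n "$record" ] || { echo "no offset found for $name" >&2; return 1; }
  partition="${record%% *}"   # text before the first space
  key="${record#* }"          # text after the first space
  delete_connector "$name" && tombstone_offset "$partition" "$key" \
    && create_connector "$config_file"
}

# Usage:
#   reset_connector_offset test_connector3 connector.json
```

The tombstone is written only after the connector has been deleted, matching the order of the steps above, so a still-running task cannot commit a fresh offset over it.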
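Option 2
This approach is something of a shortcut: instead of resetting the offset to its initial value, it re-syncs all historical data on top of the current offset.
- Execute SQL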
update