我正在尝试设置一个Debezium MySQL源连接器。我的目标是为每个数据库有一个主题,所以我正在研究利用主题的可能性,这样一个主题可以包含不同的消息类型,并且它们的模式可以存储在中。
按照这里的几个答案,我已经将密钥和值转换器主题名策略设置为 io.confluent.kafka.serializers.subject.TopicRecordNameStrategy 。
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
要将来自同一模式的所有消息重新路由到同一个主题,我将使用以下配置:
{ "name": "aws-db-connector", "config": { "group.id": "aws-db-group", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "secret-pw", "database.server.id": "184054", "database.server.name": "aws-db", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.aws-db", "database.include.list": "db1,db2", "transforms": "unwrap,Reroute", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "db,table,op,source.ts_ms", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)", "transforms.Reroute.topic.replacement": "$2_schema", "transforms.Reroute.key.field.name": "table", "transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)", "transforms.Reroute.key.field.replacement": "$3" }
在我的 docker-compose 文件中,我设置了:
docker-compose
- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy - CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081 - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
对于价值观来说,这是完美无缺的。我可以看到,我的模式注册中心包含多个主题,格式为 <TopicName>-<RecordName>-Value ,其中 TopicName 是我要重路由这个数据的主题的名称。 RecordName 是Debezium以 server_name.database_name.table_name 格式创建的“旧”主题名称。
<TopicName>-<RecordName>-Value
TopicName
RecordName
server_name.database_name.table_name
对于Keys来说,不幸的是,这个策略并不像预期的那样工作,而且我只有一个模式主题:看起来 RecordName 包含的是新的主题名,而不是原始的主题名称。如果一个字段名在不同的表中具有不同的类型,则会导致冲突和不兼容错误。
是否有任何方法在生成关键主题时提供适当的 RecordName ?
编辑-添加示例:
假设我的数据库包含三个表, table1 、 table2 和 table3 。
table1
table2
table3
Table1:
CREATE TABLE `table1` ( `id` INT NOT NULL AUTO_INCREMENT, `name` TEXT, PRIMARY KEY (`id`) );
Table2:
CREATE TABLE `table2` ( `id` INT NOT NULL AUTO_INCREMENT, `name` BINARY, PRIMARY KEY (`id`) );
Table3:
CREATE TABLE `table3` ( `id` BINARY NOT NULL, `name` INT, PRIMARY KEY (`id`)