mongodb:实时数据同步(一)
关于 mongodb 数据实时同步,如果只是做数据备份的话,直接搭建一个replica set集群或者shard集群就可以实现目的了。但这样的话作为备份库的节点都是secondery,你没法往备份库上写数据上去。
不幸的是我最近就遇到了这样的需求,一个云上 mongodb 和一个云下机房的 mongodb 。云上的数据需要实时同步到云下,但云下的数据库会写入一些其它业务。
这样的话我只能将数据实时从云上采集到云下库。
本文介绍的是基于kafka-connector的一种解决方案。
环境准备
已有搭建好的kafka集群,可以参考cosmo这篇 《Kafka集群搭建》 快速搭建一个开发用的kafka集群。 debezium提供的 connector 插件: debezium-connector-mongodb mongodb官方提供的connector插件: mongo-kafka-connect-1.0.1-all.jar
两个概念
kafka-connector 由两个重要的部分组成source和sink。source用来从数据源采集数据,sink用来将数据保存到目标数据源。
为什么要使用两个connector?
本文将使用debezium提供的变更数据事件采集器来采集数据,使用 mongodb 官方提供的connector中的sink将数据推送给下游数据源。
插件安装
将下载下来的两个压缩包放在kafka插件目录下
/usr/local/share/kafka/plugins
如果目录不存在请新建
解压 debezium-connector-mongodb 和 mongo-kafka-connect-1.0.1-all.jar
启动kafka-connect
kafka-connector启动分为单机版和集群版,我们这里不讨论单机版。
#在所有kafka brokers上执行下面命令,启动connector
bin/connect-distributed.sh -daemon config/connect-distributed.properties
因为kafka-connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083。
GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
Debezium Mongodb Connector配置
Property |
Default |
Description |
---|---|---|
name |
|
connector的名称(唯一指定) |
connector.class |
|
Mongodb connector 具体实现类,默认值为 io.debezium.connector.mongodb.MongoDbConnector |
mongodb.hosts |
|
mongodb 链接信息host:port[,host:port],如果 mongodb.members.auto.discover 是false,需要指定具体的副本集名称例如 rs0/host:port。如果是shard 集群请配置config server的地址。 |
mongodb.name |
|
采集好的数据会推送到kafka消息队列,topics为[db].[collection]。如果配置了这个name,将在topics前加此name作为前缀。 |
mongodb.user |
|
mongodb 用户名 |
mongodb.password |
|
mongodb 密码 |
mongodb.authsource |
admin |
mongodb 鉴权库 |
mongodb.ssl.enabled |
false |
链接是否使用ssl |
mongodb.ssl.invalid.hostname.allowed |
false |
是否严格检查主机名 |
database.whitelist |
empty string |
监听数据变更的db库白名单,与黑名单不能同时使用 |
database.blacklist |
empty string |
监听数据变更的db库黑名单, 与白名单不能同时使用 |
collection.whitelist |
empty string |
监听数据变更的collection库白名单, 与黑名单不能同时使用 。逗号分隔 |
collection.blacklist |
empty string |
监听数据变更的collection库黑名单, 与白名单不能同时使用 。逗号分隔 |
snapshot.mode |
initial |
默认为: initial ,在启动时如果在oplog中找不到偏移量,会创建一个快照进行初始化同步。如果不需要请设置为never。 |
field.blacklist |
empty string |
字段映射黑名单,配置的字段将不会同步 ,用逗号分隔 |
field.renames |
empty string |
字段重命名[old]:[new],用逗号分隔 |
tasks.max |
1 |
如果是副本集集群,默认值1是可以接收的。如果是shard cluster 最好大于等于分片数量 |
initial.sync.max.threads |
1 |
初始化同步任务数 |
tombstones.on.delete |
true |
是否在delete之后推送 tombstone 事件 |
snapshot.delay.ms |
|
connector启动后拍摄快照之前等待的时间,单位为(毫秒)避免集群中多个connector启动时中断快照。 |
snapshot.fetch.size |
0 |
拍摄快照时每次拉取的最大数 |
启动debezium-connector数据采集任务
{
"name" : "debezium",
"config" : {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts" : "vm4:27100",
"mongodb.user" : "root",
"mongodb.password" : "root",
"mongodb.authsource" : "admin",
"database.whitelist" : "test",
"tasks.max" : "4",
"mongodb.name" : "debezium"
}
curl -H "Content-Type: application/json" -X POST -d '{ "name" : "debezium", "config" : { "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts" : "vm4:27100", "mongodb.user" : "root", "mongodb.password" : "root", "mongodb.authsource" : "admin", "database.whitelist" : "test", "tasks.max" : "4", "mongodb.name" : "debezium" } }' http://vm2:8083/connectors
Kafka-Connector-Mongodb-Sink配置
{
"name" : "mongo-sink", #sink名称
"config" : {
"topics" : "debezium.sync.realtime_air", #监听的topics,多个用逗号分隔
"max.batch.size" : "50", #每次处理的最大数量
"connection.uri" : "mongodb://192.168.4.49:27017/sync", #目标mongodb链接
"connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector", #mongodb链接实现类
"change.data.capture.handler" : "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler", #CDC实现类
"key.converter" : "org.apache.kafka.connect.json.JsonConverter", #键序列化类
"key.converter.schemas.enable" : "true", #键转化是否包含架构
"value.converter" : "org.apache.kafka.connect.json.JsonConverter", #值序列化类
"value.converter.schemas.enable" : "true",#值转化是否包含架构
"database" : "sync", #写入的数据库名称
"collection" : "mongosink", #写入的集合名称
"topic.override.debezium.sync.realtime_air.collection" : "realtime_air" #覆盖配置,设置debezium.sync.realtime_air 写入的集合名称为realtime_air