添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

1 背景:

随着公司业务的成长,数据量也随之的不断增长。随之而来的问题是在做ETL的时候,时间花费也越来越长。为了节省时间开销,我们只想要更新最新的数据,不想要把公司历年所有的数据都进行处理。这种情况就被称为变更数据捕获(Change Data Capture,又名CDC)。在SQLServer2008之前,对数据变更的捕获通常使用触发器把DML操作中的INSERT/UPDATE/DELETE数据记录下来,但是触发器的维护比较困难;时间戳严重依赖于数据库的设计,而且必须在业务代码中对时间字段进行维护。

1.1 适用环境:

仅在SQLServer2008(含)以后的企业版、开发版和评估版中可用,标准版中不可用

1.2 详解:

变更数据捕获的更改数据源为 SQL Server 事务日志。 在将插入、更新和删除应用于跟踪的源表时,将会在日志中添加说明这些更改的项。 日志用作捕获进程的输入来源。 它会读取日志,并在跟踪的表的关联更改表中添加有关更改的信息。 系统将提供一些函数,以枚举在更改表中指定范围内发生的更改,并以筛选的结果集的形式返回该值。 通常,应用程序进程使用筛选的结果集在某种外部环境中更新源表示形式。

2 基本操作

相关的存储过程:

Sys.sp_cdc_add_job           --添加job
Sys.sp_cdc_generate_wrapper_function
Sys.sp_cdc_change_job        --修改job配置
Sys.sp_cdc_get_captured_columns     --获取捕获列信息
Sys.sp_cdc_cleanup_change_table     
Sys.sp_cdc_get_ddl_history
Sys.sp_cdc_disable_db   --禁用当前数据的CDC,建议先禁用表,再禁用库
Sys.sp_cdc_help_change_data_capture
Sys.sp_cdc_disable_table     --关闭表捕获
Sys.sp_cdc_help_jobs
Sys.sp_cdc_drop_job          --删除Job
Sys.sp_cdc_scan              
Sys.sp_cdc_enable_db         --启用当前数据库CDC功能
Sys.sp_cdc_start_job         --启动Job
Sys.sp_cdc_enable_table      --开启表捕获
Sys.sp_cdc_stop_job          --停止Job

相关函数:

Cdc.fn_cdc_get_all_changes_<capture_instance>
Sys.fn_cdc_has_column_changed
Cdc.fn_cdc_get_net_changes_<capture_instance>
Sys.fn_cdc_increment_lsn
Sys.fn_cdc_decrement_lsn
Sys.fn_cdc_is_bit_set
Sys.fn_cdc_get_column_ordinal
Sys.fn_cdc_map_lsn_to_time
Sys.fn_cdc_get_max_lsn
Sys.fn_cdc_map_time_to_lsn
Sys.fn_cdc_get_min_lsn

3. 操作案例

本节以microsoft发布的adventureworks数据库为例来演示如何配置和使用Sql Server的CDC功能,本文中使用的数据库为SQL Server 2008 R2, AdventureWorks 数据库可在 这里 下载,然后恢复到sql server中来。在操作之前确保SQL Agent服务是启动状态,最好设置为自动启动。

3.1 对AdventureWorks启用CDC

USE AdventureWorks
EXECUTE sys.sp_cdc_enable_db;

sys.sp_cdc_enable_db

作用域: 整个目标库,包含元数据、DDL触发器、cdc架构和cdc用户。无法对系统数据库和分发数据库启用该功能。且执行者需要用sysadmin角色权限。

返回值: 0,成功; 1,失败。

如果执行过程中出现 错误: 返回的错误为15517: '无法作为数据库主体执行,因为主体"dbo" 不存在、无法模拟这种类型的主体,或您没有所需的权限。 原因为某些存储过程使用了具有WITH EXECUTE AS 的选项。使其在当前库具有了某个架构,但是当在别的地方执行时,但是该架构不存在。解决办法为(如果成功,请忽略):

USE AdventureWorks
ALTER AUTHORIZATION ON DATABASE::[AdventureWorks] TO [sa]
EXECUTE sys.sp_cdc_enable_db;

经过检查,uspUpdateEmployeeHireInfo这个存储过程的确有:WITH EXECUTE AS CALLER,使用sa的原因是即使sa被禁用,sa还是存在的。所以不会报错。

3.2 检查AdventureWorks CDC是否开启

SELECT is_cdc_enabled,CASE WHEN is_cdc_enabled=0 THEN 'CDC OFF 'ELSE 'CDC ON'END as 描述
FROM sys.databases
WHERE NAME = 'AdventureWorks'

 数据库CDC开启后,将在当前数据库下创建cdc用户,在架构下面也会增加cdc这个架构,因为CDC要求独占方式使用这两个架构,所以要单独创建。如果存在了非CDC功能创建的cdc用户、架构的话,则需要先删除该cdc命名的架构,才能开启。如下图所示:

当前数据库的系统表下,会创建一些CDC辅助的表,用于在CDC过程中,记录一些系统信息

 3.3 对数据库AdventureWorks中的表开启CDC

使用db_owner角色的成员执行sys.sp_cdc_enable_table为每个需要跟踪的表创建捕获实例,sys.sp_cdc_enable_table的参数列表为:

sys.sp_cdc_enable_table
    [ @source_schema = ] 'source_schema',
    [ @source_name = ] 'source_name' ,
    [ @role_name = ] 'role_name'
    [,[ @capture_instance = ] 'capture_instance' ]
    [,[ @supports_net_changes = ] supports_net_changes ]
    [,[ @index_name = ] 'index_name' ]
    [,[ @captured_column_list = ] 'captured_column_list' ]
    [,[ @filegroup_name = ] 'filegroup_name' ]
  [,[ @partition_switch = ] 'partition_switch' ]

a)  执行sys.sp_cdc_enable_table后,可以通过sys.tables目录视图中的is_tracked_by_cdc列来判断是否创建成功。

b) 默认情况下会对表的全部列做捕获,如果只需要对某些列做捕获,可以在@captured_column_list参数中指定这些列;

c) 如果要把更改表放到文件组里的话,最好创建单独的文件组(最起码与源表独立)。

d) 如果不想控制访问角色,则@role_name必须显式设置为null。如果指定了不存在的角色,执行该存储过程后,会自动创建指定的角色。

本节我们选择HumanResources.Department、Person.ADDRESS、Person.Person这三张表开启CDC功能。

USE AdventureWorks;
--表HumanResources.Department
EXECUTE sys.sp_cdc_enable_table
    @source_schema = N'HumanResources'
  , @source_name = N'Department'
  , @role_name = N'cdc_Admin'--可以自动创建
  , @capture_instance=DEFAULT
--表Person.ADDRESS
EXECUTE sys.sp_cdc_enable_table
    @source_schema = N'Person'
  , @source_name = N'ADDRESS'
  , @role_name = N'cdc_Admin'--可以自动创建
  , @capture_instance=DEFAULT
--表Person.Person
EXECUTE sys.sp_cdc_enable_table
    @source_schema = N'Person'
  , @source_name = N'Person'
  , @role_name = N'cdc_Admin'--可以自动创建
  , @capture_instance=DEFAULT

1. 系统表中多出了三张cdc.{Schema}_{TableName}_CT的表,其中SchemaName为被捕获表的架构名称,TableName为被捕获的表名

2. 在SQL Agent下创建了两个Job, cdc.AdventureWorks_cleanup和cdc.AdventureWorks_capture

3. cdc_Admin角色被创建

 3.4 验证三张表的CDC开启成功

SELECT  name ,
        is_tracked_by_cdc ,
        CASE WHEN is_tracked_by_cdc = 0 THEN 'CDC ON'
             ELSE 'CDC OFF'
        END 描述
FROM    sys.tables
WHERE   OBJECT_ID IN( OBJECT_ID('HumanResources.Department'),
                       OBJECT_ID('Person.ADDRESS'),
                       OBJECT_ID('Person.Person') )

 3.5 CDC功能验证

3.5.1 查看HumanResources.Department变更之前的数据

3.5.2 新增数据

INSERT  INTO HumanResources.Department
        ( Name ,
          GroupName ,
          ModifiedDate
        SELECT  Name + '1' ,
                GroupName + '1' ,
                GETDATE() ModifiedDate
        FROM    HumanResources.Department

 操作之后的数据增加了16条,

 查询CDC变更表cdc.HumanResources_Department_CT:

SELECT * FROM cdc.HumanResources_Department_CT

结果如下:

可以看到的确多了16条记录,注意__$operation的值

3.5.3 删除HumanResources.Department在3.5.3步中新增的数据

DELETE  FROM HumanResources.Department WHERE DepartmentID>16

查询CDC数据

3.5.4 修改数据

UPDATE  HumanResources.Department SET ModifiedDate=GETDATE()

我们发现这次CDC表增加的数据增加了2倍,因为cdc.<capture_instance>_CT这样命名的表,是用于记录源表更改的表,对于insert/delete操作,会有对应的一行记录,而对于update,会有两行记录

__$operation列:1 = 删除、2= 插入、3= 更新(旧值)、4= 更新(新值)
__$start_lsn列:由于更改是来源与数据库的事务日志,保存事务日志的开始序列号(LSN)

 微软不建议直接查询这类表,建议使用cdc.fn_cdc_get_all_changes_<捕获实例> 和cdc.fn_cdc_get_net_changes_<capture_instance>  来查询。

使用示例如下:

USE AdventureWorks;
SELECT DepartmentID,Name,GroupName,__$operation as operation_type,ModifiedDate
cdc.fn_cdc_get_net_changes_HumanResources_Department(sys.fn_cdc_get_min_lsn('HumanResources_Department'), sys.fn_cdc_get_max_lsn(), N'all with mask')
--最后一个参数说明如下:
--all
--返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 该列_ _$update_mask 始终为 NULL。
--all with mask
--返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 此外,当更新操作返回 (__$operation = 4) 的更新中修改过的捕获的列中返回的值中进行标记_ _$update_mask。
--all with merge
--返回对元数据列 $start_lsn 中的行所做的最终更改的 LSN。 该列_ _$operation 将是两个值之一: 1 表示删除,5 表示应用更改所需的操作是插入或更新。 该列_ _$update_mask 始终为 NULL。

结果如下:

4. 常用操作举例

4.1 查询已经开启的捕获实例

由于可能不记得或者不知道开启了什么表的捕获,所以可以使用以下语句来查找

USE AdventureWorks;
--返回所有表的变更捕获配置信息
EXECUTE sys.sp_cdc_help_change_data_capture;

4.2 查看对某个实例(即表)的哪些列做了捕获监控

USE AdventureWorks;
--查看对某个实例(即表)的哪些列做了捕获监控
EXEC sys.sp_cdc_get_captured_columns
@capture_instance = 'HumanResources_Department' -- sysname

4.3 查看当前库所有的cdc Job

USE AdventureWorks;
--查看当前库所有的cdc Job
SELECT * FROM msdb.dbo.cdc_jobs

4.4 查看当前配置使用sp_cdc_help_jobs

启用cdc之后会自动创建了两个作业,可以先使用sp_cdc_help_jobs来查看

EXECUTE sp_cdc_help_jobs

对于一个大型的OLTP系统,由于数据更改会非常频繁,变更表中的数据会非常多,如果存放过久(最久可以存放100年),那对数据库空间是非常大的挑战。此时可以调整上图中cdc.AdventureWorks_cleanup 中retention(单位:分钟),默认情况下,保留时间为3天.

4.5 修改作业配置

USE AdventureWorks;
--显示原有配置:
EXEC sp_cdc_help_jobs
--更改数据保留时间为分钟
EXECUTE sys.sp_cdc_change_job
    @job_type = N'cleanup',
    @retention=100
--停用作业
EXEC sys.sp_cdc_stop_job N'cleanup'   --job name 为NVARCHAR类型
--启用作业
EXEC sys.sp_cdc_start_job N'cleanup'  --job name 为NVARCHAR类型
--再次查看
EXEC sp_cdc_help_jobs

执行后,cleanup作业的运行时间间隔已经被修改成100分钟了.

注意:修改后要先停用(如果已经启用),再启用,才能生效

 4.6 作业的其他操作

USE AdventureWorks;
--停用作业
EXEC sys.sp_cdc_stop_jobN'cleanup'
--启用作业
EXEC sys.sp_cdc_start_jobN'cleanup'
--删除作业
EXEC sys.sp_cdc_drop_job@job_type = N'cleanup' -- nvarchar(20)
--创建作业
EXEC sys.sp_cdc_add_job
    @job_type = N'cleanup',
    @start_job = 0,
    @retention = 5760
--查看作业
EXEC sys.sp_cdc_help_jobs

 4.7 DDL变更捕获

CDC除了捕获数据变更之外,还能捕获DDL操作的变化。

4.7.1 现在先来对HumanResources.Department 表修改一下,把name的长度加长

USE AdventureWorks;
ALTER TABLE HumanResources.Department ALTER COLUMN Name NVARCHAR(120) ;

4.7.2 查询ddl记录表

USE AdventureWorks;
SELECT  * FROM    cdc.ddl_history

我们发现有一条schema的表更记录

4.8 使用CDC的函数来获取更改

 4.8.1 使用cdc.fn_cdc_get_all_changes_HumanResources_Department 函数报告捕获实例HumanResources_Department 的当前所有可用更改

USE AdventureWorks;
DECLARE @from_lsn binary(10)   --开始事务序列号
DECLARE @to_lsn   binary(10)   --终止事务序列号
SET @from_lsn = sys.fn_cdc_get_min_lsn('HumanResources_Department')
SET @to_lsn   = sys.fn_cdc_get_max_lsn()
SELECT * 
FROM cdc.fn_cdc_get_all_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all update old');
-----------------------------------------------------------------------------------------------------------------------------------------------------
USE AdventureWorks;
DECLARE @datetimeString as VARCHAR(100)
--当前日期的前一天0点时间
SELECT @datetimeString=DATENAME(year,GETDATE())+'-'+DATENAME(month,GETDATE())+'-'+DATENAME(DAY,DATEADD(DAY,-1,GETDATE()))+' 00:00:00' 
DECLARE @begin_time as DATETIME
set @begin_time=CONVERT(DATETIME,@datetimeString)
DECLARE @end_time as DATETIME
--当前时间
SET @end_time = GETDATE();
DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10);
-- Map the time interval to a change data capture query range.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
SELECT
[__$operation] as operation_type
,[DepartmentID]
,[Name]
,[GroupName]
,[ModifiedDate]
FROM cdc.fn_cdc_get_net_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all with merge');

4.8.2 获取某个时间段的更改信息

先根据日志序列号(logsequence number ,LSN)来获取跟踪变更数据:Sys.fn_cdc_map_time_to_lsn获取变更范围内的最大、最小LSN值。可以使用:

a) Smallest greater than;

b) smallest greater than orequal;

c) largest less than;

d) largest less than or equal.

如查询某个时间段插入的数据:

USE AdventureWorks;
--插入数据
INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
VALUES('test','abc',GETDATE())
INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
VALUES('test1','abc1',GETDATE())
--检查数据
DECLARE @bglsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal','2018-12-29 14:32:00.997')
DECLARE @edlsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('largest less than or equal',GETDATE())
SELECT DepartmentID,GroupName,Name
FROM cdc.HumanResources_Department_CT
WHERE [__$operation]=2 AND [__$start_lsn] BETWEEN @bglsn AND @edlsn

结果如下:

4.8.3 sys.fn_cdc_map_lsn_to_time 查询变更时间

USE AdventureWorks;
SELECT [__$operation] ,
       CASE [__$operation] 
       WHEN 1 THEN 'DELETE' 
       WHEN 2 THEN 'INSERT' 
       WHEN 3 THEN 'BEFORE UPDATE'
       WHEN 4 THEN 'AFTER UPDATE' 
       END [类型],
       sys.fn_cdc_map_lsn_to_time([__$start_lsn]) [更改时间] ,
       name ,
       DepartmentID ,
       GroupName ,
       ModifiedDate
FROM   cdc.HumanResources_Department_CT

4.8.4 获取LSN边界

USE AdventureWorks;
SELECT sys.fn_cdc_get_max_lsn()                                   AS [数据库级别的最大LSN],
       sys.fn_cdc_get_min_lsn('cdc.HumanResources_Department_CT') AS [捕获实例的lsn]

这两个值可以用于上面提到的函数里面用于筛选数据时使用。

4.8.5 更新DepartmentID=33的记录

update  HumanResources.Department set name ='changed for test record 33' where DepartmentID=33;

可以利用这个特性很方便地处理ETL过程中的缓慢变化维.

4.8.5 返回某个表的变更捕获配置信息

EXEC sys.sp_cdc_help_change_data_capture '{schema}', '{table_name}'

例如查看HumanResources.Department的配置信息:

USE AdventureWorks;
EXEC sys.sp_cdc_help_change_data_capture 'HumanResources', 'Department'

 4.8.6 获取当前库中所有变更捕获配置信息

USE AdventureWorks;
--返回所有表的变更捕获配置信息
EXECUTE sys.sp_cdc_help_change_data_capture;

4.8.7 查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值

USE AdventureWorks;
--查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值
EXEC sys.sp_cdc_get_captured_columns @capture_instance = 'HumanResources_Department'

由于在开启该表时sys.sp_cdc_enable_table 的参数:@captured_column_list参数没有指定,所以dbo.Department表的所有字段都进行监控了,如果你只关心某些字段,强烈建议在创建捕获的时候设置这个属性

4.8.8

declare @datetimeString as  varchar(100)
Select @datetimeString=Datename(year,GetDate())+'-'+Datename(month,GetDate())+'-'+Datename(day,dateadd(day,-1,GetDate()))+' 00:00:00'
declare @begin_time as datetime
set  @begin_time=convert(datetime,@datetimeString)
--select @begin_time
declare @end_time as datetime
SET @end_time = getdate();
declare @from_lsn binary(10), @to_lsn binary(10);
-- Map the time interval to a change data capture query range.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
--select @from_lsn,@to_lsn
SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@from_lsn, @to_lsn, N'all with merge'); --declare @min_lsn as binary(10) --declare @max_lsn as binary(10) --set @min_lsn =sys.fn_cdc_get_min_lsn('dbo_s_Fee') --set @max_lsn=sys.fn_cdc_get_max_lsn() --SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
--OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp --FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@min_lsn, @max_lsn, N'all with merge');

5. 参考文献

a)  关于变更数据捕获 (SQL Server)

b) https://www.cnblogs.com/chenmh/p/4408825.html