将数据复制到 Azure SQL 数据库时,可能需要不同的写入行为:
- 追加:我的源数据只包含新记录。
- 更新插入:我的源数据包含插入和更新内容。
- 覆盖:我需要每次都重新加载整个维度表。
- 使用自定义逻辑进行写入:在将数据最终插入目标表之前,我需要额外的处理。
有关如何在 Azure 数据工厂中进行配置和最佳做法,请参阅相应的部分。
追加数据
追加数据是此 Azure SQL 数据库接收器连接器的默认行为。 Azure 数据工厂执行批量插入,以有效地在表中写入数据。 可以相应地在复制活动中配置源和接收器。
更新插入数据
选项 1: 如果有大量的数据需要复制,请使用以下方法执行更新插入:
- 首先,使用数据库范围的临时表通过复制活动批量加载所有记录。 系统不会记录对数据库范围的临时表执行的操作,因此你可以在数秒内加载数百万条记录。
- 在 Azure 数据工厂中运行存储过程活动,以应用 MERGE 或 INSERT/UPDATE 语句。 使用临时表作为源,以单个事务的形式执行所有更新或插入操作。 这样可以减少往返次数和日志操作次数。 在存储过程活动结束时,可以截断临时表,使之可以用于下一更新插入循环。
例如,在 Azure 数据工厂中,可以使用复制活动创建一个管道,并将其与存储过程活动相链接。 前者将数据从源存储复制到数据集中的 Azure SQL 数据库临时表(例如,表名为 ##UpsertTempTable 的表)。 然后,后者调用一个存储过程,以将临时表中的源数据合并到目标表中,并清理临时表。
在数据库中使用 MERGE 逻辑定义一个存储过程(如以下示例所示),以便从上述存储过程活动指向该过程。 假设目标是包含三个列的 Marketing 表:ProfileID、State 和 Category。 根据 ProfileID 列执行更新插入。
CREATE PROCEDURE [dbo].[spMergeData]
AS
BEGIN
MERGE TargetTable AS target
USING ##UpsertTempTable AS source
ON (target.[ProfileID] = source.[ProfileID])
WHEN MATCHED THEN
UPDATE SET State = source.State
WHEN NOT matched THEN
INSERT ([ProfileID], [State], [Category])
VALUES (source.ProfileID, source.State, source.Category);
TRUNCATE TABLE ##UpsertTempTable
END
选项 2: 也可选择在复制活动中调用存储过程。 此方法针对源表中的每个行运行,而不是在复制活动中使用批量插入作为默认方法,因此它不适用于大规模更新插入。
覆盖整个表
可以在复制活动接收器中配置 preCopyScript 属性。 在此情况下,对于运行的每个复制活动,Azure 数据工厂会先运行脚本。 然后,运行复制来插入数据。 例如,若要使用最新数据覆盖整个表,请指定一个脚本,以先删除所有记录,然后从源批量加载新数据。
使用自定义逻辑写入数据
使用自定义逻辑写入数据的步骤与更新插入数据部分中的描述类似。 如果在将源数据大规模地最终插入目标表之前需要应用额外的处理,可执行以下两项操作之一:
- 先加载到数据库范围的临时表,然后调用存储过程。
- 在复制过程中调用存储过程。
调用 SQL 接收器的存储过程
将数据复制到 Azure SQL 数据库时,还可通过其他参数配置并调用某个用户指定的存储过程。
提示
调用存储过程时,会逐行处理数据,而不使用批量操作,我们不建议将批量操作用于大规模复制。 在有关将数据载入 Azure SQL 数据库的最佳做法中了解详细信息。
当内置复制机制无法使用时,还可使用存储过程。 例如,在将源数据最终插入目标表之前应用额外的处理。 额外处理的示例包括合并列、查找其他值以及将数据插入多个表。
以下示例演示如何使用存储过程在 Azure SQL 数据库数据库中的表内执行 upsert。 假设输入数据和接收器 Marketing 表各有三列:ProfileID、State 和 Category。 基于 ProfileID 列执行 upsert,并仅将其应用于特定类别 。
输出数据集: “tableName”是存储过程中相同的表类型参数名,如以下存储过程脚本中所示:
{
"name": "AzureSQLDbDataset",
"properties":
{
"type": "AzureSqlTable",
"linkedServiceName": {
"referenceName": "<Azure SQL Database linked service name>",
"type": "LinkedServiceReference"
},
"typeProperties": {
"tableName": "Marketing"
}
}
}
按如下所示在复制活动中定义 SQL sink 节。
"sink": {
"type": "SqlSink",
"SqlWriterTableType": "MarketingType",
"SqlWriterStoredProcedureName": "spOverwriteMarketing",
"storedProcedureParameters": {
"category": {
"value": "ProductA"
}
}
}
在数据库中,使用与 SqlWriterStoredProcedureName 相同的名称定义存储过程。 它可处理来自指定源的输入数据,并将其合并到输出表中。 存储过程中的表类型的参数名称与数据集中定义的 tableName 相同。
CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
AS
BEGIN
MERGE [dbo].[Marketing] AS target
USING @Marketing AS source
ON (target.ProfileID = source.ProfileID and target.Category = @category)
WHEN MATCHED THEN
UPDATE SET State = source.State
WHEN NOT MATCHED THEN
INSERT (ProfileID, State, Category)
VALUES (source.ProfileID, source.State, source.Category);
END
在数据库中,使用与 sqlWriterTableType 相同的名称定义表类型。 表类型的架构与输入数据返回的架构相同。
CREATE TYPE [dbo].[MarketingType] AS TABLE(
[ProfileID] [varchar](256) NOT NULL,
[State] [varchar](256) NOT NULL,
[Category] [varchar](256) NOT NULL
)
Azure SQL 数据库的数据类型映射
从/向 Azure SQL 数据库复制数据时,以下映射用于从 Azure SQL 数据库数据类型映射到 Azure 数据工厂临时数据类型。
Azure SQL 数据库数据类型 | Azure 数据工厂临时数据类型 |
---|---|
bigint | Int64 |
binary | Byte[] |
bit | Boolean |
char | String, Char[] |
date | DateTime |
Datetime | DateTime |
datetime2 | DateTime |
Datetimeoffset | DateTimeOffset |
Decimal | Decimal |
FILESTREAM attribute (varbinary(max)) | Byte[] |
Float | Double |
image | Byte[] |
int | Int32 |
money | Decimal |
nchar | String, Char[] |
ntext | String, Char[] |
numeric | Decimal |
nvarchar | String, Char[] |
real | Single |
rowversion | Byte[] |
smalldatetime | DateTime |
smallint | Int16 |
smallmoney | Decimal |
sql_variant | Object |
text | String, Char[] |
time | TimeSpan |
timestamp | Byte[] |
tinyint | Byte |
uniqueidentifier | Guid |
varbinary | Byte[] |
varchar | String, Char[] |
xml | Xml |
备注: 对于映射到十进制临时类型的数据类型,目前 Azure 数据工厂支持的最大精度为 28。 如果有精度大于 28 的数据,请考虑在 SQL 查询中将其转换为字符串。