首页 » Big Data and AI » Database » 正文

通过数据工厂将数据载入 Azure SQL 数据库的最佳做法

数据工厂的顶级视图

将数据复制到 Azure SQL 数据库时,可能需要不同的写入行为:

  • 追加:我的源数据只包含新记录。
  • 更新插入:我的源数据包含插入和更新内容。
  • 覆盖:我需要每次都重新加载整个维度表。
  • 使用自定义逻辑进行写入:在将数据最终插入目标表之前,我需要额外的处理。

有关如何在 Azure 数据工厂中进行配置和最佳做法,请参阅相应的部分。

追加数据

追加数据是此 Azure SQL 数据库接收器连接器的默认行为。 Azure 数据工厂执行批量插入,以有效地在表中写入数据。 可以相应地在复制活动中配置源和接收器。

更新插入数据

选项 1: 如果有大量的数据需要复制,请使用以下方法执行更新插入:

  • 首先,使用数据库范围的临时表通过复制活动批量加载所有记录。 系统不会记录对数据库范围的临时表执行的操作,因此你可以在数秒内加载数百万条记录。
  • 在 Azure 数据工厂中运行存储过程活动,以应用 MERGE INSERT/UPDATE 语句。 使用临时表作为源,以单个事务的形式执行所有更新或插入操作。 这样可以减少往返次数和日志操作次数。 在存储过程活动结束时,可以截断临时表,使之可以用于下一更新插入循环。

例如,在 Azure 数据工厂中,可以使用复制活动创建一个管道,并将其与存储过程活动相链接。 前者将数据从源存储复制到数据集中的 Azure SQL 数据库临时表(例如,表名为 ##UpsertTempTable 的表)。 然后,后者调用一个存储过程,以将临时表中的源数据合并到目标表中,并清理临时表。

Upsert

在数据库中使用 MERGE 逻辑定义一个存储过程(如以下示例所示),以便从上述存储过程活动指向该过程。 假设目标是包含三个列的 Marketing 表:ProfileIDState 和 Category 根据 ProfileID 列执行更新插入。

SQL
CREATE PROCEDURE [dbo].[spMergeData]
AS
BEGIN
    MERGE TargetTable AS target
    USING ##UpsertTempTable AS source
    ON (target.[ProfileID] = source.[ProfileID])
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT matched THEN
        INSERT ([ProfileID], [State], [Category])
      VALUES (source.ProfileID, source.State, source.Category);
    
    TRUNCATE TABLE ##UpsertTempTable
END

选项 2: 也可选择在复制活动中调用存储过程。 此方法针对源表中的每个行运行,而不是在复制活动中使用批量插入作为默认方法,因此它不适用于大规模更新插入。

覆盖整个表

可以在复制活动接收器中配置 preCopyScript 属性。 在此情况下,对于运行的每个复制活动,Azure 数据工厂会先运行脚本。 然后,运行复制来插入数据。 例如,若要使用最新数据覆盖整个表,请指定一个脚本,以先删除所有记录,然后从源批量加载新数据。

使用自定义逻辑写入数据

使用自定义逻辑写入数据的步骤与更新插入数据部分中的描述类似。 如果在将源数据大规模地最终插入目标表之前需要应用额外的处理,可执行以下两项操作之一:

  • 先加载到数据库范围的临时表,然后调用存储过程。
  • 在复制过程中调用存储过程。

调用 SQL 接收器的存储过程

将数据复制到 Azure SQL 数据库时,还可通过其他参数配置并调用某个用户指定的存储过程。

 提示

调用存储过程时,会逐行处理数据,而不使用批量操作,我们不建议将批量操作用于大规模复制。 有关将数据载入 Azure SQL 数据库的最佳做法中了解详细信息。

当内置复制机制无法使用时,还可使用存储过程。 例如,在将源数据最终插入目标表之前应用额外的处理。 额外处理的示例包括合并列、查找其他值以及将数据插入多个表。

以下示例演示如何使用存储过程在 Azure SQL 数据库数据库中的表内执行 upsert。 假设输入数据和接收器 Marketing 表各有三列:ProfileIDState 和 Category 基于 ProfileID 列执行 upsert,并仅将其应用于特定类别 。

输出数据集: “tableName”是存储过程中相同的表类型参数名,如以下存储过程脚本中所示:

JSON
{
    "name": "AzureSQLDbDataset",
    "properties":
    {
        "type": "AzureSqlTable",
        "linkedServiceName": {
            "referenceName": "<Azure SQL Database linked service name>",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
            "tableName": "Marketing"
        }
    }
}

按如下所示在复制活动中定义 SQL sink 节。

JSON
"sink": {
    "type": "SqlSink",
    "SqlWriterTableType": "MarketingType",
    "SqlWriterStoredProcedureName": "spOverwriteMarketing",
    "storedProcedureParameters": {
        "category": {
            "value": "ProductA"
        }
    }
}

在数据库中,使用与 SqlWriterStoredProcedureName 相同的名称定义存储过程。 它可处理来自指定源的输入数据,并将其合并到输出表中。 存储过程中的表类型的参数名称与数据集中定义的 tableName 相同。

SQL
CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
AS
BEGIN
  MERGE [dbo].[Marketing] AS target
  USING @Marketing AS source
  ON (target.ProfileID = source.ProfileID and target.Category = @category)
  WHEN MATCHED THEN
      UPDATE SET State = source.State
  WHEN NOT MATCHED THEN
      INSERT (ProfileID, State, Category)
      VALUES (source.ProfileID, source.State, source.Category);
END

在数据库中,使用与 sqlWriterTableType 相同的名称定义表类型。 表类型的架构与输入数据返回的架构相同。

SQL
CREATE TYPE [dbo].[MarketingType] AS TABLE(
    [ProfileID] [varchar](256) NOT NULL,
    [State] [varchar](256) NOT NULL,
    [Category] [varchar](256) NOT NULL
)

Azure SQL 数据库的数据类型映射

从/向 Azure SQL 数据库复制数据时,以下映射用于从 Azure SQL 数据库数据类型映射到 Azure 数据工厂临时数据类型。

Azure SQL 数据库数据类型 Azure 数据工厂临时数据类型
bigint Int64
binary Byte[]
bit Boolean
char String, Char[]
date DateTime
Datetime DateTime
datetime2 DateTime
Datetimeoffset DateTimeOffset
Decimal Decimal
FILESTREAM attribute (varbinary(max)) Byte[]
Float Double
image Byte[]
int Int32
money Decimal
nchar String, Char[]
ntext String, Char[]
numeric Decimal
nvarchar String, Char[]
real Single
rowversion Byte[]
smalldatetime DateTime
smallint Int16
smallmoney Decimal
sql_variant Object
text String, Char[]
time TimeSpan
timestamp Byte[]
tinyint Byte
uniqueidentifier Guid
varbinary Byte[]
varchar String, Char[]
xml Xml

备注: 对于映射到十进制临时类型的数据类型,目前 Azure 数据工厂支持的最大精度为 28。 如果有精度大于 28 的数据,请考虑在 SQL 查询中将其转换为字符串。