Records in Azure SQL Database bijwerken of samenvoegen met behulp van Azure Functions

Momenteel ondersteunt Azure Stream Analytics (ASA) alleen het invoegen (toevoegen) van rijen aan SQL-uitvoer (Azure SQL Databases en Azure Synapse Analytics). In dit artikel worden tijdelijke oplossingen besproken voor het inschakelen van UPDATE, UPSERT of MERGE op SQL-databases, met behulp van Azure Functions als intermediaire laag.

De alternatieve opties voor Azure Functions worden aan het einde weergegeven.

Vereiste

U kunt gegevens naar een tabel schrijven met behulp van een van de volgende modi:

Modus Equivalente T-SQL-instructie Vereisten
Toevoegen INSERT Geen
Vervangen SAMENVOEGEN (UPSERT: Invoegen of Bijwerken) Unieke sleutel
Verzamelen MERGE (UPSERT) met samengestelde toewijzingsoperator (+=, -=...) Unieke sleutel en accumulator

Om de verschillen te illustreren, overweeg wat er gebeurt wanneer u de volgende twee records verwerkt.

Aankomsttijd Device_Id Meetwaarde
10:00:00 V 1
10:05 Een 20

In de toevoegmodus voegt u twee records in. De equivalente T-SQL-instructie is:

INSERT INTO [target] VALUES (...);

Resulteert in:

Gewijzigde_Tijd Device_Id Meetwaarde
10:00:00 V 1
10:05 A 20

In de vervangingsmodus krijgt u alleen de laatste waarde per sleutel. Hier gebruikt u Device_Id als sleutel. De equivalente T-SQL-instructie is:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulteert in:

Gewijzigde_Tijd Apparaatsleutel Meetwaarde
10:05 V 20

Ten slotte kunt u in de cumulatieve modus optellenValue met een samengestelde toewijzingsoperator (+=). Hier gebruikt u ook Device_Id als sleutel:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resulteert in:

Gewijzigde_Tijd Apparaatsleutel Meetwaarde
10:05 Een 21

Voor prestatieoverwegingen ondersteunen de outputadapters van de ASA SQL-database momenteel alleen de Toevoegmodus. Deze adapters gebruiken bulk-insert om de doorvoer te maximaliseren en de backpressure te beperken.

In dit artikel wordt beschreven hoe u Azure Functions gebruikt om de modi Replace and Accumulate voor ASA te implementeren. Wanneer u een functie als intermediaire laag gebruikt, heeft de mogelijke schrijfprestaties geen invloed op de streamingtaak. In dit opzicht werkt het gebruik van Azure Functions het beste met Azure SQL. Met Synapse SQL kan het overschakelen van bulksgewijs naar rij-per-rij-instructies grotere prestatieproblemen opleveren.

uitvoer van Azure Functions

In deze taak vervangt u de ASA SQL-uitvoer door de ASA Azure Functions-uitvoer. Met de functie worden de mogelijkheden UPDATE, UPSERT of MERGE geïmplementeerd.

Op dit moment hebt u toegang tot een SQL Database in een functie met behulp van twee opties. De eerste optie is de Azure SQL uitvoerbinding. Het is momenteel beperkt tot C# en biedt alleen de vervangingsmodus. De tweede optie is het opstellen van een SQL-query die moet worden verzonden via het juiste SQL-stuurprogramma (Microsoft. Data.SqlClient voor .NET).

In beide voorbeelden wordt uitgegaan van het volgende tabelschema. Voor de bindingsoptie moet een primaire sleutel worden ingesteld in de doeltabel. Het is niet nodig, maar wordt aanbevolen bij het gebruik van een SQL-stuurprogramma.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Als u een functie als uitvoer van ASA wilt gebruiken, moet de functie voldoen aan de volgende verwachtingen:

  • Azure Stream Analytics verwacht HTTP-status 200 van de Functions-app voor batches die het met succes verwerkt.
  • Wanneer Azure Stream Analytics een 413 ("http-aanvraagentiteit te groot")-uitzondering ontvangt van een Azure-function, verkleint het de grootte van de batches die worden verzonden naar Azure Function.
  • Tijdens de testverbinding verzendt Stream Analytics een POST-aanvraag met een lege batch naar Azure Functions en verwacht de HTTP-status 20x terug om de test te valideren.

Optie 1: Bijwerken op sleutel met de Azure Functions SQL-binding

Deze optie maakt gebruik van de Azure Function SQL-uitvoerbinding. Deze extensie kan een object in een tabel vervangen zonder dat u een SQL-instructie hoeft te schrijven. Op dit moment worden samengestelde toewijzingsoperatoren (accumulaties) niet ondersteund.

Dit voorbeeld is gebaseerd op:

Volg deze zelfstudie om meer inzicht te krijgen in de bindingsmethode.

Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. Gebruik de volgende informatie:

  • Taal: C#
  • Runtime: .NET 6 (onder function/runtime v4)
  • Sjabloon: HTTP trigger

Installeer de bindingsextensie door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Voeg het SqlConnectionString item toe in de Values sectie van uwlocal.settings.json, vul de verbindingsreeks van de doelserver in:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, klassenaam en functienaam bij met uw eigen naam:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Werk de doeltabelnaam in de bindingssectie bij.

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Werk de Device klasse en mapping-sectie bij zodat deze overeenkomen met uw eigen schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in Visual Studio Code). De SQL-database moet bereikbaar zijn vanaf uw computer. U kunt SSMS gebruiken om de connectiviteit te controleren. Verzend vervolgens POST-aanvragen naar het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet HTTP 204 retourneren. Een aanvraag met een werkelijke gegevenslading moet worden bewaard in de doeltabel (in de modus vervangen/bijwerken). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

De functie kan nu worden gepubliceerd naar Azure. Stel een toepassingsinstelling in voor SqlConnectionString. De Azure SQL Server-firewall moet Azure-services toestaan om het te bereiken, zodat de live-functie toegang heeft.

Vervolgens kunt u de functie definiëren als uitvoer in de ASA-taak en deze gebruiken om records te vervangen in plaats van ze in te voegen.

Optie 2: Samenvoegen met samengestelde toewijzing (cumulatie) via een aangepaste SQL-query

Notitie

Bij het opnieuw opstarten en herstellen kan de ASA uitvoergebeurtenissen opnieuw verzenden die het al heeft verzonden. Dit gedrag kan ertoe leiden dat de accumulatielogica mislukt (dubbele afzonderlijke waarden). U kunt dit probleem voorkomen door dezelfde gegevens in een tabel uit te voeren met behulp van de systeemeigen ASA SQL-uitvoer. U kunt deze besturingstabel gebruiken om problemen te detecteren en de accumulatie zo nodig opnieuw te synchroniseren.

Deze optie maakt gebruik van Microsoft.Data.SqlClient. Met deze bibliotheek kunt u SQL-query's verzenden naar een SQL Database.

Dit voorbeeld is gebaseerd op:

Maak eerst een standaard-HttpTrigger-functie-app door deze zelfstudie te volgen. De volgende informatie wordt gebruikt:

  • Taal: C#
  • Runtime: .NET 6 (onder function/runtime v4)
  • Sjabloon: HTTP trigger

Installeer de SqlClient-bibliotheek door de volgende opdracht uit te voeren in een terminal die zich in de projectmap bevindt:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Voeg het SqlConnectionString item toe in de Values sectie van uwlocal.settings.json, vul de verbindingsreeks van de doelserver in:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Vervang de volledige functie (.cs bestand in het project) door het volgende codefragment. Werk de naamruimte, de klassenaam en de functienaam zelf bij:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Werk de sectie van de sqltext commando constructie bij zodat deze overeenkomt met uw eigen schema (u ziet hoe accumulatie wordt bereikt via de += operator tijdens het bijwerken).

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

U kunt nu de bedrading tussen de lokale functie en de database testen door foutopsporing (F5 in VS Code). De SQL-database moet bereikbaar zijn vanaf uw computer. U kunt SSMS gebruiken om de connectiviteit te controleren. Verzend vervolgens POST-aanvragen naar het lokale eindpunt. Een aanvraag met een lege hoofdtekst moet HTTP 204 retourneren. Een verzoek met een daadwerkelijke payload moet worden bewaard in de bestemmingstabel (in accumulatie-/samenvoegmodus). Hier volgt een voorbeeldpayload die overeenkomt met het schema dat in dit voorbeeld wordt gebruikt:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

De functie kan nu worden gepubliceerd naar Azure. Er moet een toepassingsinstelling worden ingesteld voor SqlConnectionString. De Azure SQL Server-firewall moet Azure-services toestaan zodat de live-functie deze kan bereiken.

De functie kan vervolgens worden gedefinieerd als uitvoer in de ASA-taak en wordt gebruikt om records te vervangen in plaats van ze in te voegen.

Alternatieven

Buiten Azure Functions kunnen meerdere methoden het verwachte resultaat bereiken. In deze sectie worden enkele van deze methoden beschreven.

Naverwerking in de doel-SQL Database

Een achtergrondtaak werkt zodra de gegevens in de database worden ingevoegd via de standaard-ASA SQL-uitvoer.

Voor Azure SQL gebruikt u INSTEAD OFDML-triggers om de INSERT-opdrachten te onderscheppen die door ASA worden uitgevoerd.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Voor Synapse SQL kan ASA worden ingevoegd in een faseringstabel. Een terugkerende taak kan vervolgens de gegevens naar behoefte transformeren in een tussenliggende tabel. Ten slotte worden de gegevens verplaatst naar de productietabel.

Voorverwerking in Azure Cosmos DB

Azure Cosmos DB biedt systeemeigen ondersteuning voor UPSERT. Hier is alleen toevoegen of vervangen mogelijk. U moet accumulaties aan de clientzijde beheren in Azure Cosmos DB.

Als de vereisten overeenkomen, kunt u de doel-SQL-database vervangen door een Azure Cosmos DB-instance. Deze wijziging vereist een belangrijke wijziging in de algehele oplossingsarchitectuur.

Voor Synapse SQL kunt u Azure Cosmos DB als intermediaire laag gebruiken via Azure-Synapse Link voor Azure Cosmos DB. Gebruik Azure Synapse Link om een analytisch archief te maken. U kunt vervolgens rechtstreeks in Synapse SQL een query uitvoeren op dit gegevensarchief.

Vergelijking van de alternatieven

Elke benadering biedt verschillende waardeproposities en mogelijkheden:

Type Optie Modi Azure SQL Database Azure Synapse Analytics
Naverwerking
Triggers Vervangen, verzamelen + N.v.t., triggers zijn niet beschikbaar in Synapse SQL
Testomgeving Vervangen, verzamelen + +
Voorverwerking
Azure Functions Vervangen, accumuleren + - (rij per rij prestaties)
Vervanging van Azure Cosmos DB Vervangen N.v.t. N.v.t.
Azure Cosmos DB Azure Synapse Link Vervangen N.v.t. +

Ondersteuning krijgen

Probeer de microsoft Q&A-vragenpagina voor Azure Stream Analytics voor meer hulp.

Volgende stappen