Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
För närvarande stöder Azure Stream Analytics (ASA) endast infogning av (väntande) rader i SQL-utdata (Azure SQL Databaser och Azure Synapse Analytics). I den här artikeln beskrivs lösningar för att aktivera UPDATE, UPSERT eller MERGE på SQL-databaser med hjälp av Azure Functions som mellanliggande lager.
Alternativa lösningar till Azure Functions presenteras i slutet.
Krav
Du kan skriva data till en tabell med något av följande lägen:
| Läge | Motsvarande T-SQL-instruktion | Krav |
|---|---|---|
| Lägga till | INSERT | Ingen |
| Replace | MERGE (UPSERT) | Unik nyckel |
| Ackumulera | MERGE (UPSERT) med sammansatt tilldelningsoperator (+=, -=...) |
Unik nyckel och ackumulator |
För att illustrera skillnaderna bör du tänka på vad som händer när du matar in följande två poster:
| Ankomsttid | Device_Id | Mått_Värde |
|---|---|---|
| 10:00:00 | A | 1 |
| 10:05 | A | 20 |
I tilläggsläget infogar du två poster. Motsvarande T-SQL-instruktion är:
INSERT INTO [target] VALUES (...);
Resulterar i:
| Ändringstid | Device_Id | Mätvärde |
|---|---|---|
| 10:00:00 | A | 1 |
| 10:05 | A | 20 |
I ersättningsläge får du bara det sista värdet efter nyckel. Här använder du Device_Id som nyckel. Motsvarande T-SQL-instruktion är:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resulterar i:
| Modifierad_Tid | Enhets_Nyckel | Mätvärde |
|---|---|---|
| 10:05 | A | 20 |
Slutligen summerarValue du i ackumulerat läge med en sammansatt tilldelningsoperator (+=). Här använder du även Device_Id som nyckel:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resulterar i:
| Ändrad_Tid | Enhets_Nyckel | Mätvärde |
|---|---|---|
| 10:05 | A | 21 |
För prestandaöverväganden stöder utgångsadaptrarna för ASA SQL-databasen för närvarande endast tilläggsläge inbyggt. Dessa adaptrar använder massinfogning för att maximera dataflödet och begränsa tillbakatrycket.
Den här artikeln visar hur du använder Azure Functions för att implementera ersätt- och ackumulerade lägen för ASA. När du använder en funktion som mellanliggande lager, påverkas inte strömningsjobbet av den potentiella skrivprestandan. I det här avseendet fungerar användning av Azure Functions bäst med Azure SQL. Med Synapse SQL kan växling från massuttryck till rad-för-rad-instruktioner skapa större prestandaproblem.
Azure Functions utdata
I det här jobbet ersätter du ASA SQL-utdata med utdata från ASA Azure Functions. Funktionen implementerar UPDATE-, UPSERT- eller MERGE-funktioner.
För närvarande kan du komma åt en SQL Database i en funktion med hjälp av två alternativ. Det första alternativet är utdatabindningen Azure SQL. Den är för närvarande begränsad till C# och erbjuder endast ersättningsläge. Det andra alternativet är att skapa en SQL-fråga som ska skickas via lämplig SQL-drivrutin (Microsoft. Data.SqlClient för .NET).
Båda följande exempel förutsätter följande tabellschema. Bindningsalternativet kräver att en primärnyckel anges i måltabellen. Det är inte nödvändigt, men rekommenderas, när du använder en SQL-drivrutin.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Om du vill använda en funktion som utdata från ASA måste funktionen uppfylla följande förväntningar:
- Azure Stream Analytics förväntar sig HTTP-status 200 från Functions-appen för batchar som den bearbetar korrekt.
- När Azure Stream Analytics tar emot ett undantag på 413 ("http Request Entity Too Large") från en Azure-funktion minskar storleken på de batchar som den skickar till Azure funktion.
- Vid test av anslutning skickar Stream Analytics en POST-begäran med en tom batch till Azure Functions och förväntar sig HTTP-status 20x tillbaka för att validera testet.
Alternativ 1: Uppdatera efter nyckel med Azure Function SQL-bindning
Det här alternativet använder Azure Function SQL-utdatabindning. Det här tillägget kan ersätta ett objekt i en tabell utan att du behöver skriva en SQL-instruktion. För närvarande stöder den inte sammansatta tilldelningsoperatorer (ackumuleringar).
Det här exemplet byggdes på:
- Azure Functions-körningsversion 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
För att bättre förstå bindningsmetoden följer du självstudiekursen där.
Skapa först en HttpTrigger-funktionsapp med standardinställningar genom att följa den här guiden. Använd följande information:
- Språk:
C# - Körtid:
.NET 6(under funktion/körtid v4) - Mall:
HTTP trigger
Installera bindningstillägget genom att köra följande kommando i en terminal i projektmappen:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Lägg till SqlConnectionString-objektet i Values-avsnittet i din local.settings.json och fyll i målserverns anslutningssträng:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersätt hela funktionen (.cs fil i projektet) med följande kodfragment. Uppdatera namnområdet, klassnamnet och funktionsnamnet med ditt eget:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Uppdatera måltabellens namn i bindningsavsnittet:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Device Uppdatera avsnittet klass och mappning så att det matchar ditt eget schema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Du kan nu testa kabeldragningen mellan den lokala funktionen och databasen genom att felsöka (F5 i Visual Studio Code). SQL-databasen måste kunna nås från datorn. Du kan använda SSMS för att kontrollera anslutningen. Skicka sedan POST-begäranden till den lokala slutpunkten. En begäran med en tom brödtext ska returnera HTTP 204. En begäran med en faktisk nyttolast ska sparas i måltabellen (i läget ersätt/uppdatera). Här är ett exempel på nyttolast som motsvarar schemat som används i det här exemplet:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funktionen kan nu publiceras till Azure. Ange en programinställning för SqlConnectionString. Azure SQL Server-brandväggen bör tillåta Azure-tjänster åtkomst för att livefunktionen ska kunna nå den.
Du kan sedan definiera funktionen som utdata i ASA-jobbet och använda den för att ersätta poster i stället för att infoga dem.
Alternativ 2: Sammanfoga med sammansatt tilldelning (ackumuleras) via en anpassad SQL-fråga
Kommentar
Vid omstart och återställning kan ASA skicka utdatahändelser som redan har genererats. Det här beteendet kan orsaka att ackumuleringslogik misslyckas (dubblering av enskilda värden). För att förhindra det här problemet matar du ut samma data i en tabell med hjälp av de inbyggda ASA SQL-utdata. Du kan använda den här kontrolltabellen för att identifiera problem och synkronisera om ackumuleringen vid behov.
Det här alternativet använder Microsoft.Data.SqlClient. Med det här biblioteket kan du skicka sql-frågor till en SQL Database.
Det här exemplet byggdes på:
Skapa först en standardapp för HttpTrigger-funktion genom att följa den här självstudien. Följande information används:
- Språk:
C# - Körtid:
.NET 6(under funktion/körtid v4) - Mall:
HTTP trigger
Installera SqlClient-biblioteket genom att köra följande kommando i en terminal i projektmappen:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Lägg till SqlConnectionString objektet i Values avsnittet av din local.settings.json och fyll i anslutningssträngen för målservern:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Ersätt hela funktionen (.cs fil i projektet) med följande kodfragment. Uppdatera namnområdet, klassnamnet och funktionsnamnet efter ditt eget:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Uppdatera kommandobyggnadsavsnittet sqltext så att det matchar ditt eget schema (observera hur ackumulering uppnås via operatorn vid += uppdatering):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Nu kan du testa kabeldragningen mellan den lokala funktionen och databasen genom att felsöka (F5 i VS Code). SQL-databasen måste kunna nås från datorn. Du kan använda SSMS för att kontrollera anslutningen. Skicka sedan POST-begäranden till den lokala slutpunkten. En begäran med en tom brödtext ska returnera HTTP 204. En begäran med en faktisk nyttolast ska sparas i måltabellen (i ackumulerat/sammanslagningsläge). Här är ett exempel på nyttolast som motsvarar schemat som används i det här exemplet:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
Funktionen kan nu publiceras till Azure. En programinställning ska anges för SqlConnectionString. Azure SQL Server-brandväggen bör tillåta Azure-tjänster så att livefunktionen kan nå den.
Funktionen kan sedan definieras som utdata i ASA-jobbet och användas för att ersätta poster i stället för att infoga dem.
Alternativ
Förutom Azure Functions kan flera metoder uppnå det förväntade resultatet. I det här avsnittet beskrivs några av dessa metoder.
Efterbearbetning i sql-måldatabasen
En bakgrundsaktivitet fungerar när data infogas i databasen via standard-ASA SQL-utdata.
För Azure SQL använder du INSTEAD OFDML-utlösare för att fånga upp kommandona INSERT som ASA utfärdar.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
För Synapse SQL kan ASA infoga i en stagingtabell. En återkommande uppgift kan sedan omvandla data efter behov till en mellanliggande tabell. Slutligen flyttas data till produktionstabellen.
Förbearbetning i Azure Cosmos DB
Azure Cosmos DB stöder UPSERT internt. Här är det bara möjligt att lägga till eller ersätta. Du måste hantera ackumulering på klientsidan i Azure Cosmos DB.
Om kraven matchar kan du ersätta SQL-måldatabasen med en Azure Cosmos DB instans. Den här ändringen kräver en viktig ändring i den övergripande lösningsarkitekturen.
För Synapse SQL kan du använda Azure Cosmos DB som mellanliggande lager via Azure Synapse Link för Azure Cosmos DB. Använd Azure Synapse Link för att skapa ett analytical store. Du kan sedan köra frågor mot det här datalagret direkt i Synapse SQL.
Jämförelse av alternativen
Varje metod erbjuder olika värdeförslag och funktioner:
| Typ | Alternativ | Lägen | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Efterbearbetning | ||||
| Utlösare | Ersätt, ackumulera | + | Inte tillämpligt, utlösare är inte tillgängliga i Synapse SQL | |
| Mellanlagring | Ersätt, ackumulera | + | + | |
| Förbearbetning | ||||
| Azure Functions | Ersätt, ackumulera | + | – (prestanda rad för rad) | |
| Azure Cosmos DB-ersättning | Replace | Saknas | Saknas | |
| Azure Cosmos DB Azure Synapse Link | Replace | Saknas | + |
Få support
Om du vill ha mer hjälp kan du prova microsofts Q&A-frågesida för Azure Stream Analytics.
Nästa steg
- Förstå utdata från Azure Stream Analytics
- Azure Stream Analytics-utdata till Azure SQL-databas
- Öka dataflödesprestanda till Azure SQL Database från Azure Stream Analytics
- Använda hanterade identiteter för att komma åt Azure SQL Database eller Azure Synapse Analytics från ett Azure Stream Analytics-jobb
- Använda referensdata från en SQL Database för ett Azure Stream Analytics-jobb
- Köra Azure Functions i Azure Stream Analytics-jobb – Handledning för Redis-utdata
- Snabbstart: Skapa ett Stream Analytics-jobb med hjälp av Azure Portal