Currently, Azure Stream Analytics (ASA) supports only inserting (appending) rows to SQL outputs (Azure SQL Databases and Azure Synapse Analytics). This article discusses workarounds to enable UPDATE, UPSERT, or MERGE on SQL databases, by using Azure Functions as the intermediary layer.
Alternative options to Azure Functions are presented at the end of this article.
Requirement
You can write data to a table by using one of the following modes:
| Mode | Equivalent T-SQL statement | Requirements |
|---|---|---|
| Append | INSERT | None |
| Replace | MERGE (UPSERT) | Unique key |
| Accumulate | MERGE (UPSERT) with compound assignment operators (+=, -=, ...) | Unique key and accumulator |
To illustrate the differences, consider what happens when ingesting the following two records:
| Arrival_Time | Device_Id | Measure_Value |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
In append mode, you insert both records. The equivalent T-SQL statement is:
INSERT INTO [target] VALUES (...);
Resulting in:
| Modified_Time | Device_Id | Measure_Value |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
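For the two sample records, a concrete version of this statement, using the column names from the result table, would look like:

```sql
INSERT INTO [target] (Modified_Time, Device_Id, Measure_Value)
VALUES ('10:00', 'A', 1),
       ('10:05', 'A', 20);
```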
In replace mode, you get only the last value by key. Here you use Device_Id as the key. The equivalent T-SQL statement is:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY TARGET THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value);
Resulting in:
| Modified_Time | Device_Key | Measure_Value |
|---|---|---|
| 10:05 | A | 20 |
Finally, in accumulate mode you sum Measure_Value with a compound assignment operator (+=). Here also, you use Device_Id as the key:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY TARGET THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value);
Resulting in:
| Modified_Time | Device_Key | Measure_Value |
|---|---|---|
| 10:05 | A | 21 |
For performance reasons, the ASA SQL database output adapters currently support only append mode natively. These adapters use bulk insert to maximize throughput and limit back pressure.
This article shows how to use Azure Functions to implement Replace and Accumulate modes for ASA. When you use a function as an intermediary layer, the potentially slower row-by-row writes don't put back pressure on the streaming job itself. In this regard, using Azure Functions works best with Azure SQL Database. With Synapse SQL, switching from bulk to row-by-row statements might create more severe performance issues.
Azure Functions output
In this approach, you replace the ASA SQL output with the ASA Azure Functions output. The function implements the UPDATE, UPSERT, or MERGE logic.
Currently, you can access a SQL database from a function in two ways. The first option is the Azure SQL output binding. It's currently limited to C#, and it offers only replace mode. The second option is to compose a SQL query and submit it via the appropriate SQL driver (Microsoft.Data.SqlClient for .NET).
Both of the following samples assume the following table schema. The binding option requires a primary key to be set on the target table. A primary key isn't necessary when you use a SQL driver, but it's recommended.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
To use a function as an output from ASA, the function must meet the following expectations:
- Azure Stream Analytics expects HTTP status 200 from the Functions app for batches that it processes successfully.
- When Azure Stream Analytics receives an HTTP 413 (Request Entity Too Large) response from the function, it reduces the size of the batches that it sends.
- During test connection, Stream Analytics sends a POST request with an empty batch to Azure Functions and expects HTTP status 20x back to validate the test.
Option 1: Update by key with the Azure Function SQL Binding
This option uses the Azure Function SQL Output Binding. This extension can replace an object in a table without you having to write a SQL statement. At this time, it doesn't support compound assignment operators (accumulations).
This sample was built on:
- Azure Functions runtime version 4
- .NET 6.0
- Microsoft.Azure.WebJobs.Extensions.Sql 0.1.131-preview
To better understand the binding approach, follow this tutorial.
First, create a default HttpTrigger function app by following this tutorial. Use the following information:
- Language: C#
- Runtime: .NET 6 (under function/runtime v4)
- Template: HTTP trigger
Install the binding extension by running the following command in a terminal located in the project folder:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Add the SqlConnectionString item in the Values section of your local.settings.json, filling in the connection string of the destination server:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Replace the entire function (the .cs file in the project) with the following code snippet. Update the namespace, class name, and function name with your own:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject the request if the payload exceeds the 256-KB batch limit of the ASA Functions output
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Update the destination table name in the binding section:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Update the Device class and mapping section to match your own schema:
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
You can now test the wiring between the local function and the database by debugging (F5 in Visual Studio Code). The SQL database needs to be reachable from your machine. You can use SSMS to check connectivity. Then, send POST requests to the local endpoint. A request with an empty body should return HTTP 204. A request with an actual payload should be persisted in the destination table (in replace / update mode). Here's a sample payload corresponding to the schema used in this sample:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
The function can now be published to Azure. Set an application setting for SqlConnectionString. Allow Azure services in the Azure SQL Server firewall so that the live function can reach the database.
You can then define the function as an output in the ASA job, and use it to replace records instead of inserting them.
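As an illustration, a minimal ASA job query that routes events to that output could look like the following sketch; the input alias [events] and the output alias [device-function] are hypothetical names to replace with your own:

```sql
SELECT
    DeviceId,
    Value,
    Timestamp
INTO [device-function]
FROM [events]
```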
Option 2: Merge with compound assignment (accumulate) via a custom SQL query
Note
Upon restart and recovery, ASA might resend output events that it already emitted. This behavior can cause the accumulation logic to fail (doubling individual values). To prevent this problem, also output the same data to a separate table by using the native ASA SQL output. You can then use this control table to detect problems and resynchronize the accumulation when necessary.
This option uses Microsoft.Data.SqlClient. This library lets you submit any SQL query to a SQL database.
This sample was built on:
- Azure Functions runtime version 4
- .NET 6.0
- Microsoft.Data.SqlClient 4.0.0
First, create a default HttpTrigger function app by following this tutorial. The following information is used:
- Language: C#
- Runtime: .NET 6 (under function/runtime v4)
- Template: HTTP trigger
Install the SqlClient library by running the following command in a terminal located in the project folder:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Add the SqlConnectionString item in the Values section of your local.settings.json, filling in the connection string of the destination server:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Replace the entire function (the .cs file in the project) with the following code snippet. Update the namespace, class name, and function name with your own:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject the request if the payload exceeds the 256-KB batch limit of the ASA Functions output
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
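// Note: values are interpolated into the SQL text below for brevity; production code should use SqlParameter to guard against SQL injection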
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Update the sqltext command building section to match your own schema (notice how accumulation is achieved via the += operator on update):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
You can now test the wiring between the local function and the database by debugging (F5 in VS Code). The SQL database needs to be reachable from your machine. You can use SSMS to check connectivity. Then, send POST requests to the local endpoint. A request with an empty body should return HTTP 204. A request with an actual payload should be persisted in the destination table (in accumulate / merge mode). Here's a sample payload corresponding to the schema used in this sample:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
The function can now be published to Azure. Set an application setting for SqlConnectionString. Allow Azure services in the Azure SQL Server firewall so that the live function can reach the database.
You can then define the function as an output in the ASA job, and use it to merge (accumulate) records into the destination table instead of inserting them.
Alternatives
Outside of Azure Functions, multiple methods can achieve the expected result. This section describes some of these methods.
Post-processing in the target SQL Database
In this approach, a background task processes the data after it's inserted into the database via the standard ASA SQL output.
For Azure SQL, use INSTEAD OF DML triggers to intercept the INSERT commands that ASA issues.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
For Synapse SQL, ASA can insert into a staging table. A recurring task can then transform the data as needed into an intermediary table. Finally, the data is moved to the production table.
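A minimal sketch of that pattern follows, assuming a hypothetical staging table dbo.device_staging that the ASA SQL output appends to, and assuming MERGE is available in your Synapse SQL pool:

```sql
-- Recurring task: fold staged rows into the production table.
MERGE INTO dbo.device_updated AS old
USING (
    -- Collapse duplicates per key: sum the values, keep the latest timestamp
    SELECT DeviceId,
           SUM([Value]) AS [Value],
           MAX([Timestamp]) AS [Timestamp]
    FROM dbo.device_staging
    GROUP BY DeviceId
) AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
    UPDATE SET old.[Value] += new.[Value], old.[Timestamp] = new.[Timestamp]
WHEN NOT MATCHED THEN
    INSERT (DeviceId, [Value], [Timestamp])
    VALUES (new.DeviceId, new.[Value], new.[Timestamp]);

-- Clear the staging table. In production, guard against rows arriving
-- between the MERGE and the TRUNCATE (for example, by switching tables).
TRUNCATE TABLE dbo.device_staging;
```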
Preprocessing in Azure Cosmos DB
Azure Cosmos DB supports UPSERT natively, so only append and replace modes are possible; you must manage accumulations client-side.
If the requirements match, you can replace the target SQL database with an Azure Cosmos DB instance. Doing so requires a significant change in the overall solution architecture.
For Synapse SQL, you can use Azure Cosmos DB as an intermediary layer via Azure Synapse Link for Azure Cosmos DB. Use Azure Synapse Link to create an analytical store. You can then query this data store directly in Synapse SQL.
Comparison of the alternatives
Each approach offers different value propositions and capabilities:
| Type | Option | Modes | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Post-Processing | Triggers | Replace, Accumulate | + | N/A, triggers aren't available in Synapse SQL |
| Post-Processing | Staging | Replace, Accumulate | + | + |
| Pre-Processing | Azure Functions | Replace, Accumulate | + | - (row-by-row performance) |
| Pre-Processing | Azure Cosmos DB replacement | Replace | N/A | N/A |
| Pre-Processing | Azure Cosmos DB Azure Synapse Link | Replace | N/A | + |
Get support
For further assistance, try the Microsoft Q&A question page for Azure Stream Analytics.
Next steps
- Understand outputs from Azure Stream Analytics
- Azure Stream Analytics output to Azure SQL Database
- Increase throughput performance to Azure SQL Database from Azure Stream Analytics
- Use managed identities to access Azure SQL Database or Azure Synapse Analytics from an Azure Stream Analytics job
- Use reference data from a SQL Database for an Azure Stream Analytics job
- Run Azure Functions in Azure Stream Analytics jobs - Tutorial for Redis output
- Quickstart: Create a Stream Analytics job by using the Azure portal