Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Actuellement, Azure Stream Analytics (ASA) prend uniquement en charge l’insertion (ajout) de lignes dans des sorties SQL (Azure SQL Databases et Azure Synapse Analytics). Cet article décrit les solutions de contournement permettant d’activer UPDATE, UPSERT ou MERGE sur des bases de données SQL, en utilisant Azure Functions comme couche intermédiaire.
Les options alternatives à Azure Functions sont présentées à la fin.
Condition requise
Vous pouvez écrire des données dans une table à l’aide de l’un des modes suivants :
| Mode | Instruction T-SQL équivalente | Spécifications |
|---|---|---|
| Ajouter | INSERT | Aucun |
| Remplacer | MERGE (UPSERT) | Clé unique |
| Accumuler | MERGE (UPSERT) avec opérateur d’assignation composé (+=, -=…) |
Clé unique et accumulateur |
Pour illustrer les différences, tenez compte de ce qui se passe lors de l’ingestion des deux enregistrements suivants :
| Heure_d'Arrivée | Id_appareil | Valeur_Mesure |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
Dans le mode d’ajout , vous insérez deux enregistrements. L’instruction T-SQL équivalente est la suivante :
INSERT INTO [target] VALUES (...);
Ce qui donne :
| Heure_Modifiée | Device_Id | Mesure_Valeur |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
En mode remplacement , vous obtenez uniquement la dernière valeur par clé. Ici, vous utilisez Device_Id comme clé. L’instruction T-SQL équivalente est la suivante :
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ce qui donne :
| Heure_Modifiée | Device_Key | Measure_Value |
|---|---|---|
| 10:05 | A | 20 |
Enfin, en mode accumulation , vous sommez Value avec un opérateur d’affectation composée (+=). Ici également, vous utilisez Device_Id comme clé :
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Ce qui donne :
| Heure_Modifiée | Device_Key | Measure_Value |
|---|---|---|
| 10:05 | A | 21 |
Pour des raisons de performances, les adaptateurs de sortie de la base de données SQL de ASA ne prennent actuellement en charge que le mode d’ajout en natif. Ces adaptateurs utilisent l’insertion en bloc pour maximiser le débit et réduire la contre-pression.
Cet article explique comment utiliser les fonctions Azure pour implémenter les modes Remplacer et Accumuler dans Azure Stream Analytics. Lorsque vous utilisez une fonction en tant que couche intermédiaire, les performances d’écriture potentielles n’affectent pas la tâche de diffusion en continu. À cet égard, l’utilisation d’Azure Functions fonctionne mieux avec Azure SQL. Avec Synapse SQL, passer d’une instruction en bloc à une instruction ligne par ligne peut créer des problèmes de performance plus importants.
la sortie Azure Functions
Dans ce travail, vous remplacez la sortie SQL ASA par la sortie ASA Azure Functions. La fonction implémente les fonctionnalités UPDATE, UPSERT ou MERGE.
Actuellement, vous pouvez accéder à une base de données SQL dans une fonction à l’aide de deux options. La première option est la liaison de sortie Azure SQL. Il est actuellement limité à C#, et il offre uniquement le mode de remplacement. La deuxième option consiste à composer une requête SQL à envoyer via le pilote SQL approprié (Microsoft. Data.SqlClient pour .NET).
Les deux exemples suivants supposent le schéma de tableau suivant. L’option de liaison exige qu’une clé primaire soit définie sur la table cible. Ce n’est pas obligatoire, mais recommandé, lorsque vous utilisez un pilote SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Pour utiliser une fonction comme sortie d’ASA, la fonction doit répondre aux attentes suivantes :
- Azure Stream Analytics attend l’état HTTP 200 de l’application Functions pour les lots qu’il traite avec succès.
- Lorsque Azure Stream Analytics reçoit une exception 413 (« entité de requête http trop grande ») d’une fonction Azure, elle réduit la taille des lots qu’il envoie à Azure Function.
- Lors du test de connexion, Stream Analytics envoie une requête POST avec un lot vide à Azure Functions et attend un statut HTTP 20x en retour pour valider le test.
Option 1 : Mettre à jour par clé avec la liaison SQL pour Azure Functions
Cette option utilise la liaison de sortie SQL pour les fonctions Azure. Cette extension peut remplacer un objet dans une table sans avoir à écrire une instruction SQL. Pour l’instant, il ne prend pas en charge les opérateurs d'affectation composés (accumulations).
Cet exemple a été construit sur :
- Runtime des Fonctions Azure version 4
- .NET 6.0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Pour mieux comprendre l’approche de liaison, suivez ce didacticiel.
Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Utilisez les informations suivantes :
- Langage :
C# - Runtime :
.NET 6(sous fonction/runtime v4) - Modèle :
HTTP trigger
Installez l’extension de liaison en exécutant la commande suivante dans un terminal situé dans le dossier du projet :
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Ajoutez l’élément SqlConnectionString dans la section Values de votre fichier local.settings.json, en renseignant la chaîne de connexion du serveur de destination :
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction avec vos propres :
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Mettez à jour le nom de la table de destination dans la section de liaison des données :
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Mettez à jour la classe Device et la section de mappage pour qu’elles correspondent à votre propre schéma :
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans Visual Studio Code). La base de données SQL doit être accessible depuis votre machine. Vous pouvez utiliser SSMS pour vérifier la connectivité. Ensuite, envoyez des requêtes POST au point de terminaison local. Une requête avec un corps vide doit retourner HTTP 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode de remplacement/mise à jour). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La fonction peut désormais être publiée sur Azure. Définissez un paramètre d’application pour SqlConnectionString. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.
Vous pouvez ensuite définir la fonction en tant que sortie dans le travail ASA et l’utiliser pour remplacer les enregistrements au lieu de les insérer.
Option 2 : Fusionner avec un opérateur d’assignation composé (accumuler) à l’aide d’une requête SQL personnalisée
Note
Lors du redémarrage et de la récupération, ASA peut renvoyer des événements de sortie déjà émis. Ce comportement peut entraîner l’échec de la logique d’accumulation (doublement des valeurs individuelles). Pour éviter ce problème, affichez les mêmes données dans une table à l’aide de la sortie SQL ASA native. Vous pouvez utiliser cette table de contrôle pour détecter les problèmes et resynchroniser l’accumulation si nécessaire.
Cette option utilise Microsoft.Data.SqlClient. Cette bibliothèque vous permet d’envoyer toutes les requêtes SQL à une base de données SQL.
Cet exemple a été construit sur :
Tout d’abord, créez une application de fonction HttpTrigger par défaut en suivant ce tutoriel. Les informations suivantes sont utilisées :
- Langage :
C# - Runtime :
.NET 6(sous fonction/runtime v4) - Modèle :
HTTP trigger
Installez la bibliothèque SqlClient en exécutant la commande suivante dans un terminal situé dans le dossier du projet :
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Ajoutez l’élément SqlConnectionString dans la section Values de votre fichier local.settings.json, en renseignant la chaîne de connexion du serveur de destination :
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Remplacez l’ensemble de la fonction (fichier .cs dans le projet) par l’extrait de code suivant. Mettez à jour l’espace de noms, le nom de la classe et le nom de la fonction par les vôtres :
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Mettez à jour la section de liaison de la commande sqltext pour qu’elle corresponde à votre propre schéma (notez la manière dont l’accumulation est obtenue via l’opérateur += lors de la mise à jour) :
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Vous pouvez maintenant tester la liaison entre la fonction locale et la base de données en déboguant (F5 dans VS Code). La base de données SQL doit être accessible depuis votre machine. Vous pouvez utiliser SSMS pour vérifier la connectivité. Ensuite, envoyez des requêtes POST au point de terminaison local. Une requête avec un corps vide doit retourner HTTP 204. Une requête avec une charge utile réelle doit être rendue persistante dans la table de destination (en mode d’accumulation/de fusion). Voici un exemple de charge utile correspondant au schéma utilisé dans cet exemple :
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
La fonction peut désormais être publiée sur Azure. Un paramètre d’application doit être défini pour SqlConnectionString. Le pare-feu du serveur SQL Azure doit autoriser les services Azure à entrer pour que la fonction en direct puisse l’atteindre.
La fonction peut ensuite être définie comme sortie dans la tâche Azure Stream Analytics et utilisée pour remplacer les enregistrements plutôt qu'en les insérant.
Autres solutions
En dehors de Azure Functions, plusieurs méthodes peuvent obtenir le résultat attendu. Cette section décrit certaines de ces méthodes.
Post-traitement dans la base de données SQL cible
Une tâche en arrière-plan s'exécute une fois que les données sont insérées dans la base de données via les sorties SQL standard d'ASA.
Pour Azure SQL, utilisez INSTEAD OFdéclencheurs DML pour intercepter les commandes que ASA émet.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Pour Synapse SQL, Azure Stream Analytics peut insérer une table de mise en lots. Une tâche récurrente peut ensuite transformer les données selon les besoins dans une table intermédiaire. Enfin, les données sont déplacées vers la table de production.
Prétraitement dans Azure Cosmos DB
Azure Cosmos DB prend en charge UPSERT en mode natif. Ici, seul l’ajout ou le remplacement est possible. Vous devez gérer les accumulations côté client dans Azure Cosmos DB.
Si la configuration requise correspond, vous pouvez remplacer la base de données SQL cible par une instance Azure Cosmos DB. Cette modification nécessite un changement important dans l’architecture globale de la solution.
Pour Synapse SQL, vous pouvez utiliser Azure Cosmos DB en tant que couche intermédiaire via Azure Synapse Link pour Azure Cosmos DB. Utilisez Azure Synapse Link pour créer un magasin analytique. Vous pouvez ensuite interroger ce magasin de données directement dans Synapse SQL.
Comparaison des alternatives
Chaque approche offre différentes propositions de valeur et fonctionnalités :
| Type | Option | Modes | Azure SQL Database | Azure Synapse Analytics |
|---|---|---|---|---|
| Post-traitement | ||||
| Déclencheurs | Remplacer, Accumuler | + | Non applicable, les déclencheurs ne sont pas disponibles dans Synapse SQL | |
| Staging | Remplacer, Accumuler | + | + | |
| Pré-traitement | ||||
| Azure Functions | Remplacer, Accumuler | + | - (performances ligne par ligne) | |
| Remplacement de Azure Cosmos DB | Remplacer | N/A | N/A | |
| Azure Cosmos DB Azure Synapse Link | Remplacer | N/A | + |
Obtenir de l’aide
Pour obtenir de l’aide supplémentaire, essayez la page de questions microsoft Q&A pour Azure Stream Analytics.
Étapes suivantes
- Comprendre les sorties d’Azure Stream Analytics
- Sortie Azure Stream Analytics dans Azure SQL Database
- Augmenter les performances de débit pour Azure SQL Database à partir d’Azure Stream Analytics
- Utiliser les identités managées pour accéder à Azure SQL Database ou Azure Synapse Analytics à partir d’une tâche Azure Stream Analytics
- Utiliser les données de référence d’une base de données SQL pour une tâche Azure Stream Analytics
- Exécuter Azure Functions dans des travaux Azure Stream Analytics – Tutoriel pour la sortie Redis
- Démarrage rapide : Créer un travail Stream Analytics à l’aide du portail Azure