Utiliser la parallélisation de requête dans Azure Stream Analytics

Cet article explique comment tirer parti de la parallélisation dans Azure Stream Analytics. Vous découvrez comment mettre à l’échelle des travaux Stream Analytics en configurant des partitions d’entrée et en réglant la définition de requête Analytics.

Comme prérequis, vous pouvez vous familiariser avec la notion d’unité de streaming décrite dans Comprendre et ajuster les unités de streaming.

Quelles sont les parties d’une tâche Stream Analytics ?

La définition d’un travail Stream Analytics inclut au moins une entrée de streaming, une requête et une sortie. Les entrées sont là où la tâche lit le flux de données. La requête permet de transformer le flux d’entrée de données, et la sortie correspond à l’emplacement où le travail envoie ses résultats.

Partitions dans les entrées et sorties

Le partitionnement vous permet de répartir les données en sous-ensembles basés sur une clé de partition. Si votre entrée (par exemple Event Hubs) est partitionnée par une clé, nous vous recommandons de spécifier cette clé de partition lors de l’ajout d’une entrée à votre travail Stream Analytics. La mise à l’échelle d’un travail Stream Analytics tire parti des partitions dans l’entrée et la sortie. Une tâche Stream Analytics peut traiter et écrire différentes partitions en parallèle, ce qui permet d'augmenter le débit.

Entrées

Toutes les entrées de flux d'Azure Stream Analytics peuvent tirer parti du partitionnement : Event Hubs, IoT Hub, Stockage d’objets Blob, Data Lake Storage Gen2.

Remarque

Pour le niveau de compatibilité 1.2 et versions ultérieures, définissez la clé de partition comme propriété d’entrée, sans avoir besoin du mot clé PARTITION BY dans la requête. Pour le niveau de compatibilité 1.1 et ci-dessous, définissez la clé de partition avec le mot clé PARTITION BY dans la requête.

Sorties

Lorsque vous utilisez Stream Analytics, tirez parti du partitionnement dans les sorties suivantes :

  • Azure Data Lake Storage
  • Azure Functions
  • table Azure
  • Stockage Blob (définissez explicitement la clé de partition)
  • Azure Cosmos DB (définissez explicitement la clé de partition)
  • Event Hubs (définissez explicitement la clé de partition)
  • IoT Hub (définissez explicitement la clé de partition)
  • Service Bus
  • SQL et Azure Synapse Analytics avec partitionnement facultatif : consultez plus d’informations sur la page Sortie dans Azure SQL Database.

Power BI ne prend pas en charge le partitionnement. Toutefois, vous pouvez toujours partitionner l’entrée comme décrit dans cette section.

Pour plus d’informations sur les partitions, consultez les articles suivants :

Requête

Pour qu’un travail soit parallèle, les clés de partition doivent être alignées entre toutes les entrées, toutes les étapes de la logique de requête et toutes les sorties. Le partitionnement de la logique de requête est déterminé par les clés utilisées pour les jointures et les agrégations (GROUP BY). La dernière exigence peut être ignorée si la logique de la requête n’est pas basée sur des clés (projection, filtres, jointures référentielles...).

  • Si une entrée et une sortie sont partitionnées par WarehouseId, et que la requête groupe par ProductId sans WarehouseId, le travail n'est pas parallèle.
  • Si deux entrées à joindre sont partitionnés par des clés de partition différentes (WarehouseId et ProductId), le travail n’est pas parallèle.
  • Si un seul travail contient deux flux de données indépendants ou plus, chacun avec sa propre clé de partition, le travail n’est pas parallèle.

Le travail est parallèle uniquement lorsque toutes les entrées, sorties et étapes de requête utilisent la même clé.

Travaux massivement parallèles

Un travail massivement parallèle est le scénario le plus évolutif d’Azure Stream Analytics. Elle permet de connecter une partition de l’entrée à une instance de la requête, puis de connecter celle-ci à une partition de la sortie. Ce parallélisme comporte les exigences suivantes :

  • Si votre logique de requête dépend de la clé qui est actuellement traitée par la même instance de requête, vous devez vous assurer que les événements atteignent la même partition de votre entrée. Pour Event Hubs ou IoT Hub, cela signifie que vous devez définir la valeur de PartitionKey pour les données d’événement. Par ailleurs, vous pouvez utiliser des expéditeurs partitionnés. Pour le stockage d'objets blob, cela signifie que les événements sont envoyés dans le même dossier de partition. Il peut s’agir, par exemple, d’une instance de requête qui agrège les données par userID, le hub d’événements d’entrée étant partitionné à l’aide de l’identificateur userID comme clé de partition. Toutefois, si votre logique de requête ne demande pas la même clé pour être traitée par la même instance de requête, vous pouvez ignorer cette exigence. Un exemple de cette logique serait une requête simple du type select/project/filter.

  • Ensuite, faites en sorte que votre requête soit partitionnée. Pour les travaux avec le niveau de compatibilité 1.2 ou supérieur (recommandé), spécifiez une colonne personnalisée comme clé de partition dans les paramètres d’entrée et le travail est automatiquement parallèle. Pour les travaux avec le niveau de compatibilité 1.0 ou 1.1, utilisez PARTITION BY PartitionId dans toutes les étapes de votre requête. Vous pouvez avoir plusieurs étapes, mais elles doivent toutes être partitionnée par la même clé.

  • La plupart des sorties prises en charge dans Stream Analytics peuvent tirer parti du partitionnement. Si vous utilisez un type de sortie qui ne prend pas en charge le partitionnement, votre travail n’est pas simplement parallélisable. Pour les sorties Event Hubs, vérifiez que la colonne de clé de partition est définie sur la même clé de partition que celle utilisée dans la requête. Pour plus d’informations, consultez la section des sorties.

  • Le nombre de partitions d’entrée doit être égal à celui des partitions de sortie. La sortie du Stockage Blob peut prendre en charge les partitions et hériter du schéma de partitionnement de la requête en amont. Lorsque vous spécifiez une clé de partition pour le stockage d’objets blob, les données sont partitionnées par partition d'entrée, le résultat est toujours entièrement parallèle. Voici des exemples de valeurs de partition qui permettent la création d’un travail entièrement parallèle :

    • Huit partitions d’entrée de concentrateur Event Hub et huit partitions de sortie de concentrateur Event Hub
    • Huit partitions d’entrée de concentrateur Event Hub et une sortie de stockage d’objets blob
    • Huit partitions d’entrée Event Hub et une sortie de Stockage Blob partitionnée par un champ personnalisé avec cardinalité arbitraire
    • Huit partitions d’entrée de stockage d’objets blob et une sortie de stockage d’objets blob
    • Huit partitions d’entrée de stockage blob et huit partitions de sortie Event Hub

Les sections suivantes discutent de quelques exemples de scénarios qui sont d'un parallélisme embarrassant.

Requête simple

  • Entrée : un hub d’événements avec huit partitions
  • Sortie : un Event Hub à huit partitions (« Colonne clé de partition » doit être configurée pour utiliser « PartitionId »)

Requête :

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Cette requête est un filtre simple. Par conséquent, vous n’avez pas besoin de vous soucier du partitionnement de l’entrée envoyée au hub d’événements. Notez que les travaux dont le niveau de compatibilité est inférieur à 1.2 doivent inclure la clause PARTITION BY PartitionId afin de répondre à l’exigence n°2 précitée. Pour la sortie, vous devez configurer la sortie du hub d’événements dans le travail pour que la clé de partition soit définie sur PartitionId. La dernière vérification consiste à s’assurer que le nombre de partitions d’entrée est égal au nombre de partitions de sortie.

Requête avec clé de regroupement

  • Entrée : Event Hub avec huit partitions
  • Sortie : Stockage d'objets blob

Requête :

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Cette requête comporte une clé de regroupement. Par conséquent, les événements regroupés doivent être envoyés à la même partition Event Hubs. Dans cet exemple, comme vous regroupez par TollBoothID, assurez-vous que TollBoothID est utilisé comme clé de partition lorsque les événements sont envoyés à Event Hubs. Vous pouvez ensuite utiliser dans Azure Stream Analytics PARTITION BY PartitionId pour hériter de ce schéma de partition et activer la parallélisation complète. Puisque la sortie est le stockage blob, vous n’avez pas besoin de vous soucier de la configuration d’une valeur de clé de partition, comme décrit dans la condition n°4.

Exemples de scénarios qui ne sont pas* en parallélisme embarrassant

Dans la section précédente, l’article a abordé certains scénarios de parallélisme embarrassant. Dans cette section, vous découvrirez des scénarios qui ne remplissent pas toutes les conditions pour être en parallélisme embarrassant.

Nombre de partitions non concordant

  • Entrée : un hub d’événements avec huit partitions
  • Sortie : un Event Hub avec 32 partitions

Si le nombre de partitions en entrée ne correspond pas au nombre de partitions en sortie, la topologie n’est pas trivialement parallèle, quelle que soit la requête. Toutefois, vous pouvez toujours obtenir un certain niveau de parallélisation.

Requête avec une sortie non partitionnée

  • Entrée : un hub d’événements avec huit partitions
  • Sortie : Power BI

Pour le moment, la sortie Power BI ne prend pas en charge le partitionnement. Par conséquent, ce scénario n'est pas trivialement parallèle.

Requête à plusieurs étapes avec différentes valeurs PARTITION BY

  • Entrée : Event Hub avec huit partitions
  • Sortie : Event Hub avec huit partitions
  • Niveau de compatibilité : 1.0 ou 1.1

Requête :

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Comme vous pouvez le voir, la deuxième étape utilise TollBoothId comme clé de partitionnement. Cette étape n’est pas la même que la première étape et nécessite donc un réarrangement.

Requête à plusieurs étapes avec différentes valeurs PARTITION BY

  • Entrée : Event Hub avec huit partitions (« Colonne de clé de partition » non définie, par défaut « PartitionId »)
  • Sortie : le hub d’événements à huit partitions (« Colonne de clé de partition » doit être défini de façon à utiliser « TollBoothId »)
  • Niveau de compatibilité : 1.2 ou version ultérieure

Requête :

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

le niveau de compatibilité 1.2 ou supérieur permet l’exécution de requête en parallèle par défaut. Par exemple, la requête de la section précédente est partitionnée tant que la colonne « TollBoothId » est définie comme clé de partition d’entrée. La clause PARTITION BY PartitionId n’est pas obligatoire.

Calcul du nombre maximum d'unités de diffusion en continu pour une tâche

Le nombre total d’unités de streaming qu’un travail Stream Analytics peut utiliser dépend du nombre d’étapes de la requête définie pour le travail et du nombre de partitions pour chaque étape.

Étapes dans une requête

Une requête peut avoir une ou plusieurs étapes. Chaque étape est une sous-requête définie par le mot-clé WITH. La requête qui se trouve en dehors du mot-clé WITH (une seule requête) est également comptabilisée comme une étape, par exemple, l’instruction SELECT dans la requête suivante :

Requête :

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Cette requête compte deux étapes.

Remarque

Cette requête est approfondie plus loin dans cet article.

Partitionnement d'une étape

Les conditions suivantes doivent être respectées pour procéder au partitionnement d'une étape :

  • La source d'entrée doit être partitionnée.
  • L’instruction SELECT de la requête doit lire à partir d’une source d’entrée partitionnée.
  • La requête de l’étape doit contenir le mot clé PARTITION BY.

Lorsqu’une requête est partitionnée, les événements d’entrée sont traités et agrégés dans des groupes de partition distincts, et les événements de sortie sont générés pour chacun des groupes. Si vous souhaitez un agrégat combiné, vous devez créer une deuxième étape non partitionnée pour l'agrégation.

Calcul des unités maximales de diffusion en continu pour une tâche

Toutes les étapes non partitionnées se développent jusqu'à une unité de flux (SU V2s) pour un job Stream Analytics. En outre, vous pouvez ajouter une SU V2 pour chaque partition dans une étape partitionnée. Vous pouvez voir quelques exemples dans le tableau suivant.

Requête Nombre maximal d'unités SU pour la tâche
  • La requête contient une étape.
  • L’étape n’est pas partitionnée.
1 SU V2
  • Le flux de données d’entrée est partitionné par 16.
  • La requête contient une étape.
  • L'étape est partitionnée.
16 SU V2 (1 * 16 partitions)
  • La requête contient 2 étapes.
  • Aucune des étapes n'est partitionnée.
1 SU V2
  • Le flux de données d'entrée est partitionné par 3.
  • La requête contient 2 étapes. L'étape d'entrée est partitionnée et la deuxième étape ne l'est pas.
  • L’instruction SELECT lit dans l’entrée partitionnée.
4 SU V2s (3 pour les étapes partitionnée + 1 pour les étapes nonpartitionées)

Exemples de mise à l’échelle

La requête suivante calcule le nombre de voitures, dans une fenêtre de trois minutes, qui traversent un poste de péage pourvu de trois cabines de péage. Vous pouvez mettre à l’échelle cette requête jusqu’à une unité de calcul SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Pour utiliser davantage de SUs pour la requête, partitionnez à la fois le flux de données d’entrée et la requête. Comme la partition de flux de données est définie sur 3, la requête modifiée suivante peut être mise à l’échelle jusqu’à compter 3 SU V2 :

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Lorsque vous partitionnez une requête, les événements d’entrée sont traités et agrégés dans des groupes de partitions distincts. La requête génère des événements de sortie pour chacun des groupes. Le partitionnement peut provoquer des résultats inattendus si le champ GROUP BY n’est pas la clé de partition dans le flux de données d’entrée. Par exemple, dans la requête précédente, le champ TollBoothId n’est pas la clé de partition d’Input1. Le résultat est que les données de TollBooth #1 peuvent être réparties sur plusieurs partitions.

Stream Analytics traite chacune des partitions Input1 séparément. Par conséquent, la requête crée plusieurs enregistrements du nombre de voitures pour la même cabine de péage dans la même fenêtre de basculement. Si vous ne pouvez pas modifier la clé de partition d’entrée, corrigez ce problème en ajoutant une étape de nonpartition pour agréger des valeurs entre des partitions, comme dans l’exemple suivant :

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Vous pouvez mettre à l’échelle cette requête vers 4 SU V2.

Remarque

Si vous joignez deux flux, assurez-vous que les flux sont partitionnés par la clé de partition de la colonne que vous utilisez pour créer les jointures. Assurez-vous également d’avoir le même nombre de partitions dans les deux flux de données.

Obtention de débits supérieurs à grande échelle

Un travail massivement en parallèle est nécessaire mais insuffisant pour maintenir un débit plus élevé à grande échelle. Chaque système de stockage et sa sortie Stream Analytics correspondante varient quant à la manière d’obtenir le meilleur débit possible en écriture. Comme pour n’importe quel scénario à grande échelle, certains défis nécessitent les bonnes configurations à résoudre. Cette section aborde les configurations pour quelques sorties courantes et fournit des exemples pour maintenir des taux d’ingestion de 1 K, 5 K et 10 K événements par seconde.

Les observations suivantes utilisent un travail Stream Analytics avec une requête sans état (passthrough), une fonction définie par l’utilisateur (UDF) JavaScript basique qui écrit dans Event Hubs, Azure SQL ou Azure Cosmos DB.

Event Hubs

Taux d’ingestion (événements par seconde) Unités de diffusion en continu Ressources de sortie
1 K 1/3 2 TU
5 K 1 6 TU
10 k 2 10 TU

La solution Event Hubs s'adapte de manière linéaire en termes d'unités de streaming et de débit, ce qui en fait le moyen le plus efficace et performant pour analyser et diffuser des données depuis Stream Analytics. Vous pouvez mettre à l’échelle des travaux jusqu’à 66 SU V2s, ce qui se traduit littéralement par le traitement jusqu’à 400 Mo/s, soit 38 milliards d’événements par jour.

Azure SQL

Taux d’ingestion (événements par seconde) Unités de diffusion en continu Ressources de sortie
1 K 2/3 S3
5 K 3 P4
10 k 6 P6

SQL Azure prend en charge l’écriture en parallèle, appelée Inherit Partitioning, qui n’est pas activée par défaut. Toutefois, l’activation de la fonctionnalité Inherit Partitioning avec une requête entièrement parallèle peut ne pas suffire pour atteindre des débits supérieurs. Les débits en écriture SQL dépendent considérablement de la configuration et du schéma de table de votre base de données. L’article Performances en sortie SQL contient des informations plus détaillées sur les paramètres susceptibles d’optimiser votre débit en écriture. Comme indiqué dans l’article Sortie d’Azure Stream Analytics dans Azure SQL Database, cette solution n’est pas mise à l’échelle de manière linéaire en tant que pipeline entièrement parallèle au-delà de 8 partitions, et peut nécessiter un repartitionnement avant la sortie SQL (voir INTO). Des références (SKU) Premium sont nécessaires pour prendre en charge des taux d’E/S élevés, ainsi que la surcharge liée aux sauvegardes de fichiers journaux toutes les quelques minutes.

Azure Cosmos DB, une base de données distribuée globale

Taux d’ingestion (événements par seconde) Unités de diffusion en continu Ressources de sortie
1 K 2/3 20 000 RU
5 K 4 60 000 RU
10 k 8 120 000 RU

La sortie de Azure Cosmos DB depuis Stream Analytics est mise à jour pour utiliser l'intégration native sous le niveau de compatibilité 1.2. Le niveau de compatibilité 1.2 permet un débit sensiblement supérieur et réduit la consommation de RU par rapport au niveau 1.1, le niveau de compatibilité par défaut pour les nouvelles tâches. La solution utilise des conteneurs Azure Cosmos DB partitionnés sur /deviceId et le reste de la solution est configuré de manière identique.

Tous les exemples Azure de diffusion en continu à grande échelle utilisent des Event Hubs alimentés par des clients de test simulant une charge. Chaque événement en entrée est un document JSON de 1 ko, qui traduit facilement les taux d’ingestion configurés en débits (1 Mo/s, 5 Mo/s et 10 Mo/s). Les événements simulent un appareil IoT envoyant les données JSON suivantes (sous une forme abrégée) pour un maximum de 1000 appareils :

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Remarque

Les configurations sont sujettes à modification en raison des divers composants utilisés dans la solution. Pour une estimation plus précise, personnalisez les échantillons en fonction de votre scénario.

Identification des goulots d’étranglement

Utilisez le volet Métriques de votre travail Azure Stream Analytics pour identifier les goulots d’étranglement de votre pipeline. Examinez les événements d’entrée/sortie pour le débit, ainsi que le « Délai en filigrane » ou les Événements en backlog, pour voir si le travail suit la vitesse d’entrée. Pour les métriques d'Event Hubs, recherchez les Demandes limitées et ajustez les unités de seuil en conséquence. Pour les métriques de Azure Cosmos DB, examinez la valeur Nombre maximal de RU/s consommées par groupe de clés de partition sous Débit pour vous assurer que les groupes de clés de partition sont consommés de manière uniforme. Pour Azure SQL DB, surveillez E/S logs et processeur.

Obtenir de l’aide

Pour obtenir de l’aide supplémentaire, essayez la page de questions microsoft Q&A pour Azure Stream Analytics.

Étapes suivantes