Azure OpenAI pour le Big Data

Vous pouvez utiliser le service openAI Azure pour résoudre de nombreuses tâches en langage naturel en invitant l’API d’achèvement. Pour faciliter la mise à l’échelle de vos flux de travail de génération de requêtes à partir de quelques exemples vers de grands jeux de données d’exemples, le service OpenAI Azure s’intègre à la bibliothèque de machine learning distribuée SynapseML. À l’aide de cette intégration, vous pouvez utiliser l’infrastructure d’informatique distribuée Apache Spark pour traiter des millions d’invites avec le service OpenAI. Ce tutoriel montre comment appliquer des modèles de langage volumineux à une échelle distribuée à l’aide de Azure OpenAI et de Microsoft Fabric.

Prérequis

Les principales conditions préalables à ce guide rapide incluent une ressource Azure OpenAI fonctionnelle et un cluster Apache Spark avec SynapseML installé.

  • Obtenez un abonnement Microsoft Fabric. Vous pouvez également vous inscrire à un essai gratuit Microsoft Fabric.

  • Connectez-vous à Microsoft Fabric.

  • Basculez vers Fabric à l’aide du sélecteur d’expérience situé en bas à gauche de votre page d’accueil.

    Capture d’écran montrant la sélection de Fabric dans le menu du sélecteur d’expérience.

Importer ce guide en tant que notebook

L’étape suivante consiste à ajouter ce code à votre cluster Spark. Vous pouvez soit créer un bloc-notes dans votre plateforme Spark et copier le code dans ce bloc-notes pour exécuter la démo. Vous pouvez également télécharger le notebook et l’importer dans Synapse Analytics.

  1. Télécharger cette démo sous forme de notebook (sélectionner Raw, puis enregistrer le fichier)
  2. Importez le notebook dans l’espace de travail Synapse ou, si vous utilisez Fabric, importez-le dans l’espace de travail Fabric.
  3. Installez SynapseML sur votre cluster. Consultez les instructions d’installation de Synapse en bas du site web SynapseML. Si vous utilisez Fabric, consultez le Guide d’installation. Cette étape nécessite de coller une cellule supplémentaire en haut du bloc-notes que vous avez importé.
  4. Connectez votre ordinateur portable à un cluster et suivez les étapes en modifiant et en exécutant les cellules.

Remplissez les informations de service.

Ensuite, modifiez la cellule du notebook pour pointer vers votre service. Définissez les variables service_name, deployment_name, location et key pour qu’elles correspondent à votre service OpenAI.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-4.1-mini"
deployment_name_embeddings = "text-embedding-3-small"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Créer un jeu de données de requêtes

Ensuite, créez un dataframe composé d’une série de lignes, avec une invite par ligne.

Vous pouvez également charger des données directement à partir d'ADLS ou d'autres bases de données. Pour plus d'informations sur le chargement et la préparation des dataframes Spark, consultez le guide de chargement des données Apache Spark.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code that's",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Créer le client Apache Spark OpenAIPrompt

Pour appliquer le service Azure OpenAI à votre dataframe, créez un objet OpenAIPrompt, qui agit comme un client distribué. Définissez les paramètres de service avec une valeur unique ou une colonne de trame de données à l’aide des setters appropriés sur l’objet OpenAIPrompt . Dans cet exemple, définissez maxTokens la valeur 200. Un jeton est d’environ quatre caractères, et cette limite s’applique à la somme de l’invite et du résultat. Définissez le promptCol paramètre avec le nom de la colonne d’invite dans le dataframe.

from synapse.ml.services.openai import OpenAIPrompt

completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Transformer le dataframe à l’aide du client OpenAIPrompt

Après avoir créé le dataframe et le client d’invite, transformez votre jeu de données d’entrée et ajoutez une colonne nommée completions avec toutes les informations ajoutées par le service. Pour plus de facilité, sélectionnez uniquement le texte.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

Votre sortie doit ressembler à ceci. Le texte de complétion est différent de l’exemple.

prompt erreur texte
Bonjour, je m’appelle null Makaveli, j'ai 18 ans et je veux devenir rappeur quand je serai grand. J'aime écrire et faire de la musique, et je viens de Los Angeles, CA.
Le meilleur code est celui qui est null compréhensible Il s’agit d’une déclaration subjective, et il n’y a pas de réponse définitive.
SynapseML est null Algorithme machine learning capable d’apprendre à prédire le résultat futur des événements.

Autres exemples d’utilisation

Génération d’incorporations de texte

Outre l’achèvement du texte, vous pouvez également incorporer du texte à utiliser dans des algorithmes en aval ou des architectures de récupération de vecteurs. En créant des incorporations, vous pouvez rechercher et récupérer des documents à partir de grandes collections. Utilisez cette approche lorsque l’ingénierie d’invite n’est pas suffisante pour la tâche. Pour plus d’informations sur l’utilisation OpenAIEmbedding, consultez le guide d’incorporation.

from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Achèvement du chat

Les modèles tels que GPT-4o et GPT-4.1 comprennent les conversations plutôt que les invites uniques. Le transformateur OpenAIChatCompletion expose cette fonctionnalité à grande échelle.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "What's your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Améliorer le débit avec le traitement par lots de requêtes

L'exemple fait plusieurs requêtes au service, une pour chaque invite de commande. Pour effectuer plusieurs invites dans une seule requête, utilisez le mode batch. Tout d’abord, dans l’objet OpenAIPrompt, au lieu de définir la colonne "Prompt" sur « Prompt », spécifiez « batchPrompt » pour la colonne "BatchPrompt". Pour ce faire, créez un dataframe avec une liste d’invites par ligne.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

Ensuite, créez l’objet OpenAIPrompt . Au lieu de définir la colonne d’invite, définissez la colonne batchPrompt si votre colonne est de type Array[String].

batch_completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Lors de l'appel à la transformation, une requête est effectuée par ligne. Étant donné que chaque ligne contient plusieurs invites, chaque requête envoie toutes les invites de cette ligne. Les résultats contiennent une ligne pour chaque ligne de la requête.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Utilisation d'un minidoseur automatique

Si vos données sont au format de colonne, vous pouvez la transposer au format de ligne à l’aide de FixedMiniBatcherTransformerSynapseML.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Ingénierie des instructions pour la traduction

Le service Azure OpenAI peut résoudre de nombreuses tâches de langage naturel grâce à l'ingénierie des invites. Cet exemple montre comment demander la traduction linguistique :

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Demander la réponse aux questions

Cet exemple invite le modèle à répondre aux questions générales :

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))