Azure OpenAI voor big data

U kunt de Azure OpenAI-service gebruiken om veel taken in natuurlijke taal op te lossen door de voltooiings-API te vragen. De Azure OpenAI-service kan worden geïntegreerd met de gedistribueerde machine learning-bibliotheek SynapseML om de werkstromen gemakkelijker te schalen van enkele voorbeelden naar grote gegevenssets. Door deze integratie te gebruiken, kunt u het gedistribueerde Apache Spark-computingframework gebruiken om miljoenen prompts te verwerken met de OpenAI-service. In deze zelfstudie ziet u hoe u grote taalmodellen op een gedistribueerde schaal kunt toepassen met behulp van Azure OpenAI en Microsoft Fabric.

Vereisten

De belangrijkste vereisten voor deze quickstart zijn een werkende Azure OpenAI-resource en een Apache Spark-cluster waarop SynapseML is geïnstalleerd.

Deze handleiding als notitieblok importeren

De volgende stap bestaat uit het toevoegen van deze code aan uw Spark-cluster. U kunt een notebook maken in uw Spark-platform en de code naar dit notebook kopiëren om de demo uit te voeren. Download het notebook en importeer het in Synapse Analytics.

  1. Download deze demo als een notebook (selecteer Raw en sla het bestand vervolgens op)
  2. Importeer het notebook in de Synapse-werkruimte of importeer het in de Fabric-werkruimte
  3. Installeer SynapseML op uw cluster. Zie de installatie-instructies voor Synapse onderaan de SynapseML-website. Als u Fabric gebruikt, raadpleegt u de installatiehandleiding. Deze stap vereist het plakken van een extra cel boven aan het notitieblok dat u hebt geïmporteerd.
  4. Verbind uw notebook met een cluster en volg de instructies, bewerk de cellen en voer ze uit.

Servicegegevens invullen

Bewerk vervolgens de cel in het notebook zodat deze naar uw service wijst. Stel de variabelen service_name, deployment_name, location en key in om overeen te komen met uw OpenAI-service:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-4.1-mini"
deployment_name_embeddings = "text-embedding-3-small"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Een dataset creëren van prompts

Maak vervolgens een gegevensframe dat bestaat uit een reeks rijen, met één prompt per rij.

U kunt gegevens ook rechtstreeks vanuit ADLS of andere databases laden. Zie de Handleiding voor het laden van Apache Spark-gegevens voor meer informatie over het laden en voorbereiden van Spark-gegevensframes.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code that's",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

De OpenAIPrompt Apache Spark-client maken

Als u de Azure OpenAI-service wilt toepassen op uw dataframe, maakt u een OpenAIPrompt-object, dat fungeert als een gedistribueerde client. Stel de serviceparameters in met één waarde of een dataframekolom met behulp van de juiste setters op het OpenAIPrompt object. Stel in dit voorbeeld maxTokens in op 200. Een token is ongeveer vier tekens en deze limiet is van toepassing op de som van de prompt en het resultaat. Stel de promptCol parameter in met de naam van de promptkolom in het dataframe.

from synapse.ml.services.openai import OpenAIPrompt

completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Het dataframe transformeren met behulp van de OpenAIPrompt-client

Nadat u het dataframe en de promptclient hebt gemaakt, transformeert u uw invoergegevensset en voegt u een kolom toe met de naam completions met alle informatie die de service toevoegt. Selecteer alleen de tekst om het eenvoudig te maken.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

Uw uitvoer ziet er ongeveer als volgt uit. De voltooiingstekst verschilt van het voorbeeld.

prompt fout tekst
Hallo mijn naam is Nul Makaveli ik ben 18 jaar oud en ik wil rapper worden als ik groot ben, ik hou van schrijven en muziek maken, ik kom uit Los Angeles, CA.
De beste code is code die eenvoudig te begrijpen is. Nul begrijpelijk Dit is een subjectieve verklaring en er is geen definitief antwoord.
SynapseML is Nul Een machine learning-algoritme dat kan leren hoe u het toekomstige resultaat van gebeurtenissen kunt voorspellen.

Meer gebruiksvoorbeelden

Tekst insluitingen genereren

Naast het voltooien van tekst kunt u ook tekst insluiten voor gebruik in downstreamalgoritmen of vector ophalen architecturen. Door insluitingen te maken, kunt u documenten zoeken en ophalen uit grote verzamelingen. Gebruik deze benadering wanneer prompt engineering niet voldoende is voor de taak. Zie de OpenAIEmbedding voor meer informatie over het gebruik.

from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Chataanvulling

Modellen zoals GPT-4o en GPT-4.1 begrijpen chats in plaats van enkele prompts. De OpenAIChatCompletion transformator maakt deze functionaliteit op schaal beschikbaar.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "What's your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Doorvoer verbeteren met batchverwerking van aanvragen

In het voorbeeld worden verschillende aanvragen naar de service verzonden, één voor elke prompt. Als u meerdere prompts in één aanvraag wilt voltooien, gebruikt u de batchmodus. Geef eerst in het OpenAIPrompt object, in plaats van de kolom Prompt in te stellen op 'Prompt', 'batchPrompt' op voor de kolom BatchPrompt. Hiervoor maakt u een dataframe met een lijst met prompts per rij.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

Maak vervolgens het OpenAIPrompt object. In plaats van de promptkolom in te stellen, stelt u de kolom batchPrompt in als uw kolom van het type Array[String]is.

batch_completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In de aanroep om te transformeren wordt een aanvraag per rij ingediend. Omdat elke rij meerdere prompts bevat, verzendt elke aanvraag alle prompts in die rij. De resultaten bevatten een rij voor elke rij in de aanvraag.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Een automatische minibatcher gebruiken

Als uw gegevens in een kolomformaat zijn, kunt u ze naar een rijformaat transponeren met behulp van SynapseML FixedMiniBatcherTransformer.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Prompt engineering voor vertaling

De Azure OpenAI-service kan veel verschillende natuurlijke taaltaken oplossen via prompt engineering. In dit voorbeeld wordt gevraagd om taalomzetting:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Uitnodiging tot beantwoording van vragen

In dit voorbeeld wordt het model gevraagd om algemene kennisvragen te beantwoorden:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))