Använda Livy-API:et för att skicka och köra Livy-batchjobb

Gäller för:✅ Fabric Data Engineering och Data Science

Lär dig hur du skickar Spark-batchjobb med livy-API:et för Fabric Data Engineering. Livy API stöder för närvarande inte Azure Service Principal (SPN).

Förutsättningar

Livy-API:et definierar en enhetlig slutpunkt för åtgärder. Ersätt platshållarna {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} och {Fabric_LakehouseID} med lämpliga värden när du följer exemplen i den här artikeln.

Konfigurera Visual Studio Code för Livy API Batch

  1. Välj Lakehouse Settings i din Fabric Lakehouse.

    Skärmbild som visar Lakehouse-inställningar.

  2. Gå till Livy-endpoint-avsnittet.

    screenshot som visar Lakehouse Livy-slutpunktens och sessionsjobbets anslutningssträng.

  3. Kopiera Batch-jobbets anslutningssträng (den andra röda rutan i bilden) till din kod.

  4. Gå till Microsoft Entra admin center och kopiera både program-ID:t (klient- och katalog-ID:t) till koden.

    Skärmbild som visar Översikt över Livy API-appen i Microsoft Entra admin center.

Skapa en Spark Batch-kod och ladda upp till lakehouse

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
        #Spark session builder
        spark_session = (SparkSession
            .builder
            .appName("batch_demo") 
            .getOrCreate())
    
        spark_context = spark_session.sparkContext
        spark_context.setLogLevel("DEBUG")  
    
        tableName = spark_context.getConf().get("spark.targetTable")
    
        if tableName is not None:
            print("tableName: " + str(tableName))
        else:
            print("tableName is None")
    
        df_valid_totalPrice = spark_session.sql("SELECT * FROM green_tripdata_2022 where total_amount > 0")
        df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("lpep_pickup_datetime").substr(1, 4))
    
    
        deltaTablePath = f"Tables/{tableName}CleanedTransactions"
        df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Spara filen Python lokalt. Den här Python-kodlösningen innehåller två Spark-instruktioner som arbetar med data i en Lakehouse-miljö och som måste laddas upp till din Lakehouse. Du behöver ABFS-sökvägen (Azure Blob File System) för nyttolasten för att referera till i ditt Livy API-batchjobb i Visual Studio Code och ditt tabellnamn i Lakehouse i SQL-instruktionen SELECT.

    Skärmbild som visar Python-nyttolastcellen.

  3. Ladda upp Python-fil till avsnittet för filer i Lakehouse. I Lakehouse-utforskaren väljer du Filer. Välj >Hämta data>sedanUppladdningsfiler. Välj filer via filväljaren.

    Skärmbild som visar innehåll i Filavsnittet i Lakehouse.

  4. När filen finns i avsnittet Filer i Lakehouse väljer du de tre punkterna (ellipsen) till höger om nyttolastens filnamn och väljer Egenskaper.

    Skärmbild som visar ABFS-sökvägen för nyttolasten i filens egenskaper i Lakehouse.

  5. Kopiera denna ABFS-sökväg till cell i anteckningsboken i steg 1.

Autentisera en Livy API Spark-batchsession med antingen en Microsoft Entra användartoken eller en Microsoft Entra SPN-token

Autentisera en Livy API Spark-batchsession med en Microsoft Entra SPN-token

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod.

    import sys
    from msal import ConfidentialClientApplication
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Service Principal Application ID
    
    # Certificate paths - Update these paths to your certificate files
    certificate_path = "PATH_TO_YOUR_CERTIFICATE.pem"      # Public certificate file
    private_key_path = "PATH_TO_YOUR_PRIVATE_KEY.pem"      # Private key file
    certificate_thumbprint = "YOUR_CERTIFICATE_THUMBPRINT" # Certificate thumbprint
    
    # OAuth settings
    audience = "https://analysis.windows.net/powerbi/api/.default"
    authority = f"https://login.windows.net/{tenant_id}"
    
    def get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint=None):
        """
        Get an app-only access token for a Service Principal using OAuth 2.0 client credentials flow.
    
        This function uses certificate-based authentication which is more secure than client secrets.
    
        Args:
            client_id (str): The Service Principal's client ID  
            audience (str): The audience for the token (resource scope)
            authority (str): The OAuth authority URL
            certificate_path (str): Path to the certificate file (.pem format)
            private_key_path (str): Path to the private key file (.pem format)
            certificate_thumbprint (str): Certificate thumbprint (optional but recommended)
    
        Returns:
            str: The access token for API authentication
    
        Raises:
            Exception: If token acquisition fails
        """
        try:
            # Read the certificate from PEM file
            with open(certificate_path, "r", encoding="utf-8") as f:
                certificate_pem = f.read()
    
            # Read the private key from PEM file
            with open(private_key_path, "r", encoding="utf-8") as f:
                private_key_pem = f.read()
    
            # Create the confidential client application
            app = ConfidentialClientApplication(
                client_id=client_id,
                authority=authority,
                client_credential={
                    "private_key": private_key_pem,
                    "thumbprint": certificate_thumbprint,
                    "certificate": certificate_pem
                }
            )
    
            # Acquire token using client credentials flow
            token_response = app.acquire_token_for_client(scopes=[audience])
    
            if "access_token" in token_response:
                print("Successfully acquired access token")
                return token_response["access_token"]
            else:
                raise Exception(f"Failed to retrieve token: {token_response.get('error_description', 'Unknown error')}")
    
        except FileNotFoundError as e:
            print(f"Certificate file not found: {e}")
            sys.exit(1)
        except Exception as e:
            print(f"Error retrieving token: {e}", file=sys.stderr)
            sys.exit(1)
    
    # Get the access token
    token = get_access_token(client_id, audience, authority, certificate_path, private_key_path, certificate_thumbprint)
    
  2. Kör notebook-cellen. Du bör se att token från Microsoft Entra visas.

    Skärmdump som visar en Microsoft Entra SPN-token som returneras efter att cellen har körts.

Autentisera en Livy API Spark-session med hjälp av en Microsoft Entra användartoken

  1. Skapa en .ipynb notebook-fil i Visual Studio Code och infoga följande kod.

    from msal import PublicClientApplication
    import requests
    import time
    
    # Configuration - Replace with your actual values
    tenant_id = "Entra_TenantID"  # Microsoft Entra tenant ID
    client_id = "Entra_ClientID"  # Application ID (can be the same as above or different)
    
    # Required scopes for Livy API access
    scopes = [
        "https://api.fabric.microsoft.com/Lakehouse.Execute.All",      # Required — execute operations in lakehouses
        "https://api.fabric.microsoft.com/Lakehouse.Read.All",         # Required — read lakehouse metadata
        "https://api.fabric.microsoft.com/Code.AccessFabric.All",      # Required — general Fabric API access from Spark Runtime
        "https://api.fabric.microsoft.com/Code.AccessStorage.All",     # Required — access OneLake and Azure storage from Spark Runtime
    ]
    
    # Optional scopes — add these only if your Spark jobs need access to the corresponding services:
    #    "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All"     # Optional — access Azure Key Vault from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All"     # Optional — access Azure Data Lake Storage Gen1 from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All" # Optional — access Azure Data Explorer from Spark Runtime
    #    "https://api.fabric.microsoft.com/Code.AccessSQL.All"               # Optional — access Azure SQL audience tokens from Spark Runtime
    
    def get_access_token(tenant_id, client_id, scopes):
        """
        Get an access token using interactive authentication.
    
        This method will open a browser window for user authentication.
    
        Args:
            tenant_id (str): The Azure Active Directory tenant ID
            client_id (str): The application client ID
            scopes (list): List of required permission scopes
    
        Returns:
            str: The access token, or None if authentication fails
        """
        app = PublicClientApplication(
            client_id,
            authority=f"https://login.microsoftonline.com/{tenant_id}"
        )
    
        print("Opening browser for interactive authentication...")
        token_response = app.acquire_token_interactive(scopes=scopes)
    
        if "access_token" in token_response:
            print("Successfully authenticated")
            return token_response["access_token"]
        else:
            print(f"Authentication failed: {token_response.get('error_description', 'Unknown error')}")
            return None
    
    # Uncomment the lines below to use interactive authentication
    token = get_access_token(tenant_id, client_id, scopes)
    print("Access token acquired via interactive login")
    
  2. Kör notebook-cellen. Ett popup-fönster bör visas i webbläsaren så att du kan välja den identitet som du vill logga in med.

    Skärmbild som visar inloggningsskärmen till Microsoft Entra app.

  3. När du har valt den identitet som du vill logga in med måste du godkänna api-behörigheterna för Microsoft Entra appregistrering.

    Screenshot som visar Microsoft Entra app-API-behörigheter.

  4. Stäng webbläsarfönstret när autentiseringen har slutförts.

    Skärmbild som visar att autentiseringen är klar.

  5. I Visual Studio Code kommer du att se att Microsoft Entra-token har returnerats.

    Screenshot som visar Microsoft Entra-tokenen som returneras efter att ha kört cellen och loggat in.

Förstå Code.* omfånget för Livy-API:n

När dina Spark-jobb körs via Livy-API:et styr omfången Code.* vilka externa tjänster Spark Runtime kan komma åt för den autentiserade användarens räkning. Två krävs; resten är valfria beroende på din arbetsbelastning.

Nödvändig kod.* omfång

Scope Beskrivning
Code.AccessFabric.All Tillåter hämtning av åtkomsttoken till Microsoft Fabric. Krävs för alla Livy API-åtgärder.
Code.AccessStorage.All Tillåter att åtkomsttoken hämtas till OneLake och Azure lagring. Krävs för att läsa och skriva data i lakehouses.

Valfri kod.* omfång

Lägg bara till dessa omfång om dina Spark-jobb behöver komma åt motsvarande Azure tjänster vid körning.

Scope Beskrivning När det bör användas
Code.AccessAzureKeyvault.All Tillåter att åtkomsttoken hämtas till Azure Key Vault. Spark-koden hämtar hemligheter, nycklar eller certifikat från Azure Key Vault.
Code.AccessAzureDataLake.All Tillåter att åtkomsttoken hämtas till Azure Data Lake Storage Gen1. Spark-koden läser från eller skriver till Azure Data Lake Storage Gen1 konton.
Code.AccessAzureDataExplorer.All Tillåter att åtkomsttoken hämtas till Azure Data Explorer (Kusto). Spark-koden frågar eller matar in data till/från Azure Data Explorer kluster.
Code.AccessSQL.All Tillåter att åtkomsttoken hämtas till Azure SQL. Spark-koden måste ansluta till Azure SQL databaser.

Anmärkning

Omfången Lakehouse.Execute.All och Lakehouse.Read.All krävs också men ingår inte i Code.* familjen. De ger behörighet att köra åtgärder i och läsa metadata från Fabric lakehouses respektive.

Skicka in en Livy batch och övervaka batchjobbet.

  1. Lägg till ytterligare en notebook-cell och infoga den här koden.

    # submit payload to existing batch session
    
    import requests
    import time
    import json
    
    api_base_url = "https://api.fabric.microsoft.com/v1"  # Base URL for Fabric APIs
    
    # Fabric Resource IDs - Replace with your workspace and lakehouse IDs  
    workspace_id = "Fabric_WorkspaceID"
    lakehouse_id = "Fabric_LakehouseID"
    
    # Construct the Livy Batch API URL
    # URL pattern: {base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/{api_version}/batches
    livy_base_url = f"{api_base_url}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livyApi/versions/2023-12-01/batches"
    
    # Set up authentication headers
    headers = {"Authorization": f"Bearer {token}"}
    
    print(f"Livy Batch API URL: {livy_base_url}")
    
    new_table_name = "TABLE_NAME"  # Name for the new table
    
    # Configure the batch job
    print("Configuring batch job parameters...")
    
    # Batch job configuration - Modify these values for your use case
    payload_data = {
        # Job name - will appear in the Fabric UI
        "name": f"livy_batch_demo_{new_table_name}",
    
        # Path to your Python file in the lakehouse
        "file": "<ABFSS_PATH_TO_YOUR_PYTHON_FILE>",  # Replace with your Python file path
    
        # Optional: Spark configuration parameters
        "conf": {
            "spark.targetTable": new_table_name,  # Custom configuration for your application
        },
    }
    
    print("Batch Job Configuration:")
    print(json.dumps(payload_data, indent=2))
    
    try:
        # Submit the batch job
        print("\nSubmitting batch job...")
        post_batch = requests.post(livy_base_url, headers=headers, json=payload_data)
    
        if post_batch.status_code == 202:
            batch_info = post_batch.json()
            print("Livy batch job submitted successfully!")
            print(f"Batch Job Info: {json.dumps(batch_info, indent=2)}")
    
            # Extract batch ID for monitoring
            batch_id = batch_info['id']
            livy_batch_get_url = f"{livy_base_url}/{batch_id}"
    
            print(f"\nBatch Job ID: {batch_id}")
            print(f"Monitoring URL: {livy_batch_get_url}")
    
        else:
            print(f"Failed to submit batch job. Status code: {post_batch.status_code}")
            print(f"Response: {post_batch.text}")
    
    except requests.exceptions.RequestException as e:
        print(f"Network error occurred: {e}")
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response text: {post_batch.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
  2. Kör notebook-cellen. Du bör se flera rader skrivas ut medan Livy Batch-jobbet skapas och körs.

    Screenshot som visar resultat i Visual Studio Code efter att Livy Batch-jobbet har skickats.

  3. Om du vill se ändringarna går du tillbaka till Lakehouse.

Integrering med Fabric miljöer

Som standard körs den här Livy API-sessionen mot standardstartpoolen för arbetsytan. Du kan också använda Fabric Miljöer Skapa, konfigurera och använda en miljö i Microsoft Fabric för att anpassa Spark-poolen som Livy API-sessionen använder för dessa Spark-jobb. Om du vill använda din Fabric-miljö uppdaterar du den föregående notebook-cellen med den här radändringen.

payload_data = {
    "name":"livybatchdemo_with"+ newlakehouseName,
    "file":"abfss://YourABFSPathToYourPayload.py", 
    "conf": {
        "spark.targetLakehouse": "Fabric_LakehouseID",
        "spark.fabric.environmentDetails" : "{\"id\" : \""EnvironmentID"\"}"  # remove this line to use starter pools instead of an environment, replace "EnvironmentID" with your environment ID
        }
    }

Visa dina jobb i övervakningshubben

Du kan komma åt övervakningshubben för att visa olika Apache Spark-aktiviteter genom att välja Övervaka i navigeringslänkarna till vänster.

  1. När batchjobbet har slutförts kan du visa sessionsstatusen genom att gå till Övervaka.

    Skärmbild som visar tidigare Livy API-inlämningar i övervakningshubben.

  2. Välj och öppna det senaste aktivitetsnamnet.

    Skärmbild som visar den senaste Livy API-aktiviteten i övervakningshubben.

  3. I det här Livy API-sessionsfallet kan du se din tidigare batchöverföring, körningsinformation, Spark-versioner och konfiguration. Observera den stoppade statusen längst upp till höger.

    Skärmbild som visar den senaste livy-API-aktivitetsinformationen i övervakningshubben.

För att sammanfatta hela processen behöver du en fjärrklient, till exempel Visual Studio Code, en Microsoft Entra apptoken, Livy API-slutpunkts-URL, autentisering mot ditt Lakehouse, en Spark-nyttolast i Lakehouse och slutligen en Batch Livy API-session.