Condividi tramite


Funzioni definite dall'utente in Databricks Connect per Python

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 e versioni successive.

Databricks Connect per Python supporta funzioni definite dall'utente. Quando viene eseguita un'operazione DataFrame che include UDF, queste vengono serializzate da Databricks Connect e inviate al server come parte della richiesta.

Per informazioni sulle funzioni definite dall'utente in Databricks Connect per Scala, vedere Funzioni definite dall'utente in Databricks Connect per Scala.

Nota

Poiché la funzione definita dall'utente viene serializzata e deserializzata, la versione Python del client deve corrispondere alla versione python nel calcolo di Azure Databricks. Per le versioni supportate, vedere la matrice di supporto della versione.

Definire una funzione definita dall'utente

Per creare una UDF (funzione definita dall'utente) in Databricks Connect per Python, utilizzare una delle funzioni supportate seguenti:

Ad esempio, il seguente script Python configura una semplice funzione definita dall'utente (UDF) che eleva al quadrato i valori nella colonna.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Gestire le dipendenze delle funzioni definite dall'utente

Importante

Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 16.4 o versione successiva e un cluster che esegue Databricks Runtime 16.4 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.

Databricks Connect supporta la specifica delle dipendenze Python necessarie per UDFs (funzioni definite dall'utente). Queste dipendenze vengono installate nel calcolo di Databricks come parte dell'ambiente Python della funzione definita dall'utente.

Questa funzionalità consente agli utenti di specificare le dipendenze necessarie per la funzione definita dall'utente (UDF) oltre ai pacchetti forniti nell'ambiente di base. Può anche essere usato per installare una versione diversa del pacchetto da quello fornito nell'ambiente di base.

Le dipendenze possono essere installate dalle origini seguenti:

  • Pacchetti PyPI
    • I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio , dicepyjokes<1 o simplejson==3.19.*.
  • Pacchetti archiviati nei volumi del catalogo Unity
    • Sono supportate entrambe le distribuzioni compilate (.whl) e le distribuzioni di origine (.tar.gz).
    • I pacchetti dei volumi del catalogo Unity possono essere specificati come dbfs:<path>, ad esempio dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl o dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz.
    • All'utente deve essere concessa READ_FILE l'autorizzazione per il file nel volume re:[UC]. La concessione di questa autorizzazione a tutti gli utenti dell'account abilita automaticamente questa autorizzazione per i nuovi utenti.
  • Pacchetti, cartelle e file Python locali
    • Le distribuzioni predefinite locali (.whl), le distribuzioni di origine (.tar.gz), le cartelle e i file Python possono essere specificati come local:<path>, ad esempio, local:/path/to/my_private_dep-3.20.2-py3-none-any.whllocal:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folderlocal:/path/to/my_file.py.
    • Sono supportati sia percorsi assoluti che relativi, ad esempio: local:/path/to/my_file.py o local:./path/to/my_file.py.

Per includere dipendenze personalizzate nella funzione definita dall'utente, definirle in un ambiente usando withDependencies, quindi utilizzare quell'ambiente per creare una sessione Spark. Le dipendenze vengono installate nell'ambiente di calcolo Databricks e saranno disponibili in tutte le UDF che utilizzano questa sessione Spark.

Il codice seguente dichiara il pacchetto dice PyPI come dipendenza:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

In alternativa, per specificare una dipendenza di una rotellina in un volume:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Comportamento nei notebook e nei lavori di Databricks

Nei notebook e nei processi, le dipendenze della funzione definita dall'utente devono essere installate direttamente nel REPL. Databricks Connect convalida l'ambiente Python REPL verificando che tutte le dipendenze specificate siano già installate e generi un'eccezione se non sono installate. La convalida dell'ambiente notebook viene eseguita sia per le dipendenze dai volumi di PyPI che per le dipendenze di Unity Catalog, ma non per le dipendenze locali.

Limitazioni

  • Il supporto delle dipendenze UDF per pyspark.sql.streaming.DataStreamWriter.foreach richiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva.
  • Il supporto delle dipendenze UDF per pyspark.sql.streaming.DataStreamWriter.foreachBatch richiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva. La funzionalità non è supportata in serverless.
  • Il supporto delle dipendenze UDF per pacchetti, cartelle e file Python locali richiede Databricks Connect per Python 18.1 o versione successiva e un cluster che esegue Databricks Runtime 18.1 o versione successiva.
  • Le dipendenze UDF non sono supportate per le funzioni di aggregazione pandas definite dall'utente sulle funzioni di finestra.
  • I pacchetti dei volumi del catalogo Unity e i pacchetti locali devono essere inseriti in un pacchetto seguendo le specifiche standard per la creazione di pacchetti Python da PEP-427 o versione successiva per le distribuzioni con rotellina e PEP-241 o versione successiva per le distribuzioni di origine tar. Per altre informazioni sugli standard di creazione di pacchetti Python, vedere la documentazione di PyPA.

Esempi

L'esempio seguente definisce le dipendenze di PyPI e di volume in un ambiente, crea una sessione con tale ambiente, quindi definisce e chiama funzioni definite dall'utente che utilizzano queste dipendenze.

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Gestione automatica delle dipendenze delle UDF

Importante

Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 18.1 o versione successiva, Python 3.12 nel computer locale e un cluster che esegue Databricks Runtime 18.1 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.

L'API Databricks Connect withAutoDependencies() consente l'individuazione automatica e il caricamento di moduli locali e dipendenze PyPI pubbliche usate nelle istruzioni import nelle UDF. Rimuove la necessità di specificare manualmente le dipendenze.

Il codice seguente abilita la gestione automatica delle dipendenze:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Il withAutoDependencies() metodo accetta i parametri seguenti:

  • upload_local: quando è impostato su True, i moduli locali importati dai UDF vengono individuati, inseriti in un pacchetto e caricati automaticamente nella sandbox UDF.
  • use_index: Quando impostato su True, le dipendenze PyPI pubbliche utilizzate nelle funzioni definite dall'utente (UDFs) vengono automaticamente individuate e installate sulle risorse di calcolo di Azure Databricks. Il processo di individuazione usa i pacchetti installati nel computer locale per determinare le versioni, garantendo la coerenza tra l'ambiente locale e l'ambiente di esecuzione remoto.

Limitazioni

  • Le importazioni dinamiche (ad esempio , importlib.import_module("foo")) non sono supportate.
  • I pacchetti dello spazio dei nomi (ad esempio azure.eventhub e google.cloud.aiplatform) non sono supportati.
  • Le dipendenze installate tramite riferimenti a URL diretti non sono supportate. Sono inclusi quelli installati dai file wheel locali.
  • Le dipendenze installate da indici di pacchetti privati non sono supportate. I pacchetti installati in questo modo non possono essere distinti dai pacchetti installati dal pyPI pubblico.
  • L'individuazione delle dipendenze non funziona in una shell Python. Sono supportati solo gli script Python, la shell IPython e i notebook di Jupyter.

Esempi

L'esempio seguente illustra la gestione automatica delle dipendenze con moduli locali e pacchetti PyPI. In questo esempio è necessario che sia stato installato simplejson e dice (usando pip install simplejson dice).

Creare prima di tutto moduli helper locali:

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Quindi, nello script principale, importare questi moduli e usarli nelle funzioni definite dall'utente:

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Registrazione

Per restituire le dipendenze individuate, impostare la SPARK_CONNECT_LOG_LEVEL variabile di ambiente su info o debug. In alternativa, configurare il framework di registrazione Python:

import logging
logging.basicConfig(level=logging.INFO)

I log pertinenti vengono generati dal databricks.connect.auto_dependencies modulo, ad esempio:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Ambiente di base Python

Le funzioni definite dall'utente vengono eseguite nel calcolo di Databricks e non nel client. L'ambiente Python di base in cui vengono eseguite le funzioni definite dall'utente dipende dal calcolo di Databricks.

Per i cluster, l'ambiente Python di base è l'ambiente Python della versione di Databricks Runtime in esecuzione nel cluster. La versione di Python e l'elenco dei pacchetti in questo ambiente di base sono disponibili nelle sezioni Ambiente di sistema e Librerie Python installate delle note sulla versione di Databricks Runtime.

Per il calcolo serverless, l'ambiente Python di base corrisponde alla versione dell'ambiente serverless in base alla tabella seguente. Le versioni di Databricks Connect non elencate in questa tabella non supportano ancora serverless o hanno raggiunto la fine del supporto. Vedere la matrice di supporto delle versioni e le versioni di Databricks Connect giunte a fine supporto.

Versione di Databricks Connect Ambiente serverless UDF
18.0, Python 3.12 Versione 5
Da 17.2 a 17.3, Python 3.12 Versione 4
Da 16.4.1 a 17, Python 3.12 Versione 3
Da 15.4.10 a meno di 16, Python 3.12 Versione 3
Da 15.4.10 fino a sotto 16, Python 3.11 Versione 2