Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Nota
Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 e versioni successive.
Databricks Connect per Python supporta funzioni definite dall'utente. Quando viene eseguita un'operazione DataFrame che include UDF, queste vengono serializzate da Databricks Connect e inviate al server come parte della richiesta.
Per informazioni sulle funzioni definite dall'utente in Databricks Connect per Scala, vedere Funzioni definite dall'utente in Databricks Connect per Scala.
Nota
Poiché la funzione definita dall'utente viene serializzata e deserializzata, la versione Python del client deve corrispondere alla versione python nel calcolo di Azure Databricks. Per le versioni supportate, vedere la matrice di supporto della versione.
Definire una funzione definita dall'utente
Per creare una UDF (funzione definita dall'utente) in Databricks Connect per Python, utilizzare una delle funzioni supportate seguenti:
- Funzioni pySpark definite dall'utente
- Funzioni di streaming PySpark
Ad esempio, il seguente script Python configura una semplice funzione definita dall'utente (UDF) che eleva al quadrato i valori nella colonna.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Gestire le dipendenze delle funzioni definite dall'utente
Importante
Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 16.4 o versione successiva e un cluster che esegue Databricks Runtime 16.4 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.
Databricks Connect supporta la specifica delle dipendenze Python necessarie per UDFs (funzioni definite dall'utente). Queste dipendenze vengono installate nel calcolo di Databricks come parte dell'ambiente Python della funzione definita dall'utente.
Questa funzionalità consente agli utenti di specificare le dipendenze necessarie per la funzione definita dall'utente (UDF) oltre ai pacchetti forniti nell'ambiente di base. Può anche essere usato per installare una versione diversa del pacchetto da quello fornito nell'ambiente di base.
Le dipendenze possono essere installate dalle origini seguenti:
- Pacchetti PyPI
- I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio ,
dicepyjokes<1osimplejson==3.19.*.
- I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio ,
- Pacchetti archiviati nei volumi del catalogo Unity
- Sono supportate entrambe le distribuzioni compilate (
.whl) e le distribuzioni di origine (.tar.gz). - I pacchetti dei volumi del catalogo Unity possono essere specificati come
dbfs:<path>, ad esempiodbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlodbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - All'utente deve essere concessa
READ_FILEl'autorizzazione per il file nel volume re:[UC]. La concessione di questa autorizzazione a tutti gli utenti dell'account abilita automaticamente questa autorizzazione per i nuovi utenti.
- Sono supportate entrambe le distribuzioni compilate (
- Pacchetti, cartelle e file Python locali
- Le distribuzioni predefinite locali (
.whl), le distribuzioni di origine (.tar.gz), le cartelle e i file Python possono essere specificati comelocal:<path>, ad esempio,local:/path/to/my_private_dep-3.20.2-py3-none-any.whllocal:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folderlocal:/path/to/my_file.py. - Sono supportati sia percorsi assoluti che relativi, ad esempio:
local:/path/to/my_file.pyolocal:./path/to/my_file.py.
- Le distribuzioni predefinite locali (
Per includere dipendenze personalizzate nella funzione definita dall'utente, definirle in un ambiente usando withDependencies, quindi utilizzare quell'ambiente per creare una sessione Spark. Le dipendenze vengono installate nell'ambiente di calcolo Databricks e saranno disponibili in tutte le UDF che utilizzano questa sessione Spark.
Il codice seguente dichiara il pacchetto dice PyPI come dipendenza:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
In alternativa, per specificare una dipendenza di una rotellina in un volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento nei notebook e nei lavori di Databricks
Nei notebook e nei processi, le dipendenze della funzione definita dall'utente devono essere installate direttamente nel REPL. Databricks Connect convalida l'ambiente Python REPL verificando che tutte le dipendenze specificate siano già installate e generi un'eccezione se non sono installate. La convalida dell'ambiente notebook viene eseguita sia per le dipendenze dai volumi di PyPI che per le dipendenze di Unity Catalog, ma non per le dipendenze locali.
Limitazioni
- Il supporto delle dipendenze UDF per
pyspark.sql.streaming.DataStreamWriter.foreachrichiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva. - Il supporto delle dipendenze UDF per
pyspark.sql.streaming.DataStreamWriter.foreachBatchrichiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva. La funzionalità non è supportata in serverless. - Il supporto delle dipendenze UDF per pacchetti, cartelle e file Python locali richiede Databricks Connect per Python 18.1 o versione successiva e un cluster che esegue Databricks Runtime 18.1 o versione successiva.
- Le dipendenze UDF non sono supportate per le funzioni di aggregazione pandas definite dall'utente sulle funzioni di finestra.
- I pacchetti dei volumi del catalogo Unity e i pacchetti locali devono essere inseriti in un pacchetto seguendo le specifiche standard per la creazione di pacchetti Python da PEP-427 o versione successiva per le distribuzioni con rotellina e PEP-241 o versione successiva per le distribuzioni di origine tar. Per altre informazioni sugli standard di creazione di pacchetti Python, vedere la documentazione di PyPA.
Esempi
L'esempio seguente definisce le dipendenze di PyPI e di volume in un ambiente, crea una sessione con tale ambiente, quindi definisce e chiama funzioni definite dall'utente che utilizzano queste dipendenze.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Gestione automatica delle dipendenze delle UDF
Importante
Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 18.1 o versione successiva, Python 3.12 nel computer locale e un cluster che esegue Databricks Runtime 18.1 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.
L'API Databricks Connect withAutoDependencies() consente l'individuazione automatica e il caricamento di moduli locali e dipendenze PyPI pubbliche usate nelle istruzioni import nelle UDF. Rimuove la necessità di specificare manualmente le dipendenze.
Il codice seguente abilita la gestione automatica delle dipendenze:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Il withAutoDependencies() metodo accetta i parametri seguenti:
-
upload_local: quando è impostato suTrue, i moduli locali importati dai UDF vengono individuati, inseriti in un pacchetto e caricati automaticamente nella sandbox UDF. -
use_index: Quando impostato suTrue, le dipendenze PyPI pubbliche utilizzate nelle funzioni definite dall'utente (UDFs) vengono automaticamente individuate e installate sulle risorse di calcolo di Azure Databricks. Il processo di individuazione usa i pacchetti installati nel computer locale per determinare le versioni, garantendo la coerenza tra l'ambiente locale e l'ambiente di esecuzione remoto.
Limitazioni
- Le importazioni dinamiche (ad esempio ,
importlib.import_module("foo")) non sono supportate. - I pacchetti dello spazio dei nomi (ad esempio
azure.eventhubegoogle.cloud.aiplatform) non sono supportati. - Le dipendenze installate tramite riferimenti a URL diretti non sono supportate. Sono inclusi quelli installati dai file wheel locali.
- Le dipendenze installate da indici di pacchetti privati non sono supportate. I pacchetti installati in questo modo non possono essere distinti dai pacchetti installati dal pyPI pubblico.
- L'individuazione delle dipendenze non funziona in una shell Python. Sono supportati solo gli script Python, la shell IPython e i notebook di Jupyter.
Esempi
L'esempio seguente illustra la gestione automatica delle dipendenze con moduli locali e pacchetti PyPI. In questo esempio è necessario che sia stato installato simplejson e dice (usando pip install simplejson dice).
Creare prima di tutto moduli helper locali:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Quindi, nello script principale, importare questi moduli e usarli nelle funzioni definite dall'utente:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Registrazione
Per restituire le dipendenze individuate, impostare la SPARK_CONNECT_LOG_LEVEL variabile di ambiente su info o debug. In alternativa, configurare il framework di registrazione Python:
import logging
logging.basicConfig(level=logging.INFO)
I log pertinenti vengono generati dal databricks.connect.auto_dependencies modulo, ad esempio:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Ambiente di base Python
Le funzioni definite dall'utente vengono eseguite nel calcolo di Databricks e non nel client. L'ambiente Python di base in cui vengono eseguite le funzioni definite dall'utente dipende dal calcolo di Databricks.
Per i cluster, l'ambiente Python di base è l'ambiente Python della versione di Databricks Runtime in esecuzione nel cluster. La versione di Python e l'elenco dei pacchetti in questo ambiente di base sono disponibili nelle sezioni Ambiente di sistema e Librerie Python installate delle note sulla versione di Databricks Runtime.
Per il calcolo serverless, l'ambiente Python di base corrisponde alla versione dell'ambiente serverless in base alla tabella seguente. Le versioni di Databricks Connect non elencate in questa tabella non supportano ancora serverless o hanno raggiunto la fine del supporto. Vedere la matrice di supporto delle versioni e le versioni di Databricks Connect giunte a fine supporto.
| Versione di Databricks Connect | Ambiente serverless UDF |
|---|---|
| 18.0, Python 3.12 | Versione 5 |
| Da 17.2 a 17.3, Python 3.12 | Versione 4 |
| Da 16.4.1 a 17, Python 3.12 | Versione 3 |
| Da 15.4.10 a meno di 16, Python 3.12 | Versione 3 |
| Da 15.4.10 fino a sotto 16, Python 3.11 | Versione 2 |