Freigeben über


Benutzerdefinierte Funktionen in Databricks Connect für Python

Hinweis

Dieser Artikel behandelt Databricks Connect für Databricks Runtime 13.3 und höher.

Databricks Connect für Python unterstützt benutzerdefinierte Funktionen (UDF). Wenn ein DataFrame-Vorgang ausgeführt wird, der UDFs enthält, werden die UDFs von Databricks Connect serialisiert und als Teil der Anforderung an den Server gesendet.

Informationen zu UDFs für Databricks Connect für Scala finden Sie unter Benutzerdefinierte Funktionen in Databricks Connect für Scala.

Hinweis

Da die benutzerdefinierte Funktion serialisiert und deserialisiert wird, muss die Python-Version des Clients mit der Python-Version auf der Azure Databricks-Berechnung übereinstimmen. Unterstützte Versionen finden Sie in der Versionsunterstützungsmatrix.

Definieren einer UDF

Verwenden Sie eine der folgenden unterstützten Funktionen, um eine UDF in Databricks Connect für Python zu erstellen:

Beispielsweise richtet die folgende Python eine einfache UDF ein, die die Werte in einer Spalte quadratiert.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Verwalten von UDF-Abhängigkeiten

Von Bedeutung

Dieses Feature befindet sich in der öffentlichen Vorschau und erfordert Databricks Connect für Python 16.4 oder höher und ein Cluster mit Databricks Runtime 16.4 oder höher. Um dieses Feature zu verwenden, aktivieren Sie die Vorschau enhanced Python UDFs im Unity-Katalog in Ihrem Arbeitsbereich.

Databricks Connect unterstützt die Angabe von Python-Abhängigkeiten, die für UDFs erforderlich sind. Diese Abhängigkeiten werden im Databricks-Computing als Teil der Python-Umgebung der UDF installiert.

Mit diesem Feature können Benutzer Abhängigkeiten angeben, die die UDF zusätzlich zu den in der Basisumgebung bereitgestellten Paketen benötigt. Es kann auch verwendet werden, um eine andere Version des Pakets zu installieren, als die in der Basisumgebung bereitgestellte Version.

Abhängigkeiten können aus den folgenden Quellen installiert werden:

  • PyPI-Pakete
    • PyPI-Pakete können nach PEP 508 angegeben werden, dicez. B. , pyjokes<1 oder simplejson==3.19.*.
  • Pakete, die in Unity-Katalogvolumen gespeichert sind
    • Sowohl integrierte Verteilungen (.whl) als auch Quellverteilungen (.tar.gz) werden unterstützt.
    • Unity-Katalogvolumespakete können als dbfs:<path> angegeben werden, z. B. dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl oder dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz.
    • Dem Benutzer muss die Berechtigung READ_FILE für die Datei im Volume "re:[UC]" erteilt werden. Durch die Gewährung dieser Berechtigung für alle Kontobenutzer wird dies automatisch für neue Benutzer aktiviert.
  • Lokale Pakete, Ordner und Python-Dateien
    • Lokale erstellte Verteilungen (.whl), Quellverteilungen (.tar.gz), Ordner und Python-Dateien können als local:<path>, z. B.: local:/path/to/my_private_dep-3.20.2-py3-none-any.whl, , local:/path/to/my_private_dep-4.0.0.tar.gz, local:/path/to/my_folder, angegeben local:/path/to/my_file.pywerden.
    • Sowohl absolute als auch relative Pfade werden unterstützt, z. B.: local:/path/to/my_file.py oder local:./path/to/my_file.py.

Um benutzerdefinierte Abhängigkeiten in Ihre UDF einzuschließen, geben Sie diese in einer Umgebung mit withDependencies an und verwenden Sie dann diese Umgebung, um eine Spark-Sitzung zu erstellen. Die Abhängigkeiten werden auf Ihren Databricks-Berechnungen installiert und sind in allen UDFs verfügbar, die diese Spark-Sitzung verwenden.

Der folgende Code deklariert das PyPI-Paket dice als Abhängigkeit:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Oder, um eine Abhängigkeit eines Rads in einem Volume anzugeben:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Verhalten in Databricks-Notebooks und -Jobs

In Notebooks und Jobs müssen die UDF Abhängigkeiten direkt in der REPL installiert werden. Databricks Connect überprüft die REPL Python-Umgebung, indem überprüft wird, ob alle angegebenen Abhängigkeiten bereits installiert sind, und löst eine Ausnahme aus, wenn keines installiert ist. Die Überprüfung der Notizbuchumgebung wird sowohl für PyPI- als auch für Unity Catalog-Volumenabhängigkeiten ausgeführt, jedoch nicht für lokale Abhängigkeiten.

Einschränkungen

  • Unterstützung für UDF-Abhängigkeiten von pyspark.sql.streaming.DataStreamWriter.foreach setzt Databricks Connect für Python 18.0 oder höher und ein Cluster mit Databricks Runtime 18.0 oder höher voraus.
  • Die Unterstützung von UDF-Abhängigkeiten für pyspark.sql.streaming.DataStreamWriter.foreachBatch erfordert Databricks Connect in der Python-Version 18.0 oder höher sowie ein Cluster mit Databricks Runtime 18.0 oder höher. Das Feature wird auf serverlosen Servern nicht unterstützt.
  • UDF-Abhängigkeiten für lokale Pakete, Ordner und Python-Dateien erfordern Databricks Connect für Python 18.1 oder höher und ein Cluster mit Databricks Runtime 18.1 oder höher.
  • UDF-Abhängigkeiten werden für Pandas-Aggregations-UDFs bei Fensterfunktionen nicht unterstützt.
  • Unity Catalog-Volumina und lokale Pakete müssen nach den Standard-Python-Verpackungsspezifikationen von PEP-427 oder höher für Wheel-Built-Distributionen und PEP-241 oder höher für Tar-Quell-Distributionen verpackt werden. Weitere Informationen zu Python-Verpackungsstandards finden Sie in der PyPA-Dokumentation.

Beispiele

Im folgenden Beispiel werden PyPI- und Volumesabhängigkeiten in einer Umgebung definiert, eine Sitzung mit dieser Umgebung erstellt und dann UDFs definiert und aufgerufen, die diese Abhängigkeiten verwenden:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Automatische Verwaltung von UDF-Abhängigkeiten

Von Bedeutung

Dieses Feature befindet sich in der öffentlichen Vorschau und erfordert Databricks Connect für Python 18.1 oder höher, Python 3.12 auf Ihrem lokalen Computer und einen Cluster mit Databricks Runtime 18.1 oder höher. Um dieses Feature zu verwenden, aktivieren Sie die Vorschau enhanced Python UDFs im Unity-Katalog in Ihrem Arbeitsbereich.

Die Databricks Connect-API withAutoDependencies() ermöglicht die automatische Ermittlung und Upload lokaler Module und öffentlicher PyPI-Abhängigkeiten, die in den Importanweisungen in Ihren UDFs verwendet werden. Es entfernt die Notwendigkeit, Abhängigkeiten manuell anzugeben.

Der folgende Code aktiviert die automatische Abhängigkeitsverwaltung:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Die withAutoDependencies() Methode akzeptiert die folgenden Parameter:

  • upload_local: Bei Festlegung auf True, werden lokale Module, die in Ihren UDFs importiert wurden, automatisch erkannt, verpackt und in die UDF-Sandbox hochgeladen.
  • use_index: Bei Festlegung auf True, werden öffentliche PyPI-Abhängigkeiten, die in Ihren UDFs verwendet werden, automatisch ermittelt und auf Azure Databricks berechnet. Der Ermittlungsprozess verwendet die installierten Pakete auf Ihrem lokalen Computer, um Versionen zu ermitteln und die Konsistenz zwischen Ihrer lokalen Umgebung und der Remoteausführungsumgebung sicherzustellen.

Einschränkungen

  • Dynamische Importe (z. B importlib.import_module("foo"). ) werden nicht unterstützt.
  • Namespacepakete (z. B azure.eventhub . und google.cloud.aiplatform) werden nicht unterstützt.
  • Abhängigkeiten, die mit Direct-URL-Verweisen installiert wurden, werden nicht unterstützt. Dies schließt diejenigen ein, die aus lokalen Wheel-Dateien installiert wurden.
  • Abhängigkeiten, die über private Paketindizes installiert wurden, werden nicht unterstützt. Auf diese Weise installierte Pakete können nicht von Paketen unterschieden werden, die vom öffentlichen PyPI installiert wurden.
  • Die Abhängigkeitsermittlung funktioniert nicht in einer Python-Shell. Nur Python-Skripts, IPython-Shell und Jupyter-Notizbücher werden unterstützt.

Beispiele

Im folgenden Beispiel wird die automatische Abhängigkeitsverwaltung mit lokalen Modulen und PyPI-Paketen veranschaulicht. In diesem Beispiel ist es erforderlich, dass Sie simplejson und dice installiert haben (unter Verwendung von pip install simplejson dice).

Erstellen Sie zunächst lokale Hilfsmodule:

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Importieren Sie dann in Ihrem Hauptskript diese Module, und verwenden Sie sie in UDFs:

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Protokollierung

Um ermittelte Abhängigkeiten auszugeben, legen Sie die SPARK_CONNECT_LOG_LEVEL Umgebungsvariable auf info oder .debug Alternativ können Sie das Python-Protokollierungsframework konfigurieren:

import logging
logging.basicConfig(level=logging.INFO)

Die relevanten Protokolle werden vom databricks.connect.auto_dependencies Modul ausgegeben, z. B.:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Python-Basisumgebung

UDFs werden auf den Databricks-Berechnungen und nicht auf dem Client ausgeführt. Die Basis-Python-Umgebung, in der UDFs ausgeführt werden, hängt von der Berechnung von Databricks ab.

Bei Clustern ist die Python-Basisumgebung die Python-Umgebung der Databricks-Runtime-Version, die auf dem Cluster ausgeführt wird. Die Python-Version und die Liste der Pakete in dieser Basisumgebung finden Sie in den Abschnitten " Systemumgebung " und " Installed Python libraries " der Versionshinweise zu Databricks Runtime.

Bei serverloser Berechnung entspricht die Basis-Python-Umgebung der serverlosen Umgebungsversion gemäß der folgenden Tabelle. Databricks Connect-Versionen, die in dieser Tabelle nicht aufgeführt sind, unterstützen serverlos noch nicht oder haben das Ende des Supports erreicht. Siehe Versionsunterstützungsmatrix und End-of-Support Databricks Connect-Versionen.

Databricks Connect-Version Serverlose UDF-Umgebung
18.0, Python 3.12 Version 5
17.2 bis 17.3, Python 3.12 Version 4
16.4.1 bis unter 17, Python 3.12 Version 3
15.4.10 bis unter 16, Python 3.12 Version 3
15.4.10 bis unter 16, Python 3.11 Version 2