Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Inscrit une fonction Python (y compris les fonctions lambda) ou une fonction définie par l’utilisateur en tant que fonction SQL.
Syntaxe
register(name, f, returnType=None)
Paramètres
| Paramètre | Type | Description |
|---|---|---|
name |
str | Nom de la fonction définie par l’utilisateur dans les instructions SQL. |
f |
function, ou udfpandas_udf |
Une fonction Python ou une fonction définie par l’utilisateur. La fonction définie par l’utilisateur peut être row-at-a-time ou vectorisée. |
returnType |
DataType ou str, facultatif | Type de retour de la fonction définie par l’utilisateur inscrite. Il peut s’agir d’un objet ou d’une DataType chaîne de type au format DDL. Valide uniquement lorsque f est une fonction Python simple, et non quand f est déjà une fonction définie par l’utilisateur. |
Retours
function
Remarques
Pour inscrire une fonction Python non déterministe, commencez par générer une fonction définie par l’utilisateur non déterministe pour la fonction Python, puis inscrivez-la en tant que fonction SQL.
Exemples
# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]
# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]
# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]
# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)
# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
return v.sum()
spark.udf.register("sum_udf", sum_udf)
spark.sql(
"SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]