register (UDFRegistration)

Inscrit une fonction Python (y compris les fonctions lambda) ou une fonction définie par l’utilisateur en tant que fonction SQL.

Syntaxe

register(name, f, returnType=None)

Paramètres

Paramètre Type Description
name str Nom de la fonction définie par l’utilisateur dans les instructions SQL.
f function, ou udfpandas_udf Une fonction Python ou une fonction définie par l’utilisateur. La fonction définie par l’utilisateur peut être row-at-a-time ou vectorisée.
returnType DataType ou str, facultatif Type de retour de la fonction définie par l’utilisateur inscrite. Il peut s’agir d’un objet ou d’une DataType chaîne de type au format DDL. Valide uniquement lorsque f est une fonction Python simple, et non quand f est déjà une fonction définie par l’utilisateur.

Retours

function

Remarques

Pour inscrire une fonction Python non déterministe, commencez par générer une fonction définie par l’utilisateur non déterministe pour la fonction Python, puis inscrivez-la en tant que fonction SQL.

Exemples

# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]

spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]

# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]

# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]

# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)

# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]

# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
spark.sql(
    "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]