register (UDFRegistration)

Registra una función Python (incluidas las funciones lambda) o una función definida por el usuario como una función SQL.

Sintaxis

register(name, f, returnType=None)

Parámetros

Parámetro Tipo Descripción
name str Nombre de la función definida por el usuario en instrucciones SQL.
f function, udf, o pandas_udf Una función Python o una función definida por el usuario. La función definida por el usuario puede ser row-at-a-time o vectorized.
returnType DataType o str, opcional Tipo de valor devuelto de la función definida por el usuario registrada. Puede ser un DataType objeto o una cadena de tipo con formato DDL. Solo es válido cuando f es una función de Python sin formato, no cuando f ya es una función definida por el usuario.

Devoluciones

function

Notas

Para registrar una función de Python no determinista, cree primero una función definida por el usuario no determinista para la función Python y, a continuación, regístrela como una función SQL.

Ejemplos

# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]

spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]

# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]

# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]

# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)

# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]

# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
    return v.sum()

spark.udf.register("sum_udf", sum_udf)
spark.sql(
    "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]