mapInPandas

Applies a Python native function to an iterator of batches from the current DataFrame. The function receives and yields pandas DataFrames, and the combined output is returned as a DataFrame.
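The iterator contract can be checked without a cluster. The following is a minimal sketch, assuming only pandas is installed; `double_age` and the two hand-built batches are hypothetical stand-ins for the function and the batches Spark would supply.

```python
import pandas as pd


def double_age(iterator):
    # Receives an iterator of pandas DataFrames (one per batch)
    # and yields one pandas DataFrame for each input batch.
    for pdf in iterator:
        yield pdf.assign(age=pdf["age"] * 2)


# Hypothetical batches standing in for what Spark would pass to the function.
batches = [
    pd.DataFrame({"id": [1], "age": [21]}),
    pd.DataFrame({"id": [2], "age": [30]}),
]

# Concatenating the yielded batches mimics how the outputs are combined.
result = pd.concat(double_age(iter(batches)), ignore_index=True)
print(result)
```

On a cluster, the same function would be passed as `df.mapInPandas(double_age, df.schema)`.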

Syntax

mapInPandas(func: "PandasMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None)

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| func | function | A Python native function that takes an iterator of pandas.DataFrames and outputs an iterator of pandas.DataFrames. |
| schema | DataType or str | The return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string. |
| barrier | bool, optional, default False | Use barrier mode execution, ensuring that all Python workers in the stage will be launched concurrently. |
| profile | ResourceProfile, optional | The optional ResourceProfile to be used for mapInPandas. |
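Because `schema` describes the output of `func`, it can differ from the input schema. The sketch below illustrates this with a DDL-formatted string; `add_is_adult`, `OUTPUT_SCHEMA`, and `apply_on_spark` are hypothetical names introduced for illustration, and the input is assumed to have integer `id` and `age` columns.

```python
import pandas as pd


def add_is_adult(iterator):
    # Adds a boolean column, so the output has one more column than the input.
    for pdf in iterator:
        yield pdf.assign(is_adult=pdf["age"] >= 18)


# DDL-formatted string describing the output columns of add_is_adult.
OUTPUT_SCHEMA = "id bigint, age bigint, is_adult boolean"


def apply_on_spark(df):
    # Assumption: `df` is a Spark DataFrame with bigint columns `id` and `age`.
    return df.mapInPandas(add_is_adult, OUTPUT_SCHEMA)


# Local check of the function on a single hand-built batch.
sample = pd.DataFrame({"id": [1, 2], "age": [21, 12]})
out = next(add_is_adult(iter([sample])))
print(out)
```

The column names and types yielded by the function should line up with the declared schema.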

Returns

DataFrame

Examples

# Assumes an active SparkSession is available as `spark`.
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

def mean_age(iterator):
    # The mean is computed within each batch, not across the whole DataFrame.
    for pdf in iterator:
        yield pdf.groupby("id").mean().reset_index()

df.mapInPandas(mean_age, "id: bigint, age: double").show()
# +---+----+
# | id| age|
# +---+----+
# |  1|21.0|
# |  2|30.0|
# +---+----+

df.mapInPandas(filter_func, df.schema, barrier=True).collect()
# [Row(id=1, age=21)]
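One caveat with aggregating inside the function: it sees one batch at a time, so a `groupby` there only aggregates within that batch. The sketch below reproduces this locally with pandas; the two batches are hypothetical and were chosen so that the same `id` lands in both.

```python
import pandas as pd


def mean_age(iterator):
    # Same function as in the example above: a per-batch mean by id.
    for pdf in iterator:
        yield pdf.groupby("id").mean().reset_index()


# Hypothetical batches in which id 1 is split across two batches.
batches = [
    pd.DataFrame({"id": [1, 1], "age": [20, 30]}),
    pd.DataFrame({"id": [1], "age": [40]}),
]

per_batch = pd.concat(mean_age(iter(batches)), ignore_index=True)
print(per_batch)  # two rows for id 1, one per batch
```

Here id 1 yields two rows (means 25.0 and 40.0) rather than one overall mean of 30.0. For a mean over the whole DataFrame, a Spark-side aggregation such as `df.groupBy("id").avg("age")` is the usual choice.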