Del via


dropDuplicatesWithinWatermark

Return a new DataFrame with duplicate rows removed, optionally only considering certain columns, within watermark.

Syntax

dropDuplicatesWithinWatermark(subset: Optional[List[str]] = None)

Parameters

Parameter Type Description
subset List of column names, optional List of columns to use for duplicate comparison (default All columns).

Returns

DataFrame: DataFrame without duplicates.

Notes

This only works with streaming DataFrame, and watermark for the input DataFrame must be set via withWatermark.

For a streaming DataFrame, this will keep all data across triggers as intermediate state to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated as long as the time distance of earliest and latest events are smaller than the delay threshold of watermark." Users are encouraged to set the delay threshold of watermark longer than max timestamp differences among duplicated events.

Note: too late data older than watermark will be dropped.

Supports Spark Connect.

Examples

from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
    "value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]

df.dropDuplicatesWithinWatermark()

df.dropDuplicatesWithinWatermark(['value'])