observe

Define (named) metrics to observe on the DataFrame. This method returns an 'observed' DataFrame that returns the same result as the input, with the following guarantees:

- It will compute the defined aggregates (metrics) on all the data that is flowing through the DataFrame at that point.
- It will report the value of the defined aggregate columns as soon as we reach a completion point, such as the end of the query.

Syntax

observe(observation: Union["Observation", str], *exprs: Column)

Parameters

observation : Observation or str
    A str to specify the name of the metrics, or an Observation instance used to obtain the metric values.
exprs : Column
    Column expressions to compute as metrics, typically aggregate functions such as count or max.

Returns

DataFrame: the observed DataFrame.

Notes

When observation is an Observation instance, this method only supports batch queries. When observation is a string, this method works for both batch and streaming queries. Continuous execution is not supported.

Examples

from pyspark.sql import Observation, SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
observation = Observation("my metrics")
observed_df = df.observe(
    observation,
    sf.count(sf.lit(1)).alias("count"),
    sf.max("age"),
)
observed_df.count()  # an action triggers the computation of the metrics
# 2
observation.get  # a property; returns once an action on observed_df has completed
# {'count': 2, 'max(age)': 5}