Freigeben über


Datenqualität in materialisierten Seeansichten

In Medallion-Architekturen müssen Sie die Datenqualität in jeder Phase sicherstellen. Schlechte Datenqualität kann zu falschen Erkenntnissen und betrieblichen Ineffizienzen führen.

In diesem Artikel wird erläutert, wie Datenqualitätsprüfungen in materialisierten Seeansichten (MLVs) in Microsoft Fabric implementiert werden.

Implementieren der Datenqualität

In materialisierten Seeansichten (MLVs) in Microsoft Fabric behalten Sie die Datenqualität bei, indem Sie Einschränkungen für Ihre Ansichten definieren. Ohne explizite Überprüfungen können kleinere Datenprobleme die Verarbeitungszeit erhöhen oder die Pipeline fehlschlagen.

Wenn eine Zeile gegen eine Einschränkung verstößt, können Sie eine der folgenden Aktionen verwenden:

  • FAIL: Stoppt die MLV-Aktualisierung bei der ersten Einschränkungsverletzung. Dies ist das Standardverhalten, auch wenn Sie nicht angeben FAIL.

    Vorsicht

    Das Erstellen oder Aktualisieren eines MLV mit einer FAIL Aktion kann zu einem Fehler "Delta-Tabelle nicht gefunden" führen. Falls dies geschieht, erstellen Sie den MLV neu und vermeiden Sie diese FAIL Aktion.

  • DROP: Führt die Verarbeitung fort und entfernt Datensätze, die gegen die Einschränkung verstoßen. In der Linienansicht wird die Anzahl der verworfenen Datensätze angezeigt.

Hinweis

Wenn Sie sowohl DROP- als auch FAIL-Aktionen in einem MLV definieren, hat die FAIL-Aktion Vorrang.

Definieren von Datenqualitätsprüfungen in einer materialisierten Seeansicht

DQ MLV (Data Quality MLV): Ein MLV, der eine oder mehrere Datenqualitätseinschränkungen enthält, die in einer WITH DATA QUALITY-Klausel deklariert sind. Jede Einschränkung definiert einen booleschen Ausdruck und eine Aktion, die ausgeführt werden soll, wenn eine Zeile gegen sie verstößt. Erfolgreiche Zeilen werden in die Ausgabetabelle geschrieben; fehlerhafte Zeilen werden entweder stillschweigend verworfen oder der gesamte Aktualisierungsvorgang wird abgebrochen, abhängig von der Einstellung bei Verstoß. Im folgenden Beispiel wird die Einschränkung cust_blankdefiniert, die überprüft, ob das customerName Feld nicht NULL ist. Die Einschränkung schließt Zeilen aus, bei denen customerName null ist.

In der Tabelle werden unterstützte Vorgänge zusammengefasst. "—" bedeutet, dass der Vorgang nicht unterstützt wird. "N/A" bedeutet, dass der Funktionstyp für diesen Kontext nicht anwendbar ist.

Szenario Beschreibung SpSQL Non-DQ Erstellen SpSQL Non-DQ LINEAGE PySpark Non-DQ Create PySpark Non-DQ LINEAGE SpSQL DQ Create SpSQL DQ LINEAGE PySpark DQ Create PySpark DQ LINEAGE
Systemfunktionen Integriert: UPPER, LOWER, COALESCE, etc. Ja Ja Ja Ja Ja Ja Ja Ja
UDF - Gleiche NB spark.udf.register() in demselben Notebook Ja Ja Ja Ja Ja Ja
UDF - Diff NB UDF definiert in separatem NB, registriert in demselben NB Ja Ja Ja Ja Ja Ja
Bibliotheken von Drittanbietern Vektorisierte UDF mit Pandas-Framework Ja Ja Ja Ja Ja Ja
Benutzerdefiniertes Rad In Python-Raddatei verpackte Funktion Ja Ja Ja Ja Ja Ja
Benutzerdefinierte Jar-Datei In Java/Scala JAR kompilierte Funktion Ja Ja Ja Ja Ja Ja
PyPI-Bibliothek Funktion aus öffentlichem PyPI-Paket Ja N/A Ja Ja Ja Ja Ja
Fabric UDF In Fabric-Benutzerdatenfunktionen definierte Funktion N/A N/A Ja Ja N/A N/A Ja Ja

Integrierte Systemfunktionen

Integrierte Spark/SQL-Funktionen wie UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() und DATE_FORMAT() werden in allen MLV-Kontexten sowohl für CREATE- als auch LINEAGE-Aktualisierung vollständig unterstützt.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al')
ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Hinweis

Systemfunktionen sind die einfachste und zuverlässigste Option. Sie benötigen keine Registrierung, funktionieren in allen Kontexten und werden während der LINEAGE-Aktualisierung vollständig unterstützt.

UDFs – Definiert und im selben Notizbuch registriert

UDFs, die mit spark.udf.register() im selben Notizbuch registriert sind, werden für CREATE in allen Kontexten unterstützt. Für die LINEAGE-Aktualisierung werden nur PySpark-Kontexte unterstützt, da die UDF-Definition als Teil der geplanten Notizbuchausführung ausgeführt wird.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Bibliotheken von Drittanbietern – Pandas UDFs

Drittanbieterbibliotheken wie Pandas UDFs ermöglichen die Implementierung von Datenqualitätsregeln mit vektorisierter Verarbeitung. Sie ermöglichen erweiterte Validierungen wie benutzerdefinierte Geschäftslogik, statistische Überprüfungen oder Mustererkennungen, die mit integrierten Funktionen nicht möglich sind. Dadurch können sie skalierbare und wiederverwendbare Datenqualitätseinschränkungen während der MLV-Erstellung und -Aktualisierung erstellen.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Benutzerdefinierte Bibliotheken – Python Wheel (.whl)

Funktionen, die als Jar-Dateien oder Raddateien verpackt sind, können im Fabric-Cluster (über Umgebungseinstellungen) installiert und in MLV-Definitionen verwendet werden. CREATE und lINEAGE REFRESH werden für PySpark-Kontexte unterstützt. Weitere Informationen finden Sie unter Verwalten von benutzerdefinierten Bibliotheken in Fabric-Umgebungen .

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Fabric-Benutzerdatenfunktionen

Fabric User Data Functions (UDFs) werden zentral definiert und im Fabric-Arbeitsbereich verwaltet. Sie sind für jedes Notebook oder jede Pipeline verfügbar, ohne dass sie pro Sitzung neu registriert werden müssen, wodurch sie ideal für Produktions-MLV-Pipelines geeignet sind. Dieses Feature wird nur in PySpark-Kontexten für die CREATE- und LINEAGE-Aktualisierungsvorgänge unterstützt. Weitere Informationen zu Benutzerdatenfunktionen finden Sie hier. Weitere Informationen finden Sie in der Übersicht über Fabric-Benutzerdatenfunktionen.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)