Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In Medallion-Architekturen müssen Sie die Datenqualität in jeder Phase sicherstellen. Schlechte Datenqualität kann zu falschen Erkenntnissen und betrieblichen Ineffizienzen führen.
In diesem Artikel wird erläutert, wie Datenqualitätsprüfungen in materialisierten Seeansichten (MLVs) in Microsoft Fabric implementiert werden.
Implementieren der Datenqualität
In materialisierten Seeansichten (MLVs) in Microsoft Fabric behalten Sie die Datenqualität bei, indem Sie Einschränkungen für Ihre Ansichten definieren. Ohne explizite Überprüfungen können kleinere Datenprobleme die Verarbeitungszeit erhöhen oder die Pipeline fehlschlagen.
Wenn eine Zeile gegen eine Einschränkung verstößt, können Sie eine der folgenden Aktionen verwenden:
FAIL: Stoppt die MLV-Aktualisierung bei der ersten Einschränkungsverletzung. Dies ist das Standardverhalten, auch wenn Sie nicht angeben
FAIL.Vorsicht
Das Erstellen oder Aktualisieren eines MLV mit einer
FAILAktion kann zu einem Fehler "Delta-Tabelle nicht gefunden" führen. Falls dies geschieht, erstellen Sie den MLV neu und vermeiden Sie dieseFAILAktion.DROP: Führt die Verarbeitung fort und entfernt Datensätze, die gegen die Einschränkung verstoßen. In der Linienansicht wird die Anzahl der verworfenen Datensätze angezeigt.
Hinweis
Wenn Sie sowohl DROP- als auch FAIL-Aktionen in einem MLV definieren, hat die FAIL-Aktion Vorrang.
Definieren von Datenqualitätsprüfungen in einer materialisierten Seeansicht
DQ MLV (Data Quality MLV): Ein MLV, der eine oder mehrere Datenqualitätseinschränkungen enthält, die in einer WITH DATA QUALITY-Klausel deklariert sind. Jede Einschränkung definiert einen booleschen Ausdruck und eine Aktion, die ausgeführt werden soll, wenn eine Zeile gegen sie verstößt. Erfolgreiche Zeilen werden in die Ausgabetabelle geschrieben; fehlerhafte Zeilen werden entweder stillschweigend verworfen oder der gesamte Aktualisierungsvorgang wird abgebrochen, abhängig von der Einstellung bei Verstoß.
Im folgenden Beispiel wird die Einschränkung cust_blankdefiniert, die überprüft, ob das customerName Feld nicht NULL ist. Die Einschränkung schließt Zeilen aus, bei denen customerName null ist.
In der Tabelle werden unterstützte Vorgänge zusammengefasst. "—" bedeutet, dass der Vorgang nicht unterstützt wird. "N/A" bedeutet, dass der Funktionstyp für diesen Kontext nicht anwendbar ist.
| Szenario | Beschreibung | SpSQL Non-DQ Erstellen | SpSQL Non-DQ LINEAGE | PySpark Non-DQ Create | PySpark Non-DQ LINEAGE | SpSQL DQ Create | SpSQL DQ LINEAGE | PySpark DQ Create | PySpark DQ LINEAGE |
|---|---|---|---|---|---|---|---|---|---|
| Systemfunktionen | Integriert: UPPER, LOWER, COALESCE, etc. | Ja | Ja | Ja | Ja | Ja | Ja | Ja | Ja |
| UDF - Gleiche NB | spark.udf.register() in demselben Notebook | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| UDF - Diff NB | UDF definiert in separatem NB, registriert in demselben NB | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Bibliotheken von Drittanbietern | Vektorisierte UDF mit Pandas-Framework | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Benutzerdefiniertes Rad | In Python-Raddatei verpackte Funktion | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| Benutzerdefinierte Jar-Datei | In Java/Scala JAR kompilierte Funktion | Ja | — | Ja | Ja | Ja | — | Ja | Ja |
| PyPI-Bibliothek | Funktion aus öffentlichem PyPI-Paket | Ja | N/A | Ja | Ja | Ja | — | Ja | Ja |
| Fabric UDF | In Fabric-Benutzerdatenfunktionen definierte Funktion | N/A | N/A | Ja | Ja | N/A | N/A | Ja | Ja |
Integrierte Systemfunktionen
Integrierte Spark/SQL-Funktionen wie UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() und DATE_FORMAT() werden in allen MLV-Kontexten sowohl für CREATE- als auch LINEAGE-Aktualisierung vollständig unterstützt.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al')
ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Hinweis
Systemfunktionen sind die einfachste und zuverlässigste Option. Sie benötigen keine Registrierung, funktionieren in allen Kontexten und werden während der LINEAGE-Aktualisierung vollständig unterstützt.
UDFs – Definiert und im selben Notizbuch registriert
UDFs, die mit spark.udf.register() im selben Notizbuch registriert sind, werden für CREATE in allen Kontexten unterstützt. Für die LINEAGE-Aktualisierung werden nur PySpark-Kontexte unterstützt, da die UDF-Definition als Teil der geplanten Notizbuchausführung ausgeführt wird.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Bibliotheken von Drittanbietern – Pandas UDFs
Drittanbieterbibliotheken wie Pandas UDFs ermöglichen die Implementierung von Datenqualitätsregeln mit vektorisierter Verarbeitung. Sie ermöglichen erweiterte Validierungen wie benutzerdefinierte Geschäftslogik, statistische Überprüfungen oder Mustererkennungen, die mit integrierten Funktionen nicht möglich sind. Dadurch können sie skalierbare und wiederverwendbare Datenqualitätseinschränkungen während der MLV-Erstellung und -Aktualisierung erstellen.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Benutzerdefinierte Bibliotheken – Python Wheel (.whl)
Funktionen, die als Jar-Dateien oder Raddateien verpackt sind, können im Fabric-Cluster (über Umgebungseinstellungen) installiert und in MLV-Definitionen verwendet werden. CREATE und lINEAGE REFRESH werden für PySpark-Kontexte unterstützt. Weitere Informationen finden Sie unter Verwalten von benutzerdefinierten Bibliotheken in Fabric-Umgebungen .
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Fabric-Benutzerdatenfunktionen
Fabric User Data Functions (UDFs) werden zentral definiert und im Fabric-Arbeitsbereich verwaltet. Sie sind für jedes Notebook oder jede Pipeline verfügbar, ohne dass sie pro Sitzung neu registriert werden müssen, wodurch sie ideal für Produktions-MLV-Pipelines geeignet sind. Dieses Feature wird nur in PySpark-Kontexten für die CREATE- und LINEAGE-Aktualisierungsvorgänge unterstützt. Weitere Informationen zu Benutzerdatenfunktionen finden Sie hier. Weitere Informationen finden Sie in der Übersicht über Fabric-Benutzerdatenfunktionen.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)