Merk
Tilgang til denne siden krever autorisasjon. Du kan prøve å logge på eller endre kataloger.
Tilgang til denne siden krever autorisasjon. Du kan prøve å endre kataloger.
I medaljongarkitekturer må du håndheve datakvalitet på alle trinn. Dårlig datakvalitet kan føre til feil innsikt og operasjonell ineffektivitet.
Denne artikkelen forklarer hvordan man implementerer datakvalitetskontroller i materialiserte lake views (MLV-er) i Microsoft Fabric.
Implementere datakvalitet
I materialiserte lake-visninger (MLV) i Microsoft Fabric opprettholder du datakvaliteten ved å definere begrensninger på visningene dine. Uten eksplisitte kontroller kan mindre dataproblemer øke behandlingstiden eller få pipelinen til å feile.
Når en rad bryter en begrensning, kan du bruke en av disse handlingene:
FAIL: Stopper MLV-oppdatering ved første brudd på begrensningen. Dette er standardoppførselen, selv når du ikke spesifiserer
FAIL.DROP: Fortsetter behandlingen og fjerner poster som bryter begrensningen. Linjevisningen viser antall droppede poster.
Merk deg
Hvis du definerer både DROP- og FAIL-handlinger i en MLV, har FAIL-handlingen forrang.
Definer datakvalitetskontroller i en materialisert innsjøvisning
Når du lager en materialisert innsjøvisning, kan du definere begrensninger — datakvalitetsregler som validerer hver rad under en oppdatering. En begrensning er et boolsk uttrykk som hver rad må oppfylle. Radene som passerer skrives til utdatatabellen. Rader som feiler håndteres i henhold til on-viol-innstillingen: de droppes enten stille eller fører til at hele oppdateringen feiler.
Følgende eksempel definerer betingelsen cust_blank, som kontrollerer om customerName feltet ikke er null. Begrensningen utelukker rader med null customerName fra behandling.
CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
c.customerID,
c.customerName,
c.contact,
CASE
WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE
ELSE FALSE
END AS has_orders
FROM bronze.customers c LEFT JOIN bronze.orders o
ON c.customerID = o.customerID;
Systemintegrerte funksjoner
Innebygde Spark/SQL-funksjoner som UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() og DATE_FORMAT() støttes fullt ut i alle MLV-kontekster for både CREATE og LINEAGE refresh.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Merk deg
Systemfunksjoner er det enkleste og mest pålitelige alternativet. De krever ingen registrering, fungerer i alle sammenhenger og støttes fullt ut under LINEAGE-oppdatering.
UDF-er – Definert og registrert i samme notatbok
UDF-er registrert med spark.udf.register() i samme notatbok støttes for CREATE i alle kontekster. For LINEAGE-oppdatering støttes kun PySpark-kontekster fordi UDF-definisjonen kjører som en del av den planlagte notatbokutførelsen.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Tredjepartsbiblioteker – Pandas UDF-er
Tredjepartsbiblioteker som Pandas UDF-er gjør det mulig å implementere datakvalitetsregler med vektorisert behandling. De muliggjør avanserte valideringer som tilpasset forretningslogikk, statistiske kontroller eller mønstergjenkjenning som ikke er mulig med innebygde funksjoner. Dette bidrar til å bygge skalerbare og gjenbrukbare datakvalitetsbegrensninger under MLV-opprettelse og oppdatering.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Egendefinerte biblioteker – Python Wheel (.whl)
Funksjoner pakket som jar-filer eller hjulfiler kan installeres i Fabric-klyngen (via miljøinnstillinger) og brukes i MLV-definisjoner. CREATE og LINEAGE REFRESH støttes for PySpark-kontekster. Se Administrer tilpassede biblioteker i Fabric-miljøer for mer informasjon.
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Funksjoner for brukerdata for stoff
Fabric User Data Functions (UDF) er sentralt definert og administreres i Fabric-arbeidsområdet. De er tilgjengelige for enhver bærbar PC eller pipeline uten å måtte registreres på nytt per økt, noe som gjør dem ideelle for produksjons-MLV-pipelines. Denne funksjonen støttes kun i PySpark-kontekster for både CREATE og LINEAGE refresh. Lær mer om User Data Functions her. For mer informasjon, se oversikt over Fabric User Data Functions.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)