Del via


Datakvalitet i materialisert utsikt over innsjøen

I medaljongarkitekturer må du håndheve datakvalitet på alle trinn. Dårlig datakvalitet kan føre til feil innsikt og operasjonell ineffektivitet.

Denne artikkelen forklarer hvordan man implementerer datakvalitetskontroller i materialiserte lake views (MLV-er) i Microsoft Fabric.

Implementere datakvalitet

I materialiserte lake-visninger (MLV) i Microsoft Fabric opprettholder du datakvaliteten ved å definere begrensninger på visningene dine. Uten eksplisitte kontroller kan mindre dataproblemer øke behandlingstiden eller få pipelinen til å feile.

Når en rad bryter en begrensning, kan du bruke en av disse handlingene:

  • FAIL: Stopper MLV-oppdatering ved første brudd på begrensningen. Dette er standardoppførselen, selv når du ikke spesifiserer FAIL.

  • DROP: Fortsetter behandlingen og fjerner poster som bryter begrensningen. Linjevisningen viser antall droppede poster.

Merk deg

Hvis du definerer både DROP- og FAIL-handlinger i en MLV, har FAIL-handlingen forrang.

Definer datakvalitetskontroller i en materialisert innsjøvisning

Når du lager en materialisert innsjøvisning, kan du definere begrensninger — datakvalitetsregler som validerer hver rad under en oppdatering. En begrensning er et boolsk uttrykk som hver rad må oppfylle. Radene som passerer skrives til utdatatabellen. Rader som feiler håndteres i henhold til on-viol-innstillingen: de droppes enten stille eller fører til at hele oppdateringen feiler.

Følgende eksempel definerer betingelsen cust_blank, som kontrollerer om customerName feltet ikke er null. Begrensningen utelukker rader med null customerName fra behandling.

CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched  
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
    c.customerID,
    c.customerName,
    c.contact, 
    CASE  
       WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE  
       ELSE FALSE  
    END AS has_orders 
FROM bronze.customers c LEFT JOIN bronze.orders o 
ON c.customerID = o.customerID;

Systemintegrerte funksjoner

Innebygde Spark/SQL-funksjoner som UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() og DATE_FORMAT() støttes fullt ut i alle MLV-kontekster for både CREATE og LINEAGE refresh.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Merk deg

Systemfunksjoner er det enkleste og mest pålitelige alternativet. De krever ingen registrering, fungerer i alle sammenhenger og støttes fullt ut under LINEAGE-oppdatering.

UDF-er – Definert og registrert i samme notatbok

UDF-er registrert med spark.udf.register() i samme notatbok støttes for CREATE i alle kontekster. For LINEAGE-oppdatering støttes kun PySpark-kontekster fordi UDF-definisjonen kjører som en del av den planlagte notatbokutførelsen.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Tredjepartsbiblioteker – Pandas UDF-er

Tredjepartsbiblioteker som Pandas UDF-er gjør det mulig å implementere datakvalitetsregler med vektorisert behandling. De muliggjør avanserte valideringer som tilpasset forretningslogikk, statistiske kontroller eller mønstergjenkjenning som ikke er mulig med innebygde funksjoner. Dette bidrar til å bygge skalerbare og gjenbrukbare datakvalitetsbegrensninger under MLV-opprettelse og oppdatering.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Egendefinerte biblioteker – Python Wheel (.whl)

Funksjoner pakket som jar-filer eller hjulfiler kan installeres i Fabric-klyngen (via miljøinnstillinger) og brukes i MLV-definisjoner. CREATE og LINEAGE REFRESH støttes for PySpark-kontekster. Se Administrer tilpassede biblioteker i Fabric-miljøer for mer informasjon.

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Funksjoner for brukerdata for stoff

Fabric User Data Functions (UDF) er sentralt definert og administreres i Fabric-arbeidsområdet. De er tilgjengelige for enhver bærbar PC eller pipeline uten å måtte registreres på nytt per økt, noe som gjør dem ideelle for produksjons-MLV-pipelines. Denne funksjonen støttes kun i PySpark-kontekster for både CREATE og LINEAGE refresh. Lær mer om User Data Functions her. For mer informasjon, se oversikt over Fabric User Data Functions.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)