Datakvalitet i materialiserade sjövyer

I medaljongarkitekturer måste du framtvinga datakvalitet i varje steg. Dålig datakvalitet kan leda till felaktiga insikter och driftineffektivitet.

Den här artikeln beskriver hur du implementerar datakvalitetskontroller i materialiserade lakevyer (MLV:er) i Microsoft Fabric.

Säkerställa datakvalitet

I materialiserade sjövyer (MLV:er) i Microsoft Fabric underhåller du datakvaliteten genom att definiera begränsningar för dina vyer. Utan tydliga kontroller kan mindre dataproblem öka bearbetningstiden eller leda till att pipelinen misslyckas.

När en rad bryter mot en begränsning kan du använda någon av följande åtgärder:

  • FAIL: Stoppar MLV-uppdateringen vid den första begränsningsöverträdelsen. Detta är standardbeteendet, även om du inte anger FAIL.

  • DROP: Fortsätter bearbetningen och tar bort poster som bryter mot villkoret. Släktträdsvisningen visar antalet förlorade poster.

Anmärkning

Om du definierar både DROP- och FAIL-åtgärder i en MLV har åtgärden FAIL företräde.

Definiera datakvalitetskontroller i en materialiserad sjövy

När du skapar en materialiserad sjövy kan du definiera begränsningar – datakvalitetsregler som verifierar varje rad under en uppdatering. En begränsning är ett booleskt uttryck som varje rad måste uppfylla. Rader som passerar skrivs till utdatatabellen. Rader som misslyckas hanteras enligt inställningen vid överträdelse: de tas antingen bort tyst eller gör att hela uppdateringen misslyckas.

I följande exempel definieras villkoret cust_blank, som kontrollerar om fältet customerName inte är null. Villkoret exkluderar rader med null customerName från bearbetning.

CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched  
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
    c.customerID,
    c.customerName,
    c.contact, 
    CASE  
       WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE  
       ELSE FALSE  
    END AS has_orders 
FROM bronze.customers c LEFT JOIN bronze.orders o 
ON c.customerID = o.customerID;

Systeminbyggda funktioner

Inbyggda Spark/SQL-funktioner som UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() och DATE_FORMAT() stöds fullt ut i alla MLV-kontexter för både CREATE- och LINEAGE-uppdatering.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Anmärkning

Systemfunktioner är det enklaste och mest tillförlitliga alternativet. De kräver ingen registrering, fungerar i alla kontexter och stöds fullt ut under LINEAGE-uppdateringen.

UDF:er – definierade och registrerade i samma notebook-fil

UDF:er som registrerats med spark.udf.register() i samma notebook-fil stöds för CREATE i alla kontexter. För LINEAGE-uppdatering stöds endast PySpark-kontexter eftersom UDF-definitionen körs som en del av den schemalagda notebook-körningen.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Bibliotek från tredje part – Pandas UDF:er

Bibliotek från tredje part som Pandas UDF:er tillåter att datakvalitetsregler implementeras med vektoriserad bearbetning. De möjliggör avancerade valideringar som anpassad affärslogik, statistiska kontroller eller mönsteridentifiering som inte är möjliga med inbyggda funktioner. Detta hjälper dig att skapa skalbara och återanvändbara datakvalitetsbegränsningar när MLV skapas och uppdateras.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Anpassade bibliotek – Python Wheel (.whl)

Funktioner som paketeras som jar-filer eller hjulfiler kan installeras i Infrastrukturklustret (via miljöinställningar) och användas i MLV-definitioner. CREATE och LINEAGE REFRESH stöds för PySpark-kontexter. Mer information finns i Hantera anpassade bibliotek i Fabric-miljöer.

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Fabric-användardatafunktioner

Användardatafunktioner i Fabric (UDF) definieras och hanteras centralt i Fabric-arbetsytan. De är tillgängliga för alla notebooks eller pipelines utan att behöva registreras om per session, vilket gör dem idealiska för MLV-pipelines för produktion. Den här funktionen stöds endast i PySpark-kontexter för både CREATE- och LINEAGE-uppdatering. Läs mer om Användardatafunktioner här. För mer information, se Översikt över Fabric-användardatafunktioner.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)