Condividi tramite


Qualità dei dati nelle viste lake materializzate

Nelle architetture medallion è necessario applicare la qualità dei dati in ogni fase. Una scarsa qualità dei dati può portare a intuizioni errate e inefficienze operative.

Questo articolo illustra come implementare verifiche di qualità dei dati in viste lake materializzate (MLV) in Microsoft Fabric.

Implementare la qualità dei dati

Nella vista materializzata del lago (MLV) in Microsoft Fabric, si mantiene la qualità dei dati definendo vincoli sulla vista. Senza controlli espliciti, i problemi di dati secondari possono aumentare il tempo di elaborazione o interrompere la pipeline.

Quando una riga viola un vincolo, è possibile usare una di queste azioni:

  • FAIL: arresta l'aggiornamento MLV alla prima violazione del vincolo. Si tratta del comportamento predefinito, anche quando non si specifica FAIL.

    Attenzione

    La creazione o l'aggiornamento di un MLV con un'azione FAIL può comportare un errore di "tabella delta non trovata". In questo caso, ricreare MLV ed evitare di eseguire l'azione FAIL.

  • DROP: continua l'elaborazione e rimuove i record che violano il vincolo. La visualizzazione derivazione mostra il numero di record eliminati.

Annotazioni

Se si definiscono sia le azioni DROP che FAIL in un MLV, l'azione FAIL ha la precedenza.

Definire controlli di qualità dei dati in una vista lake materializzata

DQ MLV (Data Quality MLV): MLV che include uno o più vincoli di qualità dei dati dichiarati in una clausola WITH DATA QUALITY. Ogni vincolo definisce un'espressione booleana e un'azione da eseguire quando una riga la viola. Le righe che passano vengono scritte nella tabella di output; le righe con errori vengono eliminate silenziosamente o causano l'interruzione dell'intero aggiornamento, a seconda dell'impostazione in caso di violazione. Nell'esempio seguente viene definito il vincolo cust_blank, che controlla se il customerName campo non è Null. Il vincolo esclude le righe con un valore Null customerName dall'elaborazione.

La tabella riepiloga le operazioni supportate. '—' indica che l'operazione non è supportata. 'N/A' indica che il tipo di funzione non è applicabile a tale contesto.

Scenario Descrizione Creazione di SpSQL Non-DQ SpSQL Non-DQ LINEAGE Creazione PySpark Non-DQ Lineage non DQ PySpark Creazione DQ spSQL SpSQL DQ LINEAGE Creazione di DQ in PySpark PySpark DQ LINEAGE
Funzioni di sistema Incorporato: UPPER, LOWER, COALESCE e così via.
UDF - Stessa NB spark.udf.register() nello stesso notebook - -
Funzione definita dall'utente - Diff NB Funzione definita dall'utente in NB separata, registrata nello stesso NB - -
Librerie di terze parti Funzione definita dall'utente vettorializzata nel framework pandas - -
Ruota personalizzata Funzione inserita in un pacchetto nel file wheel python - -
Jar personalizzato Funzione compilata in Java/Scala JAR - -
Libreria PyPI Funzione dal pacchetto PyPI pubblico N/A -
Funzione Definita dall'Utente di Fabric Funzione definita in Funzioni dati utente di Fabric N/A N/A N/A N/A

Funzioni predefinite del sistema

Le funzioni Spark/SQL predefinite, ad esempio UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() e DATE_FORMAT() sono completamente supportate in tutti i contesti MLV per l'aggiornamento CREATE e LINEAGE.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al')
ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Annotazioni

Le funzioni di sistema sono l'opzione più semplice e affidabile. Non richiedono alcuna registrazione, funzionano in tutti i contesti e sono completamente supportati durante l'aggiornamento LINEAGE.

UDF (Funzioni definite dall'utente) – Definite e registrate nello stesso notebook

Le funzioni definite dall'utente registrate con spark.udf.register() nello stesso notebook sono supportate per CREATE in tutti i contesti. Per l'aggiornamento LINEAGE, sono supportati solo i contesti PySpark perché la definizione della UDF viene eseguita come parte dell'esecuzione programmata del notebook.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Librerie di terze parti - UDF Pandas

Le librerie di terze parti come Pandas UDF consentono l'implementazione di regole di qualità dei dati mediante elaborazione vettorializzata. Abilitano convalide avanzate, ad esempio la logica di business personalizzata, i controlli statistici o il rilevamento dei criteri che non sono possibili con le funzioni predefinite. Ciò consente di creare vincoli di qualità dei dati scalabili e riutilizzabili durante la creazione e l'aggiornamento di MLV.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Librerie personalizzate - Python Wheel (.whl)

Le funzioni impacchettate come file JAR o wheel possono essere installate nel cluster Fabric (tramite le impostazioni dell'ambiente) e usate nelle definizioni MLV. CREATE e lINEAGE REFRESH sono supportati per i contesti PySpark. Per ulteriori dettagli, vedere Gestire librerie personalizzate in ambienti Fabric.

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Funzioni dati utente di Fabric

Le User Data Functions (UDF) di Fabric sono definite e gestite centralmente nel workspace di Fabric. Sono disponibili per qualsiasi notebook o pipeline senza dover essere nuovamente registrati per ogni sessione, rendendoli ideali per le pipeline MLV di produzione. Questa funzionalità è supportata solo nei contesti PySpark per l'aggiornamento CREATE e LINEAGE. Altre informazioni sulle funzioni dati utente sono disponibili qui. Per altre informazioni, vedere Panoramica di Funzioni dati utente di Fabric.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)