Qualité des données dans les vues matérialisées du lac

Dans les architectures de médaillon, vous devez appliquer la qualité des données à chaque étape. Une mauvaise qualité des données peut entraîner des insights incorrects et des inefficacités opérationnelles.

Cet article explique comment implémenter des contrôles de qualité des données dans des vues de lac matérialisées (MLV) dans Microsoft Fabric.

Implémenter la qualité des données

Dans les vues de lac matérialisées (MLV) dans Microsoft Fabric, vous conservez la qualité des données en définissant des contraintes sur vos vues. Sans vérifications explicites, les problèmes de données mineurs peuvent augmenter le temps de traitement ou échouer le pipeline.

Lorsqu’une ligne enfreint une contrainte, vous pouvez utiliser l’une des actions suivantes :

  • FAIL : arrête l’actualisation MLV lors de la première violation de contrainte. Il s’agit du comportement par défaut, même lorsque vous ne spécifiez FAILpas .

  • DROP : poursuit le traitement et supprime les enregistrements qui violent la contrainte. La vue traçabilité affiche le nombre d’enregistrements supprimés.

Remarque

Si vous définissez à la fois les actions DROP et FAIL dans une bibliothèque MLV, l’action FAIL est prioritaire.

Définir des vérifications de qualité des données dans une vue matérialisée du lac

Lorsque vous créez une vue matérialisée du lac, vous pouvez définir des contraintes ( règles de qualité des données qui valident chaque ligne lors d’une actualisation). Une contrainte est une expression booléenne que chaque ligne doit satisfaire. Les lignes qui passent sont écrites dans la table de sortie. Les lignes qui échouent sont gérées en fonction du paramètre de violation : elles sont supprimées en mode silencieux ou provoquent l’échec de l’actualisation entière.

L’exemple suivant définit la contrainte cust_blank, qui vérifie si le customerName champ n’est pas null. La contrainte exclut les lignes avec une valeur Null customerName du traitement.

CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched  
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
    c.customerID,
    c.customerName,
    c.contact, 
    CASE  
       WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE  
       ELSE FALSE  
    END AS has_orders 
FROM bronze.customers c LEFT JOIN bronze.orders o 
ON c.customerID = o.customerID;

Fonctions intégrées du système

Les fonctions Spark/SQL intégrées telles que UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() et DATE_FORMAT() sont entièrement prises en charge dans tous les contextes MLV pour l’actualisation CREATE et LINEAGE.

CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)

Remarque

Les fonctions système sont l’option la plus simple et la plus fiable. Ils ne nécessitent aucune inscription, fonctionnent dans tous les contextes et sont entièrement pris en charge pendant l’actualisation de LINEAGE.

UDFs – définies et enregistrées dans le même notebook

Les UDFs enregistrées avec spark.udf.register() dans le même notebook sont prises en charge pour l'instruction CREATE dans tous les contextes. Pour l’actualisation LINEAGE, seuls les contextes PySpark sont pris en charge, car la définition UDF s’exécute dans le cadre de l’exécution planifiée du notebook.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# UDF: Extract domain from email
def extract_email_domain(email):
    if email is None or '@' not in email:
        return None
    return email.split('@')[1]

# Registration
spark.udf.register(
    "udf_email_domain",
    extract_email_domain,
    StringType()
)

@fmlv.materialized_lake_view(
    name="udf_testing_silver.mlv_high_value_customers",
    comment="High-value customers identified by UDF criteria",
    table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
    return spark.sql("""
        SELECT 
            c.customer_id,
            c.name,
            c.email,
            udf_email_domain(c.email) as email_domain,
            c.segment,
            c.lifetime_value,
            total_transactions.total_amount,
            total_transactions.txn_count
        FROM udf_testing_bronze.customers c
        INNER JOIN (
            SELECT 
                customer_id,
                SUM(amount) as total_amount,
                COUNT(*) as txn_count
            FROM udf_testing_bronze.transactions
            WHERE udf_is_positive(amount)
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ) total_transactions ON c.customer_id = total_transactions.customer_id
        WHERE udf_validate_customer(c.email, c.age)
            AND c.segment IN ('premium', 'vip')
    """)

print("✓ Created mlv_high_value_customers")

Bibliothèques tierces - UDF Pandas (Fonctions Définies par l'Utilisateur)

Les bibliothèques tierces telles que les UDF Pandas permettent d’implémenter des règles de qualité des données avec un traitement vectorisé. Ils permettent des validations avancées telles que la logique métier personnalisée, les vérifications statistiques ou la détection de modèles qui ne sont pas possibles avec les fonctions intégrées. Cela permet de créer des contraintes de qualité des données évolutives et réutilisables lors de la création et de l’actualisation de MLV.

  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  import pandas as pd
  from pyspark.sql.types import BooleanType
  def pandas_check_impl(val):
    # Reject if value < median of [100, 200, 300]
    return val >= pd.Series([100, 200, 300]).median()
  spark.udf.register("pandas_check", pandas_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_pandas",
    comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
  )
  @fmlv.check(
    name="dq_pandas_check",
    condition="pandas_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_pandas():
    # Define the function

    # Register the function as a Spark UDF

    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


  df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
  # All amounts should be >= median (200)
  assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
  print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")

Bibliothèques personnalisées – Python Wheel (.whl)

Les fonctions empaquetées en tant que fichiers jar ou fichiers de roue peuvent être installées sur le cluster Fabric (via les paramètres d’environnement) et utilisées dans les définitions MLV. CREATE et LINEAGE REFRESH sont pris en charge pour les contextes PySpark. Pour plus d’informations, consultez Gérer les bibliothèques personnalisées dans les environnements Fabric .

  %%pyspark
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
  from datetime import datetime

  from pyspark.sql.types import BooleanType
  from custom_dq_lib import threshold_check
  def custom_check_impl(val):
    return threshold_check(val, threshold=200)
  spark.udf.register("custom_check", custom_check_impl, BooleanType())

  @fmlv.materialized_lake_view(
    name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
    comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
    replace=True
  )
  @fmlv.check(
    name="dq_custom_check",
    condition="custom_check(l3)",
    action="DROP"
  )
  def pyspark_from_two_sqlmlv_inner_custom():
    # Wrap the custom function as Spark UDF


    # Read source tables
    df1 = spark.table("silver.base_sqlmlv")
    df2 = spark.table("silver.base_sqlmlv")

    # Rename columns for unique join
    df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
    df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])

    # Perform INNER JOIN
    df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
    return df


    df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
    # All amounts should be >= threshold (200)
    assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
    print("PySpark MLV INNER JOIN with custom DQ library passed")

Fonctions de données utilisateur Fabric

Les fonctions de données utilisateur fabric (UDF) sont définies et gérées de manière centralisée dans l’espace de travail Fabric. Ils sont disponibles pour tout notebook ou pipeline sans nécessiter de réenregistrement à chaque session, ce qui les rend idéaux pour les pipelines MLV de production. Cette fonctionnalité est prise en charge uniquement dans les contextes PySpark pour la mise à jour des types CREATE et LINEAGE. En savoir plus sur les fonctions de données utilisateur ici. Pour plus d’informations, consultez vue d’ensemble des fonctions de données utilisateur Fabric.

  %%pyspark
  import fmlv
  from pyspark.sql import functions as F
  from notebookutils import udf

  myFuncs = udf.getFunctions("UserDataFunction_1")

  def add_greeting_column(df):
    pdf = df.toPandas()
    pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
    return spark.createDataFrame(pdf)
  import fmlv
  from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

  # -------------------------------------------------------------
  # Define base PySpark MLV (no SQL)
  # -------------------------------------------------------------
  @fmlv.materialized_lake_view(
    name="silver.base_pysparkmlv",
    comment="Base MLV created using PySpark",
    replace=True
  )
  def base_pysparkmlv():

    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("country", StringType(), True)
    ])

    data = [
        (1, "Alice", 100.0, "US"),
        (2, "Bob", 200.0, "UK"),
        (3, "Charlie", 300.0, "UK")
    ]

    return spark.createDataFrame(data, schema)

  @fmlv.materialized_lake_view(
    name="silver.mlv_udfn_null_test",
    replace=False
  )
  @fmlv.check(
    name="null_check",
    condition="greeting IS NOT NULL",
    action="FAIL"
  )
  def mlv_udfn_null_test():
    df = spark.table("silver.base_pysparkmlv")
    return add_greeting_column(df)