Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans les architectures de médaillon, vous devez appliquer la qualité des données à chaque étape. Une mauvaise qualité des données peut entraîner des insights incorrects et des inefficacités opérationnelles.
Cet article explique comment implémenter des contrôles de qualité des données dans des vues de lac matérialisées (MLV) dans Microsoft Fabric.
Implémenter la qualité des données
Dans les vues de lac matérialisées (MLV) dans Microsoft Fabric, vous conservez la qualité des données en définissant des contraintes sur vos vues. Sans vérifications explicites, les problèmes de données mineurs peuvent augmenter le temps de traitement ou échouer le pipeline.
Lorsqu’une ligne enfreint une contrainte, vous pouvez utiliser l’une des actions suivantes :
FAIL : arrête l’actualisation MLV lors de la première violation de contrainte. Il s’agit du comportement par défaut, même lorsque vous ne spécifiez
FAILpas .DROP : poursuit le traitement et supprime les enregistrements qui violent la contrainte. La vue traçabilité affiche le nombre d’enregistrements supprimés.
Remarque
Si vous définissez à la fois les actions DROP et FAIL dans une bibliothèque MLV, l’action FAIL est prioritaire.
Définir des vérifications de qualité des données dans une vue matérialisée du lac
Lorsque vous créez une vue matérialisée du lac, vous pouvez définir des contraintes ( règles de qualité des données qui valident chaque ligne lors d’une actualisation). Une contrainte est une expression booléenne que chaque ligne doit satisfaire. Les lignes qui passent sont écrites dans la table de sortie. Les lignes qui échouent sont gérées en fonction du paramètre de violation : elles sont supprimées en mode silencieux ou provoquent l’échec de l’actualisation entière.
L’exemple suivant définit la contrainte cust_blank, qui vérifie si le customerName champ n’est pas null. La contrainte exclut les lignes avec une valeur Null customerName du traitement.
CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
c.customerID,
c.customerName,
c.contact,
CASE
WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE
ELSE FALSE
END AS has_orders
FROM bronze.customers c LEFT JOIN bronze.orders o
ON c.customerID = o.customerID;
Fonctions intégrées du système
Les fonctions Spark/SQL intégrées telles que UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() et DATE_FORMAT() sont entièrement prises en charge dans tous les contextes MLV pour l’actualisation CREATE et LINEAGE.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Remarque
Les fonctions système sont l’option la plus simple et la plus fiable. Ils ne nécessitent aucune inscription, fonctionnent dans tous les contextes et sont entièrement pris en charge pendant l’actualisation de LINEAGE.
UDFs – définies et enregistrées dans le même notebook
Les UDFs enregistrées avec spark.udf.register() dans le même notebook sont prises en charge pour l'instruction CREATE dans tous les contextes. Pour l’actualisation LINEAGE, seuls les contextes PySpark sont pris en charge, car la définition UDF s’exécute dans le cadre de l’exécution planifiée du notebook.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Bibliothèques tierces - UDF Pandas (Fonctions Définies par l'Utilisateur)
Les bibliothèques tierces telles que les UDF Pandas permettent d’implémenter des règles de qualité des données avec un traitement vectorisé. Ils permettent des validations avancées telles que la logique métier personnalisée, les vérifications statistiques ou la détection de modèles qui ne sont pas possibles avec les fonctions intégrées. Cela permet de créer des contraintes de qualité des données évolutives et réutilisables lors de la création et de l’actualisation de MLV.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Bibliothèques personnalisées – Python Wheel (.whl)
Les fonctions empaquetées en tant que fichiers jar ou fichiers de roue peuvent être installées sur le cluster Fabric (via les paramètres d’environnement) et utilisées dans les définitions MLV. CREATE et LINEAGE REFRESH sont pris en charge pour les contextes PySpark. Pour plus d’informations, consultez Gérer les bibliothèques personnalisées dans les environnements Fabric .
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Fonctions de données utilisateur Fabric
Les fonctions de données utilisateur fabric (UDF) sont définies et gérées de manière centralisée dans l’espace de travail Fabric. Ils sont disponibles pour tout notebook ou pipeline sans nécessiter de réenregistrement à chaque session, ce qui les rend idéaux pour les pipelines MLV de production. Cette fonctionnalité est prise en charge uniquement dans les contextes PySpark pour la mise à jour des types CREATE et LINEAGE. En savoir plus sur les fonctions de données utilisateur ici. Pour plus d’informations, consultez vue d’ensemble des fonctions de données utilisateur Fabric.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)