Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Nelle architetture medallion è necessario applicare la qualità dei dati in ogni fase. Una scarsa qualità dei dati può portare a intuizioni errate e inefficienze operative.
Questo articolo illustra come implementare verifiche di qualità dei dati in viste lake materializzate (MLV) in Microsoft Fabric.
Implementare la qualità dei dati
Nella vista materializzata del lago (MLV) in Microsoft Fabric, si mantiene la qualità dei dati definendo vincoli sulla vista. Senza controlli espliciti, i problemi di dati secondari possono aumentare il tempo di elaborazione o interrompere la pipeline.
Quando una riga viola un vincolo, è possibile usare una di queste azioni:
FAIL: arresta l'aggiornamento MLV alla prima violazione del vincolo. Si tratta del comportamento predefinito, anche quando non si specifica
FAIL.Attenzione
La creazione o l'aggiornamento di un MLV con un'azione
FAILpuò comportare un errore di "tabella delta non trovata". In questo caso, ricreare MLV ed evitare di eseguire l'azioneFAIL.DROP: continua l'elaborazione e rimuove i record che violano il vincolo. La visualizzazione derivazione mostra il numero di record eliminati.
Annotazioni
Se si definiscono sia le azioni DROP che FAIL in un MLV, l'azione FAIL ha la precedenza.
Definire controlli di qualità dei dati in una vista lake materializzata
DQ MLV (Data Quality MLV): MLV che include uno o più vincoli di qualità dei dati dichiarati in una clausola WITH DATA QUALITY. Ogni vincolo definisce un'espressione booleana e un'azione da eseguire quando una riga la viola. Le righe che passano vengono scritte nella tabella di output; le righe con errori vengono eliminate silenziosamente o causano l'interruzione dell'intero aggiornamento, a seconda dell'impostazione in caso di violazione.
Nell'esempio seguente viene definito il vincolo cust_blank, che controlla se il customerName campo non è Null. Il vincolo esclude le righe con un valore Null customerName dall'elaborazione.
La tabella riepiloga le operazioni supportate. '—' indica che l'operazione non è supportata. 'N/A' indica che il tipo di funzione non è applicabile a tale contesto.
| Scenario | Descrizione | Creazione di SpSQL Non-DQ | SpSQL Non-DQ LINEAGE | Creazione PySpark Non-DQ | Lineage non DQ PySpark | Creazione DQ spSQL | SpSQL DQ LINEAGE | Creazione di DQ in PySpark | PySpark DQ LINEAGE |
|---|---|---|---|---|---|---|---|---|---|
| Funzioni di sistema | Incorporato: UPPER, LOWER, COALESCE e così via. | Sì | Sì | Sì | Sì | Sì | Sì | Sì | Sì |
| UDF - Stessa NB | spark.udf.register() nello stesso notebook | Sì | - | Sì | Sì | Sì | - | Sì | Sì |
| Funzione definita dall'utente - Diff NB | Funzione definita dall'utente in NB separata, registrata nello stesso NB | Sì | - | Sì | Sì | Sì | - | Sì | Sì |
| Librerie di terze parti | Funzione definita dall'utente vettorializzata nel framework pandas | Sì | - | Sì | Sì | Sì | - | Sì | Sì |
| Ruota personalizzata | Funzione inserita in un pacchetto nel file wheel python | Sì | - | Sì | Sì | Sì | - | Sì | Sì |
| Jar personalizzato | Funzione compilata in Java/Scala JAR | Sì | - | Sì | Sì | Sì | - | Sì | Sì |
| Libreria PyPI | Funzione dal pacchetto PyPI pubblico | Sì | N/A | Sì | Sì | Sì | - | Sì | Sì |
| Funzione Definita dall'Utente di Fabric | Funzione definita in Funzioni dati utente di Fabric | N/A | N/A | Sì | Sì | N/A | N/A | Sì | Sì |
Funzioni predefinite del sistema
Le funzioni Spark/SQL predefinite, ad esempio UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() e DATE_FORMAT() sono completamente supportate in tutti i contesti MLV per l'aggiornamento CREATE e LINEAGE.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al')
ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Annotazioni
Le funzioni di sistema sono l'opzione più semplice e affidabile. Non richiedono alcuna registrazione, funzionano in tutti i contesti e sono completamente supportati durante l'aggiornamento LINEAGE.
UDF (Funzioni definite dall'utente) – Definite e registrate nello stesso notebook
Le funzioni definite dall'utente registrate con spark.udf.register() nello stesso notebook sono supportate per CREATE in tutti i contesti. Per l'aggiornamento LINEAGE, sono supportati solo i contesti PySpark perché la definizione della UDF viene eseguita come parte dell'esecuzione programmata del notebook.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Librerie di terze parti - UDF Pandas
Le librerie di terze parti come Pandas UDF consentono l'implementazione di regole di qualità dei dati mediante elaborazione vettorializzata. Abilitano convalide avanzate, ad esempio la logica di business personalizzata, i controlli statistici o il rilevamento dei criteri che non sono possibili con le funzioni predefinite. Ciò consente di creare vincoli di qualità dei dati scalabili e riutilizzabili durante la creazione e l'aggiornamento di MLV.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Librerie personalizzate - Python Wheel (.whl)
Le funzioni impacchettate come file JAR o wheel possono essere installate nel cluster Fabric (tramite le impostazioni dell'ambiente) e usate nelle definizioni MLV. CREATE e lINEAGE REFRESH sono supportati per i contesti PySpark. Per ulteriori dettagli, vedere Gestire librerie personalizzate in ambienti Fabric.
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Funzioni dati utente di Fabric
Le User Data Functions (UDF) di Fabric sono definite e gestite centralmente nel workspace di Fabric. Sono disponibili per qualsiasi notebook o pipeline senza dover essere nuovamente registrati per ogni sessione, rendendoli ideali per le pipeline MLV di produzione. Questa funzionalità è supportata solo nei contesti PySpark per l'aggiornamento CREATE e LINEAGE. Altre informazioni sulle funzioni dati utente sono disponibili qui. Per altre informazioni, vedere Panoramica di Funzioni dati utente di Fabric.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)