Muistiinpano
Tämän sivun käyttö edellyttää valtuutusta. Voit yrittää kirjautua sisään tai vaihtaa hakemistoa.
Tämän sivun käyttö edellyttää valtuutusta. Voit yrittää vaihtaa hakemistoa.
Medallion-arkkitehtuureissa datan laatu on valvottava jokaisessa vaiheessa. Huono tietojen laatu voi johtaa virheellisiin merkityksellisiin tietoihin ja operatiiviseen tehotkuuteen.
Tässä artikkelissa selitetään, miten datan laadun tarkistukset toteutetaan materialisoiduissa järvinäkymissä (MLV) Microsoft Fabricissa.
Tietojen laadun toteuttaminen
Microsoft Fabricin materialisoiduissa järvinäkymissä (MLV) ylläpidämme datan laatua määrittämällä rajoituksia näkymiisi. Ilman tarkistuksia pienet dataongelmat voivat pidentää käsittelyaikaa tai vikata putken.
Kun rivi rikkoo rajoitteen, voit käyttää jotakin seuraavista toiminnoista:
EPÄONNISTUMINEN: Estää MLV-päivityksen ensimmäisen rajoiterikkomuksen kohdalla. Tämä on oletuskäyttäytyminen, vaikka et määrittäisikään
FAIL.DROP: Jatkaa käsittelyä ja poistaa tietueita, jotka rikkovat rajoitetta. Sukulinjanäkymä näyttää pudonneiden tietueiden määrän.
Huomautus
Jos määrittelet sekä DROP- että FAIL-toiminnot MLV:ssä, FAIL-toiminto menee etusijalle.
Määrittele datan laadun tarkistukset materialisoidussa järvinäkymässä
Kun luot materialisoidun järven näkymän, voit määritellä rajoitteita — datan laatusääntöjä, jotka vahvistavat jokaisen rivin päivityksen aikana. Rajoite on Boolen lauseke, jonka jokaisen rivin on täytettävä. Läpäisevät rivit kirjoitetaan tulostauluun. Rivejä, jotka epäonnistuvat, käsitellään on-violation -asetuksen mukaisesti: ne joko pudotetaan hiljaisesti tai aiheuttavat koko päivityksen epäonnistumisen.
Seuraavassa esimerkissä määritetään rajoite cust_blank, joka tarkistaa, onko customerName kenttä tyhjäarvo. Rajoite sulkee pois nollan customerName rivit käsittelystä.
CREATE OR REPLACE MATERIALIZED LAKE VIEW IF NOT EXISTS silver.customers_enriched
(CONSTRAINT cust_blank CHECK (customerName is not null) on MISMATCH DROP)
AS
SELECT
c.customerID,
c.customerName,
c.contact,
CASE
WHEN COUNT(o.orderID) OVER (PARTITION BY c.customerID) > 0 THEN TRUE
ELSE FALSE
END AS has_orders
FROM bronze.customers c LEFT JOIN bronze.orders o
ON c.customerID = o.customerID;
Järjestelmän sisäänrakennetut toiminnot
Sisäänrakennetut Spark/SQL-toiminnot, kuten UPPER(), LOWER(), TRIM(), COALESCE(), INITCAP() ja DATE_FORMAT() ovat täysin tuettuja kaikissa MLV-konteksteissa sekä CREATE- että LINEAGE-päivityksessä.
CREATE MATERIALIZED LAKE VIEW sample_lakehouse.silver.names (
CONSTRAINT substring_check
CHECK (SUBSTRING(name, 1, 2) = 'Al') ON MISMATCH drop
) AS
SELECT id, name
FROM (VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Ann')) AS t(id, name)
Huomautus
Järjestelmätoiminnot ovat yksinkertaisin ja luotettavin vaihtoehto. Ne eivät vaadi rekisteröitymistä, toimivat kaikissa tilanteissa ja ovat täysin tuettuja LINEAGE-päivityksen aikana.
UDF:t – määritelty ja rekisteröity samassa muistikirjassa
UDF:t, jotka on rekisteröity spark.udf.register():llä samassa muistikirjassa, tuetaan CREATE:lle kaikissa konteksteissa. LINEAGE-päivityksessä tuetaan vain PySpark-konteksteja, koska UDF-määritelmä suoritetaan osana aikataulutettua muistikirjan suoritusta.
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
spark = SparkSession.builder.getOrCreate()
# UDF: Extract domain from email
def extract_email_domain(email):
if email is None or '@' not in email:
return None
return email.split('@')[1]
# Registration
spark.udf.register(
"udf_email_domain",
extract_email_domain,
StringType()
)
@fmlv.materialized_lake_view(
name="udf_testing_silver.mlv_high_value_customers",
comment="High-value customers identified by UDF criteria",
table_properties={"delta.enableChangeDataFeed": "true"}
)
def mlv_high_value_customers():
return spark.sql("""
SELECT
c.customer_id,
c.name,
c.email,
udf_email_domain(c.email) as email_domain,
c.segment,
c.lifetime_value,
total_transactions.total_amount,
total_transactions.txn_count
FROM udf_testing_bronze.customers c
INNER JOIN (
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as txn_count
FROM udf_testing_bronze.transactions
WHERE udf_is_positive(amount)
GROUP BY customer_id
HAVING SUM(amount) > 1000
) total_transactions ON c.customer_id = total_transactions.customer_id
WHERE udf_validate_customer(c.email, c.age)
AND c.segment IN ('premium', 'vip')
""")
print("✓ Created mlv_high_value_customers")
Kolmannen osapuolen kirjastot – Pandas UDF:t
Kolmannen osapuolen kirjastot, kuten Pandas UDF:t, mahdollistavat datan laatusääntöjen toteuttamisen vektoroidulla prosessoinnilla. Ne mahdollistavat edistyneet validoinnit, kuten räätälöidyt liiketoimintalogiikan, tilastolliset tarkistukset tai kuvioiden tunnistuksen, joita ei ole mahdollista sisäänrakennetuilla funktioilla. Tämä auttaa rakentamaan skaalautuvia ja uudelleenkäytettäviä datan laaturajoitteita MLV:n luomisessa ja päivityksessä.
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
import pandas as pd
from pyspark.sql.types import BooleanType
def pandas_check_impl(val):
# Reject if value < median of [100, 200, 300]
return val >= pd.Series([100, 200, 300]).median()
spark.udf.register("pandas_check", pandas_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_pandas",
comment="PySpark MLV INNER JOIN using pandas-based constraint and DROP violations"
)
@fmlv.check(
name="dq_pandas_check",
condition="pandas_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_pandas():
# Define the function
# Register the function as a Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_pandas")
# All amounts should be >= median (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with pandas-based DQ DROP passed")
Mukautetut kirjastot – Python Wheel (.whl)
Jar- tai wheel-tiedostoina pakatut funktiot voidaan asentaa Fabric-klusteriin (ympäristöasetusten kautta) ja käyttää MLV-määrittelyissä. CREATE ja LINEAGE REFRESH on tuettu PySpark-konteksteissa. Katso lisätietoja kohdasta Hallinnoi mukautettuja kirjastoja Fabric-ympäristöissä .
%%pyspark
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, DoubleType, BooleanType
from datetime import datetime
from pyspark.sql.types import BooleanType
from custom_dq_lib import threshold_check
def custom_check_impl(val):
return threshold_check(val, threshold=200)
spark.udf.register("custom_check", custom_check_impl, BooleanType())
@fmlv.materialized_lake_view(
name="silver.pyspark_from_two_sqlmlv_inner_custom_whl",
comment="PySpark MLV INNER JOIN using custom DQ library and DROP violations",
replace=True
)
@fmlv.check(
name="dq_custom_check",
condition="custom_check(l3)",
action="DROP"
)
def pyspark_from_two_sqlmlv_inner_custom():
# Wrap the custom function as Spark UDF
# Read source tables
df1 = spark.table("silver.base_sqlmlv")
df2 = spark.table("silver.base_sqlmlv")
# Rename columns for unique join
df_left = df1.select([df1[col].alias(f"l{i+1}") for i, col in enumerate(df1.columns)])
df_right = df2.select([df2[col].alias(f"r{i+1}") for i, col in enumerate(df2.columns)])
# Perform INNER JOIN
df = df_left.join(df_right, df_left.l1 == df_right.r1, "inner")
return df
df = spark.table("silver.pyspark_from_two_sqlmlv_inner_custom_whl")
# All amounts should be >= threshold (200)
assert all(df.select("l3").rdd.map(lambda r: r[0] >= 200).collect()), "Unexpected low-value rows found"
print("PySpark MLV INNER JOIN with custom DQ library passed")
Fabric-käyttäjätietofunktiot
Fabric-käyttäjätietotoiminnot (UDF:t) määritellään ja hallitaan keskitetysti Fabric-työtilassa. Ne ovat saatavilla mihin tahansa kannettavaan tai putkeen ilman, että niitä tarvitsee rekisteröidä uudelleen per istunto, mikä tekee niistä ihanteellisia tuotanto-MLV-putkistoille. Tämä ominaisuus on tuettu vain PySpark-konteksteissa sekä CREATE- että LINEAGE-päivityksessä. Lue lisää User Data Functions -toiminnoista täältä. Lisätietoja löytyy Fabric User Data Functions -yleiskatsauksesta.
%%pyspark
import fmlv
from pyspark.sql import functions as F
from notebookutils import udf
myFuncs = udf.getFunctions("UserDataFunction_1")
def add_greeting_column(df):
pdf = df.toPandas()
pdf["greeting"] = pdf["name"].apply(lambda n: myFuncs.hello_fabric(n))
return spark.createDataFrame(pdf)
import fmlv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# -------------------------------------------------------------
# Define base PySpark MLV (no SQL)
# -------------------------------------------------------------
@fmlv.materialized_lake_view(
name="silver.base_pysparkmlv",
comment="Base MLV created using PySpark",
replace=True
)
def base_pysparkmlv():
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("country", StringType(), True)
])
data = [
(1, "Alice", 100.0, "US"),
(2, "Bob", 200.0, "UK"),
(3, "Charlie", 300.0, "UK")
]
return spark.createDataFrame(data, schema)
@fmlv.materialized_lake_view(
name="silver.mlv_udfn_null_test",
replace=False
)
@fmlv.check(
name="null_check",
condition="greeting IS NOT NULL",
action="FAIL"
)
def mlv_udfn_null_test():
df = spark.table("silver.base_pysparkmlv")
return add_greeting_column(df)