Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
La pipeline dichiarativa di Lakeflow Spark semplifica il Change Data Capture (CDC) con le API AUTO CDC e AUTO CDC FROM SNAPSHOT. Queste API automatizzano la complessità del calcolo delle dimensioni a modifica lenta (SCD) di tipo 1 e tipo 2 sia da un feed CDC che da snapshot di database. Per altre informazioni su questi concetti, vedere Change Data Capture e snapshot.
Annotazioni
Le AUTO CDC API sostituiscono le APPLY CHANGES API e hanno la stessa sintassi. Le APPLY CHANGES API sono ancora disponibili, ma Databricks consiglia di usare le AUTO CDC API sul posto.
L'API usata dipende dall'origine dei dati delle modifiche:
-
AUTO CDC: usare questa opzione quando il database di origine dispone di un feed CDC abilitato.AUTO CDCelabora le modifiche da un feed di dati di modifica (CDF). È supportato sia nelle interfacce SQL della pipeline che in Python. -
AUTO CDC FROM SNAPSHOT: usare questa opzione quando CDC non è abilitato nel database di origine e sono disponibili solo gli snapshot. Questa API confronta gli snapshot per determinare le modifiche e quindi elaborarle. È supportato solo nell'interfaccia Python.
Entrambe le API supportano l'aggiornamento delle tabelle tramite scD Type 1 e Type 2:
- Usare scD Type 1 per aggiornare direttamente i record. La cronologia non viene mantenuta per i registri aggiornati.
- Usare SCD Type 2 per conservare una cronologia dei record, su tutti gli aggiornamenti o sugli aggiornamenti di un set specificato di colonne.
Le AUTO CDC API non sono supportate dalle pipeline dichiarative di Apache Spark.
Per la sintassi e altri riferimenti, vedere AUTO CDC INTO (pipeline),create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.
Annotazioni
Questa pagina descrive come aggiornare le tabelle nelle pipeline in base alle modifiche apportate ai dati di origine. Per informazioni su come registrare ed eseguire query sulle informazioni sulle modifiche a livello di riga per le tabelle Delta, vedere Utilizzare il feed di dati delle modifiche di Delta Lake su Azure Databricks.
Requisiti
Per usare le API CDC, la pipeline deve essere configurata per usare SDP serverless o le edizioni Pro SDP Advanced o .
Funzionamento di AUTO CDC
Per eseguire l'elaborazione CDC con AUTO CDC, creare una tabella di streaming e quindi usare l'istruzione AUTO CDC ... INTO in SQL o la create_auto_cdc_flow() funzione in Python per specificare l'origine, le chiavi e la sequenziazione per il feed di modifiche. Per una spiegazione del funzionamento della sequenziazione e della logica SCD, vedere Acquisizione dati modificati e istantanee. Vedere gli esempi di AUTO CDC.
Per l'idratazione iniziale da un'origine con un change feed, utilizzare AUTO CDC in un flusso once e quindi continuare a elaborare il change feed. Vedere Replicare una tabella RDBMS esterna usando AUTO CDC.
Per informazioni dettagliate sulla sintassi, vedere AUTO CDC INTO (pipeline) o create_auto_cdc_flow.
Funzionamento di AUTO CDC FROM SNAPSHOT
AUTO CDC FROM SNAPSHOT determina le modifiche nei dati di origine confrontando gli snapshot in ordine. È supportato solo nell'interfaccia della pipeline Python. È possibile leggere gli snapshot da una tabella Delta, da file di archiviazione cloud o da JDBC direttamente.
Per eseguire l'elaborazione CDC con AUTO CDC FROM SNAPSHOT, creare una tabella di streaming e quindi usare la funzione create_auto_cdc_from_snapshot_flow() per specificare lo snapshot, le chiavi e altri argomenti. Per informazioni dettagliate sui due modelli di inserimento e su quando usarli, vedere Modelli di elaborazione snapshot. Vedere gli esempi di AUTO CDC FROM SNAPSHOT.
Per informazioni dettagliate sulla sintassi, vedere create_auto_cdc_from_snapshot_flow.
Usare più colonne per la sequenziazione
Per sequenziare in base a più colonne, ad esempio un timestamp e un ID per eliminare parità, usare un operatore STRUCT per combinarli. L'API ordina per primo il primo campo e, in caso di parità, considera il secondo campo e così via.
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Pitone
sequence_by = struct("timestamp_col", "id_col")
Esempi di AUTO CDC
Gli esempi seguenti illustrano l'elaborazione SCD tipo 1 e tipo 2 usando una sorgente di flusso di dati delle modifiche. I dati di esempio creano nuovi record utente, eliminano un record utente e aggiornano i record utente. Nell'esempio SCD Type 1 le ultime UPDATE operazioni arrivano in ritardo e vengono eliminate dalla tabella di destinazione, dimostrando la gestione degli eventi fuori ordine.
Di seguito sono riportati i record di input usati in questi esempi. Questi dati vengono creati eseguendo la query nella sezione Creare dati di esempio .
| userId | nome | city | operazione | numeroSequenza |
|---|---|---|---|---|
| 124 | Raul | Oaxaca | INSERT | 1 |
| 123 | Isabel | Monterrey | INSERT | 1 |
| 125 | Mercedes | Tijuana | INSERT | 2 |
| 126 | Lily | Cancun | INSERT | 2 |
| 123 | nullo | nullo | DELETE | 6 |
| 125 | Mercedes | Guadalajara | UPDATE | 6 |
| 125 | Mercedes | Mexicali | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
Quando si rimuove il commento dalla riga finale della query di generazione dei dati di esempio, inserisce il seguente record che specifica di troncare la tabella a sequenceNum=3:
| userId | nome | city | operazione | numeroSequenza |
|---|---|---|---|---|
| nullo | nullo | nullo | TRONCARE | 3 |
Annotazioni
Tutti gli esempi seguenti includono opzioni per specificare le operazioni DELETE e TRUNCATE, ma ognuna è facoltativa.
Creare dati di esempio
Eseguire le istruzioni seguenti per creare un set di dati di esempio. Questo codice non deve essere eseguito come parte di una definizione della pipeline. Eseguirlo dalla cartella di esplorazione della pipeline, anziché dalla cartella delle trasformazioni.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Elaborare gli aggiornamenti di SCD Tipo 1
ScD Type 1 mantiene solo la versione più recente di ogni record. L'esempio seguente legge dal feed di dati delle modifiche creato in precedenza e applica le modifiche a una destinazione della tabella di streaming. Sviluppare le pipeline dichiarative di Lakeflow Spark per eseguire questo codice.
Pitone
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Dopo aver eseguito l'esempio scD Type 1, la tabella di destinazione contiene i record seguenti:
| userId | nome | city |
|---|---|---|
| 124 | Raul | Oaxaca |
| 125 | Mercedes | Guadalajara |
| 126 | Lily | Cancun |
L'utente 123 (Isabel) è stato eliminato e non viene visualizzato. L'utente 125 (Mercedes) mostra solo l'ultima città (Guadalajara) perché SCD Type 1 sovrascrive i valori precedenti. Il precedente UPDATE a sequenceNum=5 è stato eliminato perché è arrivato un successivo aggiornamento a sequenceNum=6.
Dopo aver eseguito l'esempio con il TRUNCATE record senza commento, la tabella viene cancellata in sequenceNum=3. Ciò significa che i record 124 e 126 non si trovano nella tabella e la tabella di destinazione finale contiene solo il record seguente:
| userId | nome | city |
|---|---|---|
| 125 | Mercedes | Guadalajara |
Elaborare gli aggiornamenti di tipo 2 di SCD
Il tipo SCD 2 mantiene una cronologia completa delle modifiche creando nuove righe per ogni versione di un record, con __START_AT colonne e __END_AT che indicano quando ogni versione era attiva.
Pitone
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Dopo aver eseguito l'esempio scD Type 2, la tabella di destinazione contiene i record seguenti:
| userId | nome | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Monterrey | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | Raul | Oaxaca | 1 | nullo |
| 125 | Mercedes | Tijuana | 2 | 5 |
| 125 | Mercedes | Mexicali | 5 | 6 |
| 125 | Mercedes | Guadalajara | 6 | nullo |
| 126 | Lily | Cancun | 2 | nullo |
La tabella mantiene la cronologia completa. L'utente 123 ha due versioni (terminate alla sequenza 6 quando sono state eliminate). L'utente 125 ha tre versioni che mostrano le modifiche della città. I record con __END_AT = null sono attualmente attivi.
Tenere traccia di un sottoinsieme di colonne con SCD Type 2
Per impostazione predefinita, scD Type 2 crea una nuova versione ogni volta che viene modificato qualsiasi valore di colonna. È possibile specificare un subset di colonne da tenere traccia, in modo che le modifiche apportate ad altre colonne aggiornino la versione corrente sul posto anziché generare un nuovo record di cronologia.
L'esempio seguente esclude la city colonna dal rilevamento della cronologia:
Pitone
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Poiché city le modifiche non vengono rilevate, gli aggiornamenti delle città sovrascrivono la riga corrente anziché creare una nuova versione. La tabella di destinazione contiene i record seguenti:
| userId | nome | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | Raul | Oaxaca | 1 | nullo |
| 125 | Mercedes | Guadalajara | 2 | nullo |
| 126 | Lily | Cancun | 2 | nullo |
Esempi di AUTO CDC FROM SNAPSHOT
Nelle sezioni seguenti vengono forniti esempi dell'utilizzo di AUTO CDC FROM SNAPSHOT per processare gli snapshot nelle tabelle di destinazione SCD di tipo 1 o di tipo 2. Per informazioni generali su quando usare questa API, vedere Change Data Capture and snapshots (Modificare acquisizione dati e snapshot).
Esempio: Elaborazione degli snapshot utilizzando il tempo di acquisizione della pipeline
Usare questo approccio quando gli snapshot arrivano regolarmente e in ordine ed è possibile basarsi sul timestamp di esecuzione della pipeline per il controllo delle versioni. Viene inserito un nuovo snapshot con ogni aggiornamento della pipeline.
È possibile leggere gli snapshot da più tipi di origine, tra cui tabelle Delta, file di archiviazione cloud e connessioni JDBC.
Passaggio 1: Creare dati di esempio
Creare una tabella contenente i dati dello snapshot. Esegui il codice seguente da un notebook o da Databricks SQL nella cartella explorations della pipeline.
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
Passaggio 2: Eseguire AUTO CDC dallo SNAPSHOT
Sviluppare pipeline dichiarative di Lakeflow Spark per eseguire il codice in questo passaggio.
Scegliere un tipo di origine per la visualizzazione snapshot (il codice di creazione di esempio genera una tabella Delta):
Opzione A: Lettura da una tabella Delta
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
Opzione B: Leggere dall'archiviazione cloud
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
Opzione C: Lettura da JDBC (solo calcolo classico)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
Tutte le opzioni, scrivere nella destinazione
Aggiungere quindi la tabella di destinazione e il flusso:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
Dopo la prima esecuzione della pipeline, tutti i record vengono inseriti come righe attive:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | nullo |
| 2 | Monterrey | 0 | nullo |
| 3 | Tijuana | 0 | nullo |
Annotazioni
Per usare invece scD Type 1 e mantenere solo lo stato corrente, impostare stored_as_scd_type=1. In questo caso, la tabella di destinazione non include le colonne __START_AT e __END_AT.
Passaggio 3: Simulare un nuovo snapshot ed eseguire di nuovo
Aggiornare la tabella di origine per simulare un nuovo snapshot in arrivo (eseguire questo codice da un notebook o da un file SQL nella explorations cartella della pipline):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
Esegui di nuovo la pipeline.
AUTO CDC FROM SNAPSHOT confronta il nuovo snapshot con quello precedente e rileva che l'utente 1 è stato eliminato, gli utenti 2 e 3 sono stati aggiornati e gli utenti 4 e 6 sono stati inseriti. Viene generato un feed di modifiche e AUTO CDC viene usato per creare la tabella di output.
Dopo la seconda esecuzione con SCD Type 2, la tabella di destinazione contiene i record seguenti:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | Monterrey | 0 | 1 |
| 2 | Carmel | 1 | nullo |
| 3 | Tijuana | 0 | 1 |
| 3 | Los Angeles | 1 | nullo |
| 4 | Valle della Morte | 1 | nullo |
| 6 | Kings Canyon | 1 | nullo |
L'utente 1 è stato terminato (eliminato). Gli utenti 2 e 3 hanno ognuna due versioni che mostrano le modifiche della città. Gli utenti 4 e 6 sono stati appena inseriti.
Dopo la seconda esecuzione con scD Type 1, la tabella di destinazione mostra solo lo stato corrente:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Valle della Morte |
| 6 | Kings Canyon |
Esempio: Elaborare snapshot usando le funzioni di versione
Usare questo approccio quando è necessario un controllo esplicito sull'ordinamento degli snapshot. Ad esempio, usare questo approccio quando arrivano più snapshot contemporaneamente o gli snapshot non arrivano in ordine. Si scrive una funzione che specifica lo snapshot da elaborare successivamente e il relativo numero di versione. L'API elabora gli snapshot in ordine di versione crescente:
- Se più snapshot sono in archiviazione, vengono tutti elaborati in ordine.
- Se uno snapshot arriva fuori ordine (ad esempio,
snapshot_3arriva doposnapshot_4), viene ignorato. - Se non sono presenti nuovi snapshot, la funzione restituisce
Nonee non viene eseguita alcuna elaborazione.
Passaggio 1: Preparare i file di snapshot
Creare dei file CSV contenenti dati snapshot e aggiungerli a un volume o a una posizione di archiviazione nel cloud. Denominare i file in ordine cronologico (ad esempio, snapshot_1.csv, snapshot_2.csv).
Ogni file deve contenere colonne per userId e city. Per esempio:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | Monterrey |
| 3 | Tijuana |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | Carmel |
| 3 | Los Angeles |
| 4 | Valle della Morte |
Passaggio 2: Eseguire AUTO CDC FROM SNAPSHOT con una funzione di versione
Creare un nuovo notebook e incollare il codice della pipeline seguente. Quindi, sviluppare pipeline dichiarative di Lakeflow Spark.
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
Annotazioni
Per usare invece scD Type 1, impostare stored_as_scd_type=1.
Dopo l'elaborazione snapshot_1.csv, la tabella di destinazione contiene i record seguenti:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | nullo |
| 2 | Monterrey | 1 | nullo |
| 3 | Tijuana | 1 | nullo |
Dopo l'elaborazione snapshot_2.csv, la tabella di destinazione contiene i record seguenti:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | Monterrey | 1 | 2 |
| 2 | Carmel | 2 | nullo |
| 3 | Tijuana | 1 | 2 |
| 3 | Los Angeles | 2 | nullo |
| 4 | Valle della Morte | 2 | nullo |
Annotazioni
Tenere presente che, per scD Type 1, la tabella è esattamente simile allo snapshot più recente. La differenza è che le query downstream possono usare il feed di modifiche per elaborare solo i record modificati.
Passaggio 3: Aggiungere nuovi snapshot
Aggiungere un nuovo file CSV al percorso di archiviazione con dati modificati( ad esempio, valori città modificati, nuove righe o righe rimosse). Eseguire quindi di nuovo la pipeline per elaborare il nuovo snapshot.
Limitazioni
- La colonna di sequenziazione deve essere un tipo di dati ordinabile.
NULLI valori di sequenziazione non sono supportati. -
AUTO CDC FROM SNAPSHOTè supportato solo nell'interfaccia della pipeline Python; l'interfaccia SQL non è supportata.
Risorse aggiuntive
- Acquisizione delle modifiche dei dati e istantanee: Scopri i concetti di CDC, le istantanee e i tipi SCD.
-
Replicare una tabella RDBMS esterna usando
AUTO CDC: Scopri come effettuare l'idratazione iniziale con unonceflusso e quindi continuare a elaborare le modifiche. - Argomenti avanzati di AUTO CDC: informazioni sulle operazioni di modifica sulle destinazioni AUTO CDC, la lettura dei feed di dati delle modifiche e l'elaborazione delle metriche.
- Esercitazione: Creare una pipeline ETL usando Change Data Capture