Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A create_auto_cdc_flow() função cria um fluxo que usa a funcionalidade CDC (captura de dados de mudanças) do Lakeflow Spark Declarative Pipelines para processar dados de origem de um feed de dados de alteração (CDF).
Observação
Esta função substitui a função apply_changes()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o create_auto_cdc_flow() esquema da tabela de destino, você deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que os sequence_by campos.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do pipeline.
Sintaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Para create_auto_cdc_flow processamento, o comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com o apply_as_deletes parâmetro.
Para saber mais sobre o processamento de CDC com um feed de alterações, consulte As APIs AUTO CDC: Simplifique a captura de dados de alterações com pipelines. Para um exemplo de utilização da create_auto_cdc_flow() função, veja exemplos de AUTO CDC.
Parâmetros
| Parâmetro | Tipo | Description |
|---|---|---|
target |
str |
Required. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_flow() função. |
source |
str |
Required. A fonte de dados que contém os registros do CDC. |
keys |
list |
Required. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
|
sequence_by |
str, col() ou struct() |
Required. Os nomes das colunas que especificam a ordem lógica dos eventos CDC nos dados fonte. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. Você pode especificar:
|
ignore_null_updates |
bool |
Controla como null os valores nas atualizações recebidas do CDC são tratados. Quando ignore_null_updates é True, null as colunas numa atualização recebida são ignoradas — o valor existente na linha alvo é preservado. Isto também se aplica a colunas aninhadas com null valores. Quando ignore_null_updates é False, null as colunas numa atualização recebida sobrescrevem valores existentes no destino.Definido para True quando os eventos de origem incluem apenas as colunas que mudaram, para que as colunas não alteradas não sejam sobrescrevidas por null.A predefinição é False. |
apply_as_deletes |
str ou expr() |
Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:
Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção é definido por padrão para dois dias e pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds.Se usar o Auto Loader como fonte para o seu pipeline CDC, o Auto Loader não garante a ordem do processamento dos ficheiros. Para detalhes, consulte Lidar com dados fora de ordem. Definido pipelines.cdc.tombstoneGCThresholdInSeconds para um valor que excede o atraso máximo esperado entre a chegada do evento e a execução do pipeline. Isto garante que as lápides de eliminação sejam mantidas tempo suficiente para lidar corretamente com eventos de eliminação que chegam tarde ou fora de ordem. |
apply_as_truncates |
str ou expr() |
Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Você pode especificar:
Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade. O apply_as_truncates parâmetro é suportado apenas para SCD tipo 1. SCD tipo 2 não suporta operações de truncamento. |
column_list ou except_column_list |
list |
Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função. |
stored_as_scd_type |
str ou int |
Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O padrão é SCD tipo 1. |
track_history_column_list ou track_history_except_column_list |
list |
Um subconjunto de colunas de saída a serem acompanhadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do rastreamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função. |
name |
str |
O nome do fluxo. Se não for fornecido, será usado o mesmo valor que target. |
once |
bool |
Opcionalmente, defina o fluxo como um fluxo único, como um backfill. O uso once=True altera o fluxo de duas maneiras:
|