Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve como usar tabelas Delta como fontes e coletores para Spark Structured Streaming com readStream e writeStream. O Delta Lake resolve problemas comuns de desempenho e confiabilidade para sistemas de streaming e arquivos. Os benefícios incluem:
- Agrupar pequenos arquivos produzidos pela captura de baixa latência e melhorar o desempenho.
- Mantenha o processamento do tipo "exatamente uma vez" com mais de um fluxo (ou trabalhos em lote simultâneos).
- Descubra com eficiência novos arquivos ao usar arquivos como uma fonte de fluxo.
Para saber como carregar dados usando tabelas de streaming no Databricks SQL, consulte Usar tabelas de streaming no Databricks SQL.
Para obter junções estáticas de fluxo com o Delta Lake, consulte junções estáticas do Stream.
Usar tabelas Delta como um coletor
Você pode gravar dados em uma tabela Delta usando o Streaming Estruturado. O log de transações do Delta Lake garante o processamento exatamente uma única vez, mesmo quando há outros fluxos de dados ou consultas em lote executando simultaneamente na mesma tabela.
Ao gravar em uma tabela Delta usando um coletor de Streaming Estruturado, você poderá ver confirmações vazias com epochId = -1. Eles são esperados e normalmente ocorrem:
- No primeiro lote de cada execução da consulta de streaming (isso acontece em todos os lotes para
Trigger.AvailableNow) - Quando um esquema é alterado (como adicionar uma coluna).
Essas confirmações vazias são intencionais e não indicam um erro. Elas não afetam a correção ou o desempenho da consulta de forma significativa.
Note
A função VACUUM do Delta Lake remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretório, como <table-name>/_checkpoints.
Monitorar a lista de pendências com métricas
Use as seguintes métricas para monitorar a lista de pendências de um processo de consulta de streaming:
-
numBytesOutstanding: número de bytes ainda a serem processados na lista de pendências. -
numFilesOutstanding: número de arquivos ainda a serem processados na lista de pendências. -
numNewListedFiles: número de arquivos Delta Lake listados para calcular a lista de pendências deste lote. -
backlogEndOffset: a versão da tabela Delta usada para calcular a lista de pendências.
Em um notebook, exiba essas métricas na guia Dados Brutos no painel de progresso da consulta de streaming:
{
"sources": [
{
"description": "DeltaSource[file:/path/to/source]",
"metrics": {
"numBytesOutstanding": "3456",
"numFilesOutstanding": "8"
}
}
]
}
Modo de acréscimo
Por padrão, os fluxos são executados no modo de acréscimo e só adicionam novos registros à tabela.
Use o toTable método ao transmitir para tabelas:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Modo completo
Utilize o Structured Streaming com o modo completo para substituir toda a tabela após cada lote. Por exemplo, você pode atualizar continuamente uma tabela de resumo agregada de eventos por cliente:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
Para aplicativos sem requisitos estritos de latência, você pode salvar recursos e custos de computação com gatilhos únicos, como AvailableNow. Por exemplo, use este disparador para atualizar as tabelas de agregação de resumo em um agendamento determinado, processando apenas os novos dados que chegaram desde a última atualização. Consulte AvailableNow: Processamento em lote incremental.
Manipular alterações nas tabelas Delta de origem
O Structured Streaming processa incrementalmente tabelas Delta. Quando uma consulta de streaming lê de uma tabela Delta, novos registros são processados de forma idempotente à medida que novas versões de tabela são confirmadas na tabela de origem. O Structured Streaming aceita apenas entradas de adição e lança uma exceção se ocorrerem modificações na tabela Delta de origem. Por exemplo, se uma UPDATEDELETE, , MERGE INTOou OVERWRITE operação modificar uma tabela Delta de origem que é lida por uma consulta de streaming, o fluxo falhará com um erro.
Há quatro abordagens típicas para lidar com alterações upstream nas tabelas Delta de origem, dependendo do seu caso de uso. Uma tabela de referência e detalhes sobre cada uma delas são fornecidos abaixo:
| Abordagem | Vantagens | Cons |
|---|---|---|
skipChangeCommits |
Simples, não exige que você escreva uma lógica complexa. Útil para processamento apenas de acréscimo, em que as alterações upstream são tratadas separadamente, ou para tratamento temporário de um registro problemático. | Não propaga as alterações e apenas processa acréscimos. |
| Atualização completa | Além disso, simples, não exige que você escreva uma lógica complexa. Útil para pequenos conjuntos de dados com alterações upstream raras. | Caro para conjuntos de dados grandes. Requer o reprocessamento de todas as tabelas downstream. |
| Alterar feed de dados | Processar todos os tipos de alteração (inserções, atualizações e exclusões). O Databricks recomenda o streaming do feed CDC de uma tabela Delta em vez de diretamente da tabela sempre que possível. | Exige que você escreva uma lógica mais complexa para lidar com cada tipo de alteração. |
| Visões materializadas | Alternativa simples ao Streaming Estruturado que tem propagação de alteração automática. | Latência maior. Disponível apenas no Lakeflow Spark Declarative Pipelines e no Databricks SQL. |
Ignorar confirmações de alteração upstream com skipChangeCommits
Defina skipChangeCommits para ignorar transações que excluam ou modificam registros existentes e processem somente acréscimos. Isso é útil quando as alterações nos dados existentes não precisam ser propagadas por meio do fluxo ou quando você prefere uma lógica separada para lidar com essas alterações. Você pode ativar e desativar skipChangeCommits se precisar ignorar temporariamente as alterações pontuais.
Databricks recomenda o uso do skipChangeCommits para a maioria das cargas de trabalho que não usam feeds de alteração de dados.
Python
(spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
)
Scala
spark.readStream
.option("skipChangeCommits", "true")
.table("source_table")
Important
Se o esquema de uma tabela Delta for alterado depois que uma leitura de fluxo começar na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar o fluxo para resolver incompatibilidade de esquema e continuar o processamento.
No Databricks Runtime 12.2 LTS e abaixo, não é possível transmitir a partir de uma tabela Delta com o mapeamento de colunas ativado que tenha passado por uma evolução de esquema não aditiva, como renomear ou eliminar colunas. Para obter detalhes, consulte Mapeamento e streaming de colunas.
Note
No Databricks Runtime 12.2 LTS e superior, skipChangeCommits substitui ignoreChanges. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges é a única opção suportada. Consulte a opção Legado: ignoreChanges para obter detalhes.
Opção herdada: ignoreDeletes
ignoreDeletes é uma opção legada que trata apenas de transações que excluem dados em limites de partição (ou seja, exclusões completas de partição). Se você precisar lidar com exclusões não particionais, atualizações ou outras modificações, use skipChangeCommits em vez disso.
Python
(spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
)
Scala
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Opção herdada: ignoreChanges
ignoreChanges está disponível no Databricks Runtime 11.3 LTS e inferior. No Databricks Runtime 12.2 LTS e superior, ele é substituído por skipChangeCommits.
Com ignoreChanges habilitado, arquivos de dados reescritos na tabela de origem são remetidos após uma operação de modificação de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Linhas inalteradas geralmente são emitidas junto com novas linhas, portanto, os consumidores downstream devem ser capazes de lidar com duplicatas. As exclusões não são propagadas por downstream.
ignoreChanges tem precedência sobre ignoreDeletes.
Por outro lado, skipChangeCommits desconsidera totalmente as operações de alteração de arquivo. Arquivos de dados reescritos na tabela de origem devido a operações de modificação de dados, como UPDATE, MERGE INTO, DELETEe OVERWRITE são totalmente ignorados. Para refletir as alterações nas tabelas de origem do fluxo, você deve implementar uma lógica separada para propagar essas alterações.
O Databricks recomenda o uso skipChangeCommits para todas as novas cargas de trabalho. Para migrar uma carga de trabalho de ignoreChanges para skipChangeCommits, refatore sua lógica de streaming.
Atualização completa de tabelas downstream
Se as alterações upstream forem raras e os dados forem pequenos o suficiente para reprocessar, você poderá excluir o ponto de verificação de streaming e a tabela de saída e reiniciar o fluxo desde o início. Isso faz com que o fluxo reprocesse todos os dados da tabela de origem. Lembre-se de que essa abordagem também requer o reprocessamento de todas as tabelas downstream que dependem da saída desse fluxo.
Essa abordagem é mais adequada para conjuntos de dados menores ou cargas de trabalho em que as alterações upstream são pouco frequentes e o custo de uma atualização completa é aceitável.
Usar o feed de dados de alteração
Para cargas de trabalho que processam todos os tipos de alterações (inserções, atualizações e exclusões), use o feed de dados de alteração do Delta Lake. O feed de dados de alteração registra alterações no nível de linha em uma tabela Delta, permitindo que você transmita essas alterações e escreva a lógica para lidar com cada tipo de alteração em tabelas downstream. Essa é a abordagem mais robusta porque seu código lida explicitamente com todos os tipos de evento de alteração. Consulte Use o feed de dados de alterações do Delta Lake no Azure Databricks.
Se você estiver usando pipelines declarativos do Lakeflow Spark, consulte as APIs AUTO CDC: Simplificar a captura de dados de alteração com pipelines.
Important
No Databricks Runtime 12.2 LTS e abaixo, você não pode fazer streaming do feed de dados de alteração para uma tabela Delta que tem o mapeamento de colunas habilitado e sofreu uma evolução de esquema não aditiva, como o renomeio ou exclusão de colunas. Consulte o mapeamento e o streaming de colunas.
Usar visões materializadas
Exibições materializadas lidam automaticamente com alterações upstream recompilando resultados quando os dados de origem são alterados. Se você não precisar da menor latência possível e quiser evitar o gerenciamento da complexidade do streaming, uma exibição materializada poderá simplificar sua arquitetura. As exibições materializadas estão disponíveis nos pipelines do Lakeflow Spark Declarative Pipelines e no DATAbricks SQL. Confira Exibições materializadas.
Example
Por exemplo, suponha que você tenha uma tabela user_events com as colunas date, user_email e action que é particionada por date. Você transmite a saída da tabela user_events e precisa excluir dados dela devido à GDPR.
skipChangeCommits permite que você exclua dados em várias partições (neste exemplo, filtrando em user_email). Use a seguinte sintaxe:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Se você atualizar um user_email com a instrução UPDATE, o arquivo que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.
O Databricks recomenda usar skipChangeCommits em vez de ignoreDeletes, a menos que você tenha certeza de que as exclusões são sempre quedas completas de partição.
Usar foreachBatch para gravações de tabela idempotente
Note
O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você quer atualizar em vez de usar foreachBatch. Escrever em vários receptores em foreachBatch reduz a paralelização e aumenta a latência geral, pois as gravações em várias tabelas são serializadas em foreachBatch.
As tabelas Delta oferecem suporte às seguintes opções DataFrameWriter para que as gravações em várias tabelas na foreachBatch sejam idempotentes:
-
txnAppId: uma cadeia de caracteres exclusiva que você pode passar em cada gravação DataFrame. Por exemplo, você pode usar a ID do StreamingQuery comotxnAppId.txnAppIdpode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário e não precisa estar relacionada à ID do fluxo. -
txnVersion: um número que aumenta de forma monotônica e atua como a versão da transação.
O Delta Lake usa txnAppId e txnVersion para identificar e ignorar gravações duplicadas. Por exemplo, depois que uma falha interrompe uma gravação em lote, você pode executar novamente o lote com os mesmos txnAppId e txnVersion para identificar corretamente e ignorar as duplicatas. Veja Usar o foreachBatch para gravar em coletores de dados arbitrários.
Warning
Se você excluir o ponto de verificação de streaming e reiniciar a consulta com um novo ponto de verificação, deverá fornecer um txnAppId diferente. Novos pontos de verificação começam com uma ID de lote de 0. O Delta Lake usa a ID e o txnAppId do lote como uma chave exclusiva e ignora lotes com valores já vistos.
O exemplo de código a seguir demonstra esse padrão:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}
Upsert de consultas de streaming usando foreachBatch
Você pode usar merge e foreachBatch para gravar upserts complexos de uma consulta de streaming em uma tabela Delta. Veja Usar o foreachBatch para gravar em coletores de dados arbitrários.
Essa abordagem tem muitos aplicativos:
- Melhore o desempenho de gravação com
updateo modo de saída, enquantocompleteo modo de saída requer a reescrita de toda a tabela de resultados para cada microbatch. - Aplique continuamente um fluxo de alterações a uma tabela Delta usando uma consulta de mesclagem para gravar dados de alteração.
foreachBatchConsulte SCD (dados de alteração lenta) e CDC (captura de dados de alteração) com Delta Lake. - Gerenciar a deduplicação durante o processamento de fluxo. Você pode usar uma consulta de mesclagem somente inserção para gravar continuamente dados em
foreachBatchuma tabela Delta com eliminação automática de duplicação. Consulte a eliminação de duplicação de dados ao gravar em tabelas Delta.
Note
Verifique se a instrução
mergedentro deforeachBatché idempotente. Caso contrário, as reinicializações da consulta de streaming podem aplicar a operação no mesmo lote de dados várias vezes. Consulte UsarforeachBatchpara gravações de tabela idempotente.Quando
mergeé usado emforeachBatch, a métrica da taxa de dados de entrada pode retornar um múltiplo da taxa real em que os dados são gerados na origem.mergelê dados de entrada várias vezes, o que multiplica as métricas. Para evitar a multiplicação das métricas, armazene o DataFrame no cache antes demergee, em seguida, remova-o do cache depois demerge.A taxa de dados de entrada está disponível por meio
StreamingQueryProgresse no grafo de taxa de streaming do notebook. Consulte Monitoring Structured Streaming consultas em Azure Databricks.
Por exemplo, você pode usar MERGE instruções SQL em foreachBatch:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Você também pode usar as APIs do Delta Lake para upserts de streaming:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Definir a versão inicial da tabela para processar alterações
Por padrão, os fluxos começam com a versão mais recente da tabela Delta disponível. Isso inclui um instantâneo completo da tabela naquele momento e todas as alterações futuras. O Databricks recomenda que você use a versão da tabela inicial padrão para a maioria das cargas de trabalho.
Opcionalmente, você pode usar as opções a seguir para especificar o ponto de partida da fonte de streaming delta lake sem processar a tabela inteira.
startingVersion: a versão da tabela Delta de onde começar a leitura. Todas as alterações de tabela confirmadas em ou após a versão especificada são lidas pelo fluxo de dados. Se a versão especificada não estiver disponível, o fluxo não será iniciado.Para localizar as versões de confirmação disponíveis, execute
DESCRIBE HISTORYe verifique oversion. Para retornar apenas as alterações mais recentes, especifiquelatest. Para obter informações sobre as versões da tabela Delta, consulte Trabalhar com o histórico de tabelas.startingTimestamp: o carimbo de data/hora do qual começar a ler. Todas as alterações de tabela confirmadas a partir do timestamp especificado são lidas pelo fluxo. Se o carimbo de data e hora fornecido preceder todas as confirmações de tabela, a leitura de streaming começará com o carimbo de data e hora mais antigo disponível. Defina uma das opções:- Uma cadeia de caracteres de um carimbo de data/hora. Por exemplo,
"2019-01-01T00:00:00.000Z". - Uma cadeia de caracteres de data. Por exemplo,
"2019-01-01".
- Uma cadeia de caracteres de um carimbo de data/hora. Por exemplo,
Você não pode definir ambos startingVersion e startingTimestamp ao mesmo tempo. Essas configurações se aplicam apenas a novas consultas de streaming. Se uma consulta de streaming tiver sido iniciada e o progresso tiver sido registrado em seu ponto de verificação, essas configurações serão ignoradas.
Important
Embora você possa iniciar a fonte de streaming de uma versão ou carimbo de data/hora especificado, o esquema da fonte de streaming é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão ou o carimbo de data/hora especificados. Caso contrário, a fonte de streaming poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.
Example
Por exemplo, vamos supor que você tenha uma tabela user_events. Se você quiser ler as alterações desde a versão 5, use:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Se você quiser ler as alterações desde 18/10/2018, use:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Processar snapshot inicial sem descartar dados
Este recurso está disponível no Databricks Runtime 11.3 LTS e superior.
Em uma consulta de streaming com estado com uma marca d'água definida, processar arquivos por tempo de modificação pode processar registros na ordem errada. Isso pode fazer com que a marca d'água marque incorretamente registros como eventos tardios e os descarte. Isso só pode ocorrer quando o instantâneo Delta inicial é processado na ordem padrão.
Para fluxos com uma tabela de origem Delta, a consulta primeiro processa todos os dados presentes na tabela e cria uma versão chamada instantâneo inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, o último tempo de modificação não representa necessariamente a ordem de tempo do evento de registro.
Para evitar a perda de dados durante o processamento do instantâneo inicial, habilite a opção withEventTimeOrder.
withEventTimeOrder divide o intervalo de tempo do evento dos dados de instantâneo inicial em grupos de tempo. Cada microlote processa um bucket filtrando dados dentro da faixa de tempo. As maxFilesPerTrigger opções e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microlote, mas apenas aproximadamente devido à abordagem de processamento.
O diagrama a seguir mostra esse processo:
Restrições
- Você não poderá alterar
withEventTimeOrderse a consulta de fluxo tiver sido iniciada e o instantâneo inicial estiver sendo processado ativamente. Para reiniciar comwithEventTimeOrderalterado, é necessário excluir o ponto de verificação. - Até que o processamento do instantâneo inicial seja concluído, se
withEventTimeOrderestiver habilitado, você não poderá rebaixar um fluxo para uma versão do Databricks Runtime que não dê suporte a esse recurso. Para fazer downgrade, aguarde até que o instantâneo inicial seja concluído ou exclua o ponto de verificação e reinicie a consulta. - Não há suporte para esse recurso nos seguintes cenários:
- A coluna de tempo do evento é uma coluna gerada e há transformações de não projeção entre a origem Delta e a marca d'água.
- Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
Desempenho
Se withEventTimeOrder estiver habilitado, o desempenho inicial do processamento de instantâneos poderá ser mais lento. Cada microlote verifica o instantâneo inicial para filtrar dados dentro do intervalo de tempo de evento correspondente. Para melhorar o desempenho da filtragem:
- Use uma coluna de origem Delta como tempo do evento para que a técnica de pular dados possa ser aplicada. Consulte Ignorando dados.
- Particione a tabela ao longo da coluna de tempo do evento.
Use a interface do Spark para ver quantos arquivos Delta são escanados para um microlote específico.
Example
Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de streaming é uma consulta de agregação. Se você quiser garantir que não haja nenhuma queda de dados durante o processamento de instantâneo inicial, poderá usar:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Você pode definir withEventTimeOrder com uma configuração do Spark no cluster para aplicá-la a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true.
Limitar a taxa de entrada para melhorar o desempenho do processamento
Por padrão, o Streaming Estruturado processa o máximo de arquivos possível em cada microlote. Para limitar a quantidade de dados processados por lote e gerenciar o uso de memória, estabilizar a latência ou reduzir os custos de armazenamento em nuvem, use as seguintes opções:
-
maxFilesPerTrigger: o número de novos arquivos a serem considerados em cada microlote. O padrão é 1000. -
maxBytesPerTrigger: a quantidade de dados processada em cada microlote. Essa opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de streaming avançar em casos em que a menor unidade de entrada for maior que esse limite. Isso não está definido por padrão.
Se você usar tanto maxBytesPerTrigger quanto maxFilesPerTrigger, o microlote processará dados até que o limite maxFilesPerTrigger ou o limite maxBytesPerTrigger seja atingido.
Note
Por padrão, se logRetentionDuration limpar as transações na tabela de origem e a consulta de streaming tentar processar essas versões, a consulta não conseguirá evitar a perda de dados. Você pode definir a opção failOnDataLoss como false para ignorar dados perdidos e continuar o processamento. Consulte Configurar retenção de dados para consultas de viagem no tempo.
Controlar o custo de armazenamento em nuvem
As consultas de streaming têm vários modos de gatilho disponíveis que permitem balancear o custo e a latência, incluindo processingTime, availableNowe realTime. Consulte o custo de armazenamento em nuvem do Control.