Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Este artigo discute a utilização de foreachBatch com o Structured Streaming para gravar a saída de uma consulta de streaming em fontes de dados que não têm um sink de streaming existente.
O padrão streamingDF.writeStream.foreachBatch(...) de código permite que você aplique funções de lote aos dados de saída de cada microlote da consulta de streaming. As funções usadas com foreachBatch têm dois parâmetros:
- Um DataFrame que tem os dados de saída de um microlote.
- A ID exclusiva do microlote.
Você deve usar foreachBatch para operações de mesclagem Delta Lake no Structured Streaming. Consulte Upsert de consultas de streaming usando foreachBatch.
Aplicar operações adicionais do DataFrame
Muitas operações de DataFrame e Dataset não são suportadas em DataFrames de streaming porque a Spark não suporta a geração de planos incrementais nesses casos. Usando foreachBatch(), é possível aplicar algumas destas operações em cada saída de micro-lote. Por exemplo, você pode usar foreachBatch() e a operação SQL MERGE INTO para gravar a saída de agregações de streaming em uma tabela Delta no modo de atualização. Veja mais detalhes em MERGE INTO.
Importante
-
foreachBatch()fornece apenas garantias de escrita pelo menos uma vez. No entanto, você pode usar obatchIdfornecido para a função como forma de desduplicar a saída e obter uma garantia exata uma vez. Em ambos os casos, você mesmo terá que raciocinar sobre a semântica de ponta a ponta. -
foreachBatch()não funciona com o modo de processamento contínuo, pois depende fundamentalmente da execução em microlote de uma consulta de streaming. Se você gravar dados no modo contínuo, useforeach()em vez disso. - Ao usar
foreachBatchcom um operador com estado, é importante consumir completamente cada lote antes de concluir o processamento. Consulte Consumir completamente cada DataFrame em lote
Um dataframe vazio pode ser invocado com foreachBatch(), e o código do utilizador precisa ser resiliente para permitir uma operação adequada. Pode ver um exemplo aqui:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Alterações de comportamento para foreachBatch no Databricks Runtime 14.0
No Databricks Runtime 14.0 e superior na computação configurada com o modo de acesso padrão, aplicam-se as seguintes alterações de comportamento:
- Os comandos
print()escrevem a saída nos logs do driver. - Não é possível acessar o
dbutils.widgetssubmódulo dentro da função. - Todos os arquivos, módulos ou objetos referenciados na função devem ser serializáveis e estar disponíveis no Spark.
Reutilizar fontes de dados em lote existentes
Usando foreachBatch(), você pode usar gravadores de dados em lote existentes para destinos de dados que podem não ter suporte a Streaming Estruturado. Eis alguns exemplos:
Muitas outras fontes de dados em lote podem ser usadas a partir de foreachBatch(). Consulte Conectar-se a fontes de dados e serviços externos.
Gravar em vários locais
Se você precisar gravar a saída de uma consulta de streaming em vários locais, o Databricks recomenda o uso de vários gravadores de Streaming Estruturado para melhor paralelização e taxa de transferência.
O uso de foreachBatch para gravar em múltiplos destinos serializa a execução de gravações de streaming, o que pode aumentar a latência de cada microlote.
Se utilizar foreachBatch para gravar em várias tabelas Delta, consulte Gravações idempotentes em tabela em foreachBatch.
Consuma completamente cada DataFrame em lote
Quando estiver a utilizar operadores com estado (por exemplo, usando dropDuplicatesWithinWatermark), cada iteração em lote deve consumir todo o DataFrame ou reiniciar a consulta. Se você não consumir o DataFrame inteiro, a consulta de streaming falhará com o próximo lote.
Isso pode acontecer em vários casos. Os exemplos a seguir mostram como corrigir consultas que não consomem corretamente um DataFrame.
Utilização intencional de um subconjunto do lote
Se você se preocupa apenas com um subconjunto do lote, você pode ter um código como o seguinte.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Neste caso, o batch_df.show(2) apenas manipula os dois primeiros itens do lote, o que é esperado, mas, se houver mais itens, estes devem ser consumidos. O código a seguir consome o DataFrame completo.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Aqui, a do_nothing função ignora silenciosamente o resto do DataFrame.
Tratamento de um erro num lote
Para o tratamento de erros em foreachBatch, a Databricks recomenda que permita que a consulta de streaming falhe rapidamente e que, em vez disso, confie na camada de orquestração, como Lakeflow Jobs ou Apache Airflow, para gerir a lógica de repetição. Isto é muito mais seguro do que construir ciclos complexos de retentativa no seu código, onde pode ocorrer perda de dados.
Aqui estão as diretrizes baseadas no seu objetivo de escrita.
| Target | Exemplos | Orientações |
|---|---|---|
| Operações DataFrame | Tabelas do Delta Lake | Deve usar as opções de gravação txnAppId e txnVersion, ligando txnVersion a batchId, para garantir a idempotência e proteger a correção dos dados nas tentativas. Não apanhe e tente novamente as exceções localmente. Em vez disso, o Databricks recomenda que permita que os erros se propaguem para que as métricas do Spark se mantenham precisas, os dados não se dupliquem e o orquestrador possa tentar novamente o lote completo de forma limpa. |
| Código personalizado e destinos externos |
.collect(), bases de dados OLTP, filas de mensagens, APIs |
Implementa a tua própria idempotência. Deve assumir que qualquer operação pode ser repetida e será repetida entre lotes. Se o batchId permanecer igual, o resultado da sua operação deverá permanecer o mesmo. Pode repetir erros puramente transitórios, como breves timeouts de ligação, mas tenha extremo cuidado para evitar escritas incompletas ou duplicadas caso a tentativa falhe. A abordagem mais segura é deixar que os erros se propaguem e permitir que o orquestrador tente novamente todo o lote. |
Aqui estão alguns exemplos de tipos de exceções e recomendações sobre como lidar com elas em foreachBatch:
| Tipo de exceção | Exemplos | Ação recomendada |
|---|---|---|
| Erros de sumidouro transitório |
SQLTransientConnectionException, HTTP 429, tempos limite |
Apanha: tentar novamente ou enviar para uma fila de letra morta |
| Violações de duplicados ou de restrições de chave quando o sumidouro é idempotente | SQLIntegrityConstraintViolationException |
Captura: registar e suprimir |
| Erros personalizados retentáveis | Exceções de sockets envoltas, erros de base de dados repetíveis | Problema: incrementar métricas e permitir continuação controlada |
| Erros lógicos ou de esquema |
NullPointerException, AttributeError, incompatibilidade de esquemas |
Propagar: deixar o Spark falhar a consulta |
| Erros de destino irrecuperáveis ou bugs lógicos não capturados |
ValueError, PermissionError |
Propagar: deixar o Spark falhar a consulta |
| Falhas críticas |
OutOfMemoryError, estado corrompido, violações da integridade dos dados |
Propagar: deixar o Spark falhar a consulta |
Exemplos de código: tratamento de exceções
Os exemplos seguintes geram intencionalmente um erro em foreach para demonstrar diferentes abordagens para lidar com o erro.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
O código acima trata e suprime silenciosamente o erro, podendo não consumir o resto do lote. Existem duas opções para lidar com esta situação.
Primeiro, podes relevantar o erro, que o passa para a tua camada de orquestração para reprocessar o lote. Isto pode resolver o erro, se for um problema transitório, ou levantá-lo para a sua equipa de operações tentar corrigir manualmente. Para fazer isso, altere o partial_func código para ter esta aparência:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Em segundo lugar, se quiseres capturar a exceção e ignorar o resto do lote, podes alterar o código para usar a função do_nothing para ignorar silenciosamente o restante.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()