Compartilhar via


Configurar o modo em tempo real

Esta página descreve os pré-requisitos e a configuração necessários para executar consultas de modo em tempo real no Streaming Estruturado. Para obter um tutorial passo a passo, consulte Tutorial: Executar uma carga de trabalho de streaming em tempo real. Para obter informações conceituais sobre o modo em tempo real, consulte o modo em tempo real no Streaming Estruturado.

Pré-requisitos

Para usar o modo em tempo real, você deve configurar sua computação para atender aos seguintes requisitos:

  • Use o modo de acesso dedicado na computação clássica. Não há suporte para modo de acesso padrão, pipelines declarativos do Lakeflow Spark, e clusters sem servidor.
  • Utilize o Databricks Runtime 16.4 LTS ou versões posteriores.
  • Desativar o dimensionamento automático.
  • Desative o Photon.
  • Defina spark.databricks.streaming.realTimeMode.enabled como true.
  • Desative instâncias spot para evitar interrupções.

Para obter instruções sobre como criar e configurar a computação clássica, consulte a referência de configuração de computação.

Configuração de consulta

Para executar uma consulta no modo em tempo real, você deve habilitar o gatilho em tempo real. Os gatilhos em tempo real têm suporte apenas no modo de atualização.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Dimensionamento de computação

Você pode executar um trabalho em tempo real por recurso de computação se a computação tiver slots de tarefa suficientes.

Para ser executado no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.

Exemplos de cálculo de slots

Tipo de pipeline Configuração Slots necessários
Sem estado de estágio único (origem kafka + coletor) maxPartitions = 8 8 espaços
Estado com dois estágios (fonte Kafka + embaralhamento) maxPartitions = 8, partições de embaralhamento = 20 28 slots (8 + 20)
Três estágios (origem kafka + shuffle + repartition) maxPartitions = 8, duas fases de shuffle de 20 cada um 48 slots (8 + 20 + 20)

Se você não definir maxPartitions, use o número de partições no tópico Kafka.

Recursos adicionais