Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve os pré-requisitos e a configuração necessários para executar consultas de modo em tempo real no Streaming Estruturado. Para obter um tutorial passo a passo, consulte Tutorial: Executar uma carga de trabalho de streaming em tempo real. Para obter informações conceituais sobre o modo em tempo real, consulte o modo em tempo real no Streaming Estruturado.
Pré-requisitos
Para usar o modo em tempo real, você deve configurar sua computação para atender aos seguintes requisitos:
- Use o modo de acesso dedicado na computação clássica. Não há suporte para modo de acesso padrão, pipelines declarativos do Lakeflow Spark, e clusters sem servidor.
- Utilize o Databricks Runtime 16.4 LTS ou versões posteriores.
- Desativar o dimensionamento automático.
- Desative o Photon.
- Defina
spark.databricks.streaming.realTimeMode.enabledcomotrue. - Desative instâncias spot para evitar interrupções.
Para obter instruções sobre como criar e configurar a computação clássica, consulte a referência de configuração de computação.
Configuração de consulta
Para executar uma consulta no modo em tempo real, você deve habilitar o gatilho em tempo real. Os gatilhos em tempo real têm suporte apenas no modo de atualização.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Dimensionamento de computação
Você pode executar um trabalho em tempo real por recurso de computação se a computação tiver slots de tarefa suficientes.
Para ser executado no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todos os estágios de consulta.
Exemplos de cálculo de slots
| Tipo de pipeline | Configuração | Slots necessários |
|---|---|---|
| Sem estado de estágio único (origem kafka + coletor) |
maxPartitions = 8 |
8 espaços |
| Estado com dois estágios (fonte Kafka + embaralhamento) |
maxPartitions = 8, partições de embaralhamento = 20 |
28 slots (8 + 20) |
| Três estágios (origem kafka + shuffle + repartition) |
maxPartitions = 8, duas fases de shuffle de 20 cada um |
48 slots (8 + 20 + 20) |
Se você não definir maxPartitions, use o número de partições no tópico Kafka.