Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan beskriver de förutsättningar och konfigurationer som krävs för att köra frågor i realtidsläge i Strukturerad direktuppspelning. För en steg-för-steg-självstudie, se Självstudie: Kör en arbetsbelastning för realtidsströmning. Konceptuell information om realtidsläge finns i Realtidsläge i Strukturerad direktuppspelning.
Förutsättningar
Om du vill använda realtidsläge måste du konfigurera beräkningen så att den uppfyller följande krav:
- Använd klassisk beräkning. Dedikerade och standardåtkomstlägen stöds. Standardåtkomstläge stöds endast för Python. Lakeflow Spark Deklarativa pipelines och serverlösa kluster stöds inte.
- Använd Databricks Runtime 16.4 LTS och senare.
- Inaktivera autoskalning.
- Stäng av Foton.
- Ställ in
spark.databricks.streaming.realTimeMode.enabledpåtrue. - Inaktivera spotinstanser för att undvika avbrott.
För svarstidskänsliga arbetsbelastningar med UDF:er rekommenderar Databricks att du använder dedikerat åtkomstläge. Se Tabellfunktioner.
Anvisningar om hur du skapar och konfigurerar klassisk beräkning finns i Referens för beräkningskonfiguration.
Frågekonfiguration
Om du vill köra en fråga i realtidsläge måste du aktivera realtidsutlösaren. Realtidsutlösare stöds endast i uppdateringsläge.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Beräkningsstorlek
Du kan köra ett realtidsjobb per beräkningsresurs om resursen har tillräckligt med uppgiftsplatser.
Om du vill köra i läge med låg latens måste det totala antalet tillgängliga aktivitetsfack vara större än eller lika med antalet aktiviteter i alla frågesteg.
Exempel på platsberäkningar
| Typ av pipeline | Konfiguration | Nödvändiga fack |
|---|---|---|
| Tillståndslös i en fas (Kafka-källa + mottagare) |
maxPartitions = 8 |
8 platser |
| Tillståndskänsligt i två steg (Kafka-källa + shuffle) |
maxPartitions = 8, shuffle partitioner = 20 |
28 platser (8 + 20) |
| Tresteg (Kafka-källa + shuffle + ompartition) |
maxPartitions = 8, två blandningssteg på 20 vardera |
48 platser (8 + 20 + 20) |
Om du inte anger maxPartitions, använd antalet partitioner i Kafka-ämnet.