Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Vanliga frågor och svar om hur du använder Kafka med Azure Databricks.
Varför får jag ett felmeddelande om att ett Kafka-alternativ inte stöds eller inte känns igen?
Ett vanligt misstag är att glömma prefixet kafka. när du ställer in Kafka-inbyggda konfigurationsalternativ. Alla alternativ som skickas direkt till Kafka-klienten måste vara prefixet med kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Alternativ som är specifika för Spark Kafka-anslutningsappen (till exempel subscribe, startingOffsets, maxOffsetsPerTrigger) behöver inte prefixet. Se Alternativ för den fullständiga listan.
Varför får jag ett felmeddelande om skuggade Kafka-klasser?
Azure Databricks kräver att du använder skuggade Kafka-klasser (prefix med kafkashaded. eller shadedmskiam.). Om du ser fel som RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDmåste du använda de skuggade klassnamnen:
-
org.apache.kafka.*klasser kräver prefixetkafkashaded.. Till exempel:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*klasser kräver prefixetshadedmskiam.. Till exempel:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Varför får jag en TimeoutException när jag ansluter till Kafka?
Vanliga orsaker inkluderar:
- Nätverksanslutning: Beräkningsklustret kan inte nå Kafka-koordinatorerna. Kontrollera brandväggsregler, säkerhetsgrupper och VPC-konfigurationer.
-
Fel bootstrap-servrar: Kontrollera att
kafka.bootstrap.serversvärdnamnet och porten är korrekta. - DNS-upplösning: Kontrollera att värdnamnen för Kafka-koordinatorn kan lösas från Azure Databricks' nätverk.
- SSL/TLS-problem: Om du använder SSL kontrollerar du att certifikaten är korrekt konfigurerade.
För Private Link- eller VPC-peeringkonfigurationer kontrollerar du att rätt nätverksvägar finns på plats.
Ska jag använda batch- eller strömningsläge för Kafka?
Det beror på ditt användningsfall:
-
Strömningsläge (
spark.readStream): Använd när du behöver kontinuerlig databehandling eller inmatning med låg latens. -
Batchläge (
spark.read): Använd för engångsdatainläsningar, återfyllnad eller felsökning. Kräver bådestartingOffsetsochendingOffsets.
Mer information om hur du konfigurerar utlösarintervall som , AvailableNowoch ProcessingTime finns i Konfigurera intervall för strukturerad direktuppspelning.
Kan jag läsa från flera Kafka-ämnen i en enda ström?
Ja, du kan använda:
-
subscribe: Ange en kommaavgränsad lista med ämnen, till exempel.option("subscribe", "topic1,topic2"). -
subscribePattern: Använd ett Java regex-mönster för att matcha ämnesnamn, till exempel.option("subscribePattern", "topic-.*").
Hur använder jag Kafka med Lakeflow Spark deklarativa pipelines?
Lakeflow Spark Deklarativa Pipelines ger inbyggt stöd för Kafka-källor. Du kan definiera en strömmande tabell som läser från Kafka:
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Mer information om strömmande källor i Lakeflow Spark Deklarativa Pipelines finns i Ladda data i pipelines.
Hur deserialiserar jag Kafka-nyckel- och värdekolumnerna?
Kolumnerna key och value returneras som binär (BINARY typ). Använd DataFrame-åtgärder för att deserialisera dem baserat på ditt dataformat:
-
Strängdata: Använd
cast("string")för att konvertera binär till sträng. -
JSON-data: Använd
from_json()efter konvertering till sträng. Sefrom_jsonfunktion. -
Avro-data: Används
from_avro()för att deserialisera Avro-kodade data. Se Att läsa och skriva strömmande Avro-data. -
Protocol buffers: Använd
from_protobuf()för att deserialisera protobufdata. Se Läsa och skriva protokollbuffertar.
Varför får jag ett idempotent skrivfel?
Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients biblioteket som aktiverar idempotent-skrivningar som standard. Om kafka-klustret använder version 2.8.0 eller senare med ACL:er konfigurerade men utan IDEMPOTENT_WRITE aktiverad misslyckas skrivning med: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option("kafka.enable.idempotence", "false") när du konfigurerar din structured streaming-skrivare.
Vad är KAFKA_DATA_LOSS_ERROR och hur löser jag det?
Det här felet uppstår när Kafka-källan identifierar att offsets som lagras i kontrollpunkten inte längre är tillgängliga i Kafka, vanligtvis på grund av:
- Strömmen var pausad längre än den kvarhållningstid som Kafka tillåter.
- Kafka-ämnesdata togs bort eller så återskapades ämnet.
- Kafka Broker upplevde dataförlust.
Så här löser du följande:
-
Om dataförlusten är acceptabel: Ställ in
.option("failOnDataLoss", "false")för att tillåta att strömmen fortsätter från den tidigaste tillgängliga förskjutningen. -
Om dataförlust inte är acceptabelt: Återställ kontrollpunkten och bearbeta om från
earliestoffsets eller återskapa saknade Kafka-data.
Mer information finns i KAFKA_DATA_LOSS felvillkor .
Hur styr jag den hastighet som data läses från i Kafka?
Använd alternativet maxOffsetsPerTrigger för att begränsa antalet offsets (ungefär antalet poster) som bearbetas per mikro-batch. Detta hjälper till att förhindra stora batchar som kan överbelasta nedströmsbearbetningen eller orsaka minnesproblem när man hanterar en eftersläpning.
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Du kan också använda alternativ som minPartitions eller maxRecordsPerPartition för att styra hur många Spark-partitioner som skapas för varje batch.
Hur kan jag övervaka hur långt efter strömmen ligger jämfört med de senaste Kafka-offsetar?
Använd måtten avgOffsetsBehindLatest, maxOffsetsBehindLatest, och minOffsetsBehindLatest som är tillgängliga i framsteg för strömmande frågor. Den här rapporten visar hur många förskjutningar som ligger bakom den senaste tillgängliga förskjutningen av dataströmmen i alla prenumerationsavsnittspartitioner. Se Övervaka strukturerade strömningsfrågor på Azure Databricks.
Du kan också använda estimatedTotalBytesBehindLatest för att uppskatta de totala byteen av data som inte har bearbetats ännu.
Varför visar mina mått av Kafka-offset-fördröjning beständiga icke-nollvärden efter uppgradering till Databricks Runtime 17.1?
I Databricks Runtime 17.1 och senare hämtas de senaste Kafka-förskjutningarna när varje mikrobatch har slutförts. I ämnen som kontinuerligt tar emot data kan kvarvarande mått visa små, beständiga värden som inte är noll. Detta är ett förväntat beteende och indikerar inte att strömmen ligger efter.
I Databricks Runtime 17.0 och senare hämtas de senaste Kafka-förskjutningarna vid starttid för mikrobatch. Kvarvarande mått kan returneras 0 när strömmande frågor konsekvent använder alla poster som är tillgängliga i början av mikrobatchen.
Om värdena är stora eller växer kontinuerligt kanske strömmen inte håller jämna steg med inkommande data. Se Övervaka strukturerade strömningsfrågor på Azure Databricks.
Varför går initieringen av Kafka-strömmen långsamt?
Kafka-strömmar kräver tid för att:
- Anslut till Kafka-klustret och hämta metadata.
- Identifiera ämnespartitioner.
- Hämta inledande offsetvärden.
För lokala eller fjärranslutna Kafka-kluster kan nätverksfördröjningen avsevärt påverka initieringstiden. Om du kör utlösta/schemalagda pipelines med frekventa omstarter bör du överväga att använda läget för kontinuerlig strömning för att undvika upprepade initieringskostnader.
Varför ökar inte fler Spark-utförare mitt Kafka-dataflöde?
När Kafka-mäklarna har mättats ökar kostnaden utan att öka dataflödet genom att lägga till fler Spark-utförare.
Tecken på att Kafka är flaskhalsen:
- Genomströmningen når en platå trots tillägget av fler kärnor.
- Kafka Broker CPU eller nätverksanvändning är hög.
- Spark-uppgifter slutförs snabbt men väntar på nya data.
Lös detta genom att skala Kafka-klustret genom att lägga till fler mäklare eller öka antalet partitioner för att distribuera belastningen.
Hur kan jag optimera kostnads- och beräkningsanvändningen för Kafka-strömning?
För mikrobatch- och AvailableNow-lägen:
- Rätt storlek på klustret: Övervaka mått och ange en lämplig fast klusterstorlek för högsta belastning.
-
Använd
maxOffsetsPerTrigger: Begränsa batchstorlekar för att styra resursanvändningen vid belastningstoppar. - Undvik automatisk skalning: Direktuppspelningsjobb körs kontinuerligt, och om du lägger till eller tar bort noder blir uppgiften ombalanserad.
-
Minska datasnedvridning: Skeva partitioner gör att vissa uppgifter bearbetar betydligt mer data än andra, vilket leder till stragglers som saktar ner den övergripande batchens slutförande och slösar beräkningsresurser på inaktiva uppgifter. Använd alternativet
minPartitionsför att dela upp stora Kafka-partitioner i mindre Spark-partitioner för mer balanserad bearbetning.
För realtidsläge är rätt dimensionering särskilt viktigt eftersom aktiviteter kan vara inaktiva i väntan på data. Viktiga överväganden:
- Ange
maxPartitionsså att varje uppgift hanterar flera Kafka-partitioner för att minska kostnaderna. - Justera
spark.sql.shuffle.partitionsför jobb med omfattande datashuffling.
Mer information om hur du ändrar storlek på kluster för realtidsläge finns i Beräkningsstorlek .
Varför returnerar min dataström inga poster trots att det finns data i ämnet?
Vanliga orsaker inkluderar:
-
Fel
startingOffsetsinställning: Standardvärdet ärlatest, som bara läser nya data som kommer när strömmen startar. Ställ instartingOffsetspåearliestför att läsa befintliga data. - Fel ämnesnamn: Kontrollera att du prenumererar på rätt ämne.
- Autentiseringsproblem: Strömmen kan ha anslutits men saknar behörighet att läsa från ämnet. Kontrollera dina Kafka-ACL:er.
-
Förfallodatum för förskjutningar: Om strömmen har stoppats under en längre tid och förskjutningarna i kontrollpunkten har gått ut (har tagits bort av Kafka-kvarhållning) kan du behöva återställa kontrollpunkten eller kanske justera
failOnDataLoss.