Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Veelgestelde vragen over het gebruik van Kafka met Azure Databricks.
Waarom krijg ik een foutmelding dat een Kafka-optie niet wordt ondersteund of niet wordt herkend?
Een veelvoorkomende fout is het vergeten van het voorvoegsel bij het kafka. instellen van systeemeigen kafka-configuratieopties. Alle opties die rechtstreeks aan de Kafka-client worden doorgegeven, moeten worden voorafgegaan door kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Opties die specifiek zijn voor de Spark Kafka-connector (zoals subscribe, startingOffsets, maxOffsetsPerTrigger) hebben het voorvoegsel niet nodig. Zie Opties voor de volledige lijst.
Waarom krijg ik een foutmelding over afgeschermde Kafka-klassen?
Azure Databricks vereist het gebruik van gearceerde Kafka-klassen (voorafgegaan door kafkashaded. of shadedmskiam.). Als u fouten ziet zoals RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, moet u gebruik maken van de gearceerde klassenamen.
-
org.apache.kafka.*voor klassen is hetkafkashaded.voorvoegsel vereist. Bijvoorbeeld:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*voor klassen is hetshadedmskiam.voorvoegsel vereist. Bijvoorbeeld:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Waarom krijg ik een TimeoutException foutmelding bij het verbinden met Kafka?
Veelvoorkomende oorzaken zijn onder andere:
- Netwerkverbinding: het rekencluster kan de Kafka-brokers niet bereiken. Controleer de firewallregels, beveiligingsgroepen en VPC-configuraties.
-
Onjuiste bootstrapservers: Verifieer of de
kafka.bootstrap.servershostnaam en poort correct zijn. - DNS-resolutie: Zorg ervoor dat de Kafka-brokerhostnamen kunnen worden omgezet vanuit het Azure Databricks-netwerk.
- SSL/TLS-problemen: als u SSL gebruikt, controleert u of certificaten correct zijn geconfigureerd.
Voor Private Link- of VPC-peering-instellingen moet u ervoor zorgen dat de juiste netwerkroutes aanwezig zijn.
Moet ik de batch- of streamingmodus voor Kafka gebruiken?
Dit hangt af van uw use-case:
-
Streamingmodus (
spark.readStream): Gebruik deze modus wanneer u continue gegevensverwerking of opname met lage latentie nodig hebt. -
Batchmodus (
spark.read): Gebruiken voor eenmalige gegevensbelastingen, backfills of foutopsporing. Vereist zowelstartingOffsetsalsendingOffsets.
Zie Triggerintervallen voor gestructureerd streamen configureren voor meer informatie over het configureren van triggerintervallen, zoals AvailableNow, ProcessingTimeen de realtimemodus.
Kan ik vanuit meerdere Kafka-onderwerpen in één stream lezen?
Ja, u kunt het volgende gebruiken:
-
subscribe: Geef een door komma's gescheiden lijst met onderwerpen op, bijvoorbeeld.option("subscribe", "topic1,topic2"). -
subscribePattern: gebruik een Java regex-patroon om onderwerpnamen te vinden, bijvoorbeeld.option("subscribePattern", "topic-.*").
Hoe gebruik ik Kafka met Lakeflow Spark-declaratieve pijplijnen?
Lakeflow Spark-declaratieve pijplijnen biedt systeemeigen ondersteuning voor Kafka-bronnen. U kunt een streamingtabel definiëren die wordt gelezen uit Kafka:
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Zie Gegevens laden in pijplijnen voor meer informatie over streamingbronnen in Lakeflow Spark-declaratieve pijplijnen.
Hoe de kafka-sleutel en waardekolommen deserialiseren?
De key en value kolommen worden geretourneerd als binair (BINARY type). DataFrame-bewerkingen gebruiken om ze te deserialiseren op basis van uw gegevensindeling:
-
Tekenreeksgegevens: hiermee
cast("string")converteert u een binair bestand naar een tekenreeks. -
JSON-gegevens: Gebruik
from_json()na het casten naar string. Ziefrom_jsonde functie. -
Avro-gegevens: gebruiken
from_avro()om avro-gecodeerde gegevens te deserialiseren. Zie Lezen en schrijven van streaming Avro-gegevens. -
Protocolbuffers: gebruiken
from_protobuf()om protobuf-gegevens te deserialiseren. Zie Buffers voor lees- en schrijfprotocollen.
Waarom krijg ik een idempotent schrijffout?
Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als uw Kafka-cluster versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukt de schrijfbewerking met: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option("kafka.enable.idempotence", "false") tijdens het configureren van uw Structured Streaming Writer.
Wat is KAFKA_DATA_LOSS_ERROR en hoe kan ik dit oplossen?
Deze fout treedt op wanneer de Kafka-bron detecteert dat offsets die zijn opgeslagen in het controlepunt niet meer beschikbaar zijn in Kafka, meestal omdat:
- De stream was langer gepauzeerd dan de Kafka-retentieperiode.
- Kafka-onderwerpgegevens zijn verwijderd of het onderwerp is opnieuw gemaakt.
- Kafka Broker heeft gegevensverlies ervaren.
U lost dit als volgt op:
-
Als gegevensverlies acceptabel is: stel
.option("failOnDataLoss", "false")in dat de stroom kan worden voortgezet vanaf de vroegste beschikbare offset. -
Als gegevensverlies niet acceptabel is: stel het controlepunt opnieuw in en verwerk de
earliestoffsets of herstel de ontbrekende Kafka-gegevens.
Zie KAFKA_DATA_LOSS foutvoorwaarde voor meer informatie.
Hoe kan ik de snelheid bepalen waarmee gegevens uit Kafka worden gelezen?
Gebruik de maxOffsetsPerTrigger optie om het aantal offsets (ongeveer het aantal records) dat per microbatch wordt verwerkt, te beperken. Dit helpt grote batches te voorkomen die downstreamverwerking kunnen overweldigen of geheugenproblemen kunnen veroorzaken bij het inhalen van een achterstand.
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
U kunt ook opties gebruiken zoals minPartitions of maxRecordsPerPartition bepalen hoeveel Spark-partities voor elke batch worden gemaakt.
Hoe kan ik controleren hoe ver achter mijn stream de meest recente Kafka-offsets liggen?
Gebruik de avgOffsetsBehindLatest, maxOffsetsBehindLatest, en minOffsetsBehindLatest metrics die beschikbaar zijn in de streaming query voortgang. Deze rapporten geven aan hoeveel offsets zich achter de nieuwste beschikbare offset bevinden in uw stream over alle geabonneerde onderwerp-partities. Zie Monitoring van Structured Streaming-queries op Azure Databricks.
U kunt ook estimatedTotalBytesBehindLatest gebruiken om de totale hoeveelheid bytes aan gegevens te schatten die nog niet verwerkt zijn.
Waarom tonen mijn kafka-offsetvertragingsgegevens permanente niet-nulwaarden na een upgrade naar Databricks Runtime 17.1?
In Databricks Runtime 17.1 en hoger worden de meest recente Kafka-offsets opgehaald nadat elke microbatch is voltooid. Voor onderwerpen die continu gegevens ontvangen, kunnen achterstandsmetrics kleine, constante niet-nulwaarden vertonen. Dit is verwacht gedrag en geeft niet aan dat de stroom achterloopt.
In Databricks Runtime 17.0 en lager worden de meest recente Kafka-offsets opgehaald op de begintijd van de microbatch. Achterstandsmetriek kan 0 retourneren wanneer streamingquery's consistent alle records verwerken die beschikbaar zijn aan het begin van een micro-batch.
Als waarden groter worden of continu groeien, kan de stroom misschien niet bijblijven met binnenkomende gegevens. Zie Monitoring van Structured Streaming-queries op Azure Databricks.
Waarom is de initialisatie van mijn Kafka-stream traag?
Kafka-streams vereisen tijd voor het volgende:
- Maak verbinding met het Kafka-cluster en haal metagegevens op.
- Onderwerppartities ontdekken.
- Haal initiële offsets op.
Voor on-premises of externe Kafka-clusters kan de netwerklatentie de initialisatietijd aanzienlijk beïnvloeden. Als u geactiveerde/geplande pijplijnen uitvoert met regelmatig opnieuw opstarten, kunt u overwegen om de modus voor continue streaming te gebruiken om herhaalde initialisatieoverhead te voorkomen.
Waarom verhoogt het toevoegen van meer Spark-uitvoerders mijn Kafka-doorvoer niet?
Zodra de Kafka-brokers verzadigd raken, verhoogt het toevoegen van meer Spark-uitvoerders de kosten zonder de doorvoer te verhogen.
Hiermee wordt aangegeven dat Kafka het knelpunt is:
- De doorvoer blijft gelijk, ondanks het toevoegen van meer kernen.
- Het CPU- of netwerkgebruik van Kafka Broker is hoog.
- Spark-taken worden snel voltooid, maar wachten op nieuwe gegevens.
U kunt dit oplossen door uw Kafka-cluster te schalen door brokers toe te voegen of het aantal partities te verhogen om de belasting te verdelen.
Hoe kan ik kosten- en rekengebruik optimaliseren voor Kafka-streaming?
Voor microbatch- en AvailableNow-modi:
- De juiste grootte van uw cluster bepalen: bewaak metrische gegevens en stel een geschikte vaste clustergrootte in voor piekbelasting.
-
Gebruik
maxOffsetsPerTrigger: Beperk batchgrootten om het resourcegebruik te beheren tijdens belastingpieken. - Voorkom automatisch schalen: streamingtaken worden continu uitgevoerd en het toevoegen of verwijderen van knooppunten zorgt ervoor dat de overhead voor taken opnieuw wordt verdeeld.
-
Verminder de scheefheid van gegevens: scheefgetrokken partities zorgen ervoor dat sommige taken aanzienlijk meer gegevens verwerken dan andere, wat leidt tot achterblijvende taken die de algehele voltooiing van de batch vertragen en rekenresources verspillen aan niet-actieve taken. Gebruik de
minPartitionsoptie om grote Kafka-partities te splitsen in kleinere Spark-partities voor meer evenwichtige verwerking.
Voor de realtimemodus is de juiste grootte vooral belangrijk omdat taken inactief kunnen blijven terwijl ze wachten op gegevens. Belangrijke overwegingen:
- Stel
maxPartitionszo in dat elke taak meerdere Kafka-partities verwerkt om de overhead te verminderen. - Stem
spark.sql.shuffle.partitionsaf op taken die veel gegevensverplaatsingen vereisen.
Zie Compute-grootte voor hulp bij het aanpassen van de grootte van clusters voor realtimemodus.
Waarom retourneert mijn stream geen records, ook al bestaan er gegevens in het onderwerp?
Veelvoorkomende oorzaken zijn onder andere:
-
Verkeerde
startingOffsetsinstelling: de standaardwaarde islatest, die alleen nieuwe gegevens leest die binnenkomen nadat de stream is gestart. StelstartingOffsetsin opearliestom bestaande gegevens te lezen. - Verkeerde onderwerpnaam: Controleer of u zich abonneert op het juiste onderwerp.
- Verificatieproblemen: uw stream is mogelijk succesvol verbonden, maar beschikt niet over machtigingen om van de topic te lezen. Controleer uw Kafka-ACL's.
-
Verlooptijd van offset: Als uw stream lang is gestopt en de verschuivingen in het controlepunt zijn verlopen (verwijderd door Kafka-retentie), moet u mogelijk het controlepunt opnieuw instellen of aanpassen
failOnDataLoss.