Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo descrive come usare Apache Kafka come origine o sink durante l'esecuzione di carichi di lavoro Structured Streaming in Azure Databricks.
Per altre informazioni su Kafka, vedere la documentazione di Apache Kafka.
Leggere dati da Kafka
Azure Databricks fornisce la kafka parola chiave come formato dati per configurare le connessioni a Kafka. Di seguito è riportato un esempio per una lettura in streaming:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks supporta anche la semantica di lettura batch, come illustrato nell'esempio seguente:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Per il caricamento batch incrementale, Databricks consiglia di usare Kafka con Trigger.AvailableNow. Vedere AvailableNow: Elaborazione batch incrementale.
In Databricks Runtime 13.3 LTS e versioni successive, Azure Databricks fornisce anche una funzione SQL per la lettura dei dati Kafka. Lo streaming con SQL è supportato solo nelle pipeline dichiarative di Lakeflow Spark o con tabelle di streaming in Databricks SQL. Vedere read_kafka funzione con valori di tabella.
Configurare il lettore di streaming strutturato Kafka
L'opzione seguente deve essere impostata per l'origine Kafka sia per le query batch che per le query di streaming:
| Opzione | Valore | Descrizione |
|---|---|---|
kafka.bootstrap.servers |
Elenco delimitato da virgole di host:port | Server bootstrap del cluster Kafka |
Inoltre, è necessaria una delle opzioni seguenti per specificare gli argomenti da sottoscrivere:
| Opzione | Valore | Descrizione |
|---|---|---|
subscribe |
Elenco di argomenti delimitato da virgole. | Elenco di argomenti a cui sottoscrivere. |
subscribePattern |
Stringa di espressione regolare di Java. | Modello utilizzato per iscriversi agli argomento/i. |
assign |
Stringa JSON {"topicA":[0,1],"topic":[2,4]}. |
Partizioni di argomenti specifiche da utilizzare. |
Vedere la pagina Opzioni per l'elenco completo delle opzioni disponibili.
Schema dei Kafka record
I record restituiti dal lettore Kafka Structured Streaming avranno lo schema seguente:
| colonna | Tipo |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
key e value vengono sempre deserializzati come matrici di byte con ByteArrayDeserializer. Usare operazioni dataframe (ad esempio cast("string") o from_avro) per deserializzare in modo esplicito le chiavi e i valori.
Scrivere dati in Kafka
Di seguito è riportato un esempio di scrittura di streaming in Kafka:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks supporta anche la semantica di scrittura batch nei sink di dati Kafka, come illustrato nell'esempio seguente:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurare lo scrittore di streaming strutturato Kafka
Importante
Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se un sink Kafka usa la versione 2.8.0 o successiva con ACL configurati, ma senza IDEMPOTENT_WRITE abilitato, la scrittura non riesce con il messaggio di errore org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.
Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option(“kafka.enable.idempotence”, “false”) durante la configurazione del writer Structured Streaming.
Di seguito sono riportate le opzioni comuni configurate durante la scrittura su Kafka:
| Opzione | Valore | Valore predefinito | Descrizione |
|---|---|---|---|
kafka.boostrap.servers |
Elenco delimitato da virgole di <host:port> |
none | [Necessario] Configurazione Kafka bootstrap.servers. |
topic |
STRING |
non impostato | [Facoltativo] Imposta l'argomento per tutte le righe da scrivere. Questa opzione esegue l'override di qualsiasi colonna di argomento presente nei dati. |
includeHeaders |
BOOLEAN |
false |
[Opzionale] Specifica se includere le intestazioni di Kafka nella riga. |
Vedere la pagina Opzioni per l'elenco completo delle opzioni disponibili.
Schema per il writer Kafka
Quando si scrivono dati in Kafka, il dataframe fornito può includere i campi seguenti:
| Nome della colonna | Obbligatorio o facoltativo | Tipo |
|---|---|---|
key |
opzionale |
STRING oppure BINARY |
value |
required |
STRING oppure BINARY |
headers |
opzionale | ARRAY |
topic |
facoltativo (ignorato se topic è impostato come opzione di scrittura) |
STRING |
partition |
opzionale | INT |
Autenticazione
Azure Databricks supporta più metodi di autenticazione per Kafka, tra cui le credenziali del servizio Unity Catalog, SASL/SSL e le opzioni specifiche del cloud per AWS MSK, Hub eventi di Azure e Google Cloud Managed Kafka. Vedere Autenticazione.
Ottenere le metriche di Kafka
È possibile monitorare il ritardo di una query di streaming rispetto a Kafka usando le avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest metriche. Questi report segnalano il ritardo medio, massimo e minimo di offset in tutte le partizioni di argomento sottoscritte, rispetto agli offset più recenti in Kafka. Vedere Lettura interattiva delle metriche.
Per stimare la quantità di dati che la query non ha ancora utilizzato, usare la estimatedTotalBytesBehindLatest metrica . Questa metrica stima il numero totale di byte rimanenti in tutte le partizioni sottoscritte in base ai batch elaborati negli ultimi 300 secondi. È possibile modificare l'intervallo di tempo usato per questa stima impostando l'opzione bytesEstimateWindowLength . Ad esempio, per impostarlo su 10 minuti:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Se stai eseguendo il flusso in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard dei progressi delle query di streaming.
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Per ulteriori informazioni, vedere Monitoraggio delle query Structured Streaming su Azure Databricks.
Esempio di codice: da Kafka a Delta
Nell'esempio seguente viene illustrato un flusso di lavoro completo per lo streaming continuo dei dati da Kafka a una tabella Delta. Questo modello è ideale per carichi di lavoro di inserimento dati quasi in tempo reale.
Questo esempio usa uno schema JSON fisso. Per altri formati come Avro o Protobuf, usare from_avro o from_protobuf. È anche possibile eseguire l'integrazione con un registro schemi. Consultare Esempio con Registro Schemi.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);