Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
De Azure Databricks Kafka-connector ondersteunt meerdere verificatiemethoden voor het maken van verbinding met Kafka. In dit artikel worden enkele van de meest voorkomende verificatiemethoden voor Databricks beschreven. De volledige lijst met ondersteunde verificatiemethoden vindt u in de Kafka-documentatie.
Maak verbinding met Azure Event Hubs met een service-principal
Azure Databricks ondersteunt de verificatie van Spark-taken met Event Hubs-services. Deze verificatie wordt uitgevoerd via OAuth met Microsoft Entra ID.
Verbinding maken met de Unity Catalog-service met behulp van service-referenties
Sinds de release van Databricks Runtime 16.1 ondersteunt Azure Databricks servicereferenties voor Unity Catalog voor verificatie bij Azure Event Hubs. Databricks raadt deze aanpak aan, met name bij het uitvoeren van Kafka-streaming op gedeelde clusters of serverloze compute.
Voer de volgende stappen uit om een Unity Catalog-servicereferentie te gebruiken voor verificatie:
- Maak een nieuwe Unity Catalog-servicereferentie. Als u niet bekend bent met dit proces, raadpleeg dan Servicereferenties maken voor instructies over het aanmaken van servicereferenties.
- Zorg ervoor dat de toegangsconnector die is gekoppeld aan uw servicereferentie, over de benodigde machtigingen beschikt om verbinding te maken met Azure Event Hubs.
- Geef de naam van uw Unity Catalog-servicereferentie op als bronoptie in uw Kafka-configuratie. Stel de optie
databricks.serviceCredentialin op de naam van uw servicereferentie.
In het volgende voorbeeld wordt Kafka als bron geconfigureerd met behulp van een servicereferentie:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Opmerking: wanneer u een Unity Catalog-servicereferentie gebruikt om verbinding te maken met Kafka, zijn de volgende opties niet meer nodig:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Verbinding maken met een client-id en -geheim
Azure Databricks ondersteunt Microsoft Entra ID verificatie met een client-id en geheim in de volgende rekenomgevingen:
- Databricks Runtime 12.2 LTS en hoger op compute dat is geconfigureerd met de exclusieve toegangsmodus (voorheen de modus voor toegang van één gebruiker).
- Databricks Runtime 14.3 LTS en hoger op systemen die zijn geconfigureerd met standaard toegangsmodus (voorheen gedeelde toegangsmodus).
- Lakeflow Spark-declaratieve pijplijnen die zijn geconfigureerd zonder Unity Catalog.
Azure Databricks biedt geen ondersteuning voor Microsoft Entra ID verificatie met een certificaat in een rekenomgeving of in Lakeflow Spark-declaratieve pijplijnen die zijn geconfigureerd met Unity Catalog.
Deze verificatie werkt niet op berekeningen met de standaardtoegangsmodus of op Unity Catalog Lakeflow Spark-declaratieve pijplijnen.
Als u verificatie wilt uitvoeren met Microsoft Entra ID, moet u de volgende waarden hebben:
Een tenant-id. U vindt dit op het tabblad Microsoft Entra ID services.
Een clientID (ook wel toepassings-id genoemd).
Een clientgeheim. Zodra u dit hebt, moet u het toevoegen als een geheim aan uw Databricks-werkruimte. Zie Geheimbeheer om dit geheim toe te voegen.
Een EventHubs-onderwerp. U vindt een lijst met onderwerpen in de sectie Event Hubs onder de sectie Entiteiten op een specifieke Event Hubs-naamruimte pagina. Als u met meerdere onderwerpen wilt werken, kunt u de IAM-rol instellen op Event Hubs-niveau.
Een EventHubs-server. U vindt dit op de overzichtspagina van uw specifieke Event Hubs-naamruimte:
Als u Entra ID wilt gebruiken, moeten we kafka vertellen het OAuth SASL-mechanisme te gebruiken (SASL is een algemeen protocol en OAuth is een type SASL-mechanisme):
-
kafka.security.protocolmoet zijnSASL_SSL -
kafka.sasl.mechanismmoet zijnOAUTHBEARER -
kafka.sasl.login.callback.handler.classmoet een volledig gekwalificeerde naam van de Java klasse zijn met de waardekafkashadedvoor de callback-handler voor aanmelding van onze gearceerde Kafka-klasse. Zie het volgende voorbeeld voor de exacte klasse.
In het volgende voorbeeld wordt Kafka geconfigureerd om verbinding te maken met Azure Event Hubs met behulp van Microsoft Entra ID verificatie met een client-id en -geheim:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
SASL/PLAIN gebruiken om te verifiëren
Als u verbinding wilt maken met Kafka met behulp van SASL-/PLAIN-verificatie (gebruikersnaam en wachtwoord), configureert u de volgende opties. Gebruik de gearceerde PlainLoginModule klassenaam:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks raadt u aan uw wachtwoord op te slaan als een geheim in plaats van het rechtstreeks in uw code op te slaan. Zie Geheimbeheer voor meer informatie.
SASL/SCRAM gebruiken om te verifiëren
Configureer de volgende opties om verbinding te maken met Kafka via SASL/SCRAM (SCRAM-SHA-256 of SCRAM-SHA-512). Gebruik de gearceerde ScramLoginModule klassenaam:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Opmerking
Vervang SCRAM-SHA-512 door SCRAM-SHA-256 als uw Kafka-cluster is geconfigureerd voor het gebruik van SCRAM-SHA-256.
Azure Databricks raadt u aan uw wachtwoord op te slaan als een geheim in plaats van het rechtstreeks in uw code op te slaan. Zie Geheimbeheer voor meer informatie.
Ssl gebruiken om Azure Databricks te verbinden met Kafka
Als u SSL/TLS-verbindingen met Kafka wilt inschakelen, stelt u deze in kafka.security.protocolSSL en geeft u de configuratieopties voor het vertrouwensarchief en het sleutelarchief op die voorafgegaan zijn door kafka.. Voor SSL-verbindingen waarvoor alleen serververificatie (eenrichtings-TLS) is vereist, hebt u een vertrouwensarchief nodig. Voor wederzijdse TLS (mTLS) waarbij de Kafka-broker ook de client verifieert, hebt u zowel een vertrouwensarchief als een sleutelarchief nodig.
De volgende SSL/TLS-opties zijn beschikbaar. Voor de volledige lijst met SSL-eigenschappen raadpleegt u de documentatie over de Apache Kafka SSL-configuratie en versleuteling en verificatie met SSL in de Confluent-documentatie.
| Option | Beschrijving |
|---|---|
kafka.security.protocol |
Instellen op SSL om TLS-versleuteling in te schakelen. |
kafka.ssl.truststore.location |
Pad naar het vertrouwensarchiefbestand met vertrouwde CA-certificaten. |
kafka.ssl.truststore.password |
Wachtwoord voor het vertrouwensarchiefbestand. |
kafka.ssl.truststore.type |
Trust Store-bestandsindeling (standaard: JKS). |
kafka.ssl.keystore.location |
Pad naar het sleutelarchiefbestand met het clientcertificaat en de persoonlijke sleutel (vereist voor mTLS). |
kafka.ssl.keystore.password |
Wachtwoord voor het sleutelarchiefbestand. |
kafka.ssl.key.password |
Wachtwoord voor de privésleutel in de sleutelopslag. |
kafka.ssl.endpoint.identification.algorithm |
Algoritme voor hostnaamverificatie. Wordt standaard ingesteld op https. Ingesteld op een lege tekenreeks om uit te schakelen. |
Als u SSL gebruikt, raadt Databricks u het volgende aan:
- Sla uw certificaten op in een Unity Catalog-volume. Gebruikers die toegang hebben om vanaf het volume te lezen, kunnen uw Kafka-certificaten gebruiken. Zie Wat zijn Unity Catalog-volumes? voor meer informatie.
- Sla uw certificaatwachtwoorden op als geheimen in een geheim bereik. Voor meer informatie, zie Geheime bereiken beheren.
In het volgende voorbeeld worden objectopslaglocaties en Databricks-geheimen gebruikt om een SSL-verbinding in te schakelen:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Kafka in HDInsight verbinden met Azure Databricks
Maak een HDInsight Kafka-cluster.
Zie Connect to Kafka on HDInsight via een Azure Virtual Network voor instructies.
Configureer de Kafka-brokers om het juiste adres te adverteren.
Volg de instructies in Kafka configureren voor IP-reclame. Als u Kafka zelf beheert op Azure Virtual Machines, moet u ervoor zorgen dat de
advertised.listenersconfiguratie van de brokers is ingesteld op het interne IP-adres van de hosts.Maak een Azure Databricks-cluster.
Koppel het Kafka-cluster aan het Azure Databricks-cluster.
Volg de instructies in virtuele peernetwerken.
Gearceerde Kafka-klassenamen van Databricks gebruiken
Azure Databricks bundelt eigen, gearceerde versies van de Kafka-clientbibliotheken. Alle Kafka-clientklassenamen waarnaar u in de configuratieopties voor verificatie verwijst, moeten het gearceerde klassenaamvoorvoegsel gebruiken in plaats van de standaard opensource-klassenaam. Dit geldt voor elke klasse waarnaar wordt verwezen in opties zoals kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classen kafka.sasl.client.callback.handler.class.
Het gebruik van niet-geschaduwde klassenamen resulteert in een RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED fout. Zie de veelgestelde vragen voor meer informatie.
Mogelijke fouten afhandelen
Kan geen nieuwe maken
KafkaAdminClientDeze interne Kafka-fout wordt gegenereerd als een van de volgende verificatieopties onjuist is:
- Client-id (ook wel toepassings-id genoemd)
- Tenant-id
- Event Hubs-server
Als u de fout wilt oplossen, controleert u of de waarden juist zijn voor deze opties. Bovendien ziet u deze fout mogelijk als u de standaardconfiguratieopties in het voorbeeld wijzigt (zoals
kafka.security.protocol).Er zijn geen records geretourneerd
Als u uw DataFrame probeert weer te geven of te verwerken, maar geen resultaten krijgt, ziet u het volgende in de gebruikersinterface.
Dit bericht betekent dat de verificatie is geslaagd, maar EventHubs geen gegevens heeft geretourneerd. Een aantal mogelijke (hoewel niet volledig) redenen zijn:
- U hebt het verkeerde EventHubs-onderwerp opgegeven.
- De standaard Kafka-configuratieoptie voor
startingOffsetsislatest, en u ontvangt momenteel nog geen gegevens via de topic. U kuntstartingOffsetsinstellen opearliestom gegevens te lezen vanaf de vroegste offsets van Kafka.