Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Azure Databricks Kafka-anslutningsprogrammet stöder flera autentiseringsmetoder för anslutning till Kafka. Den här artikeln beskriver några av de vanligaste autentiseringsmetoderna på Databricks. Den fullständiga listan över autentiseringsmetoder som stöds finns i Kafka-dokumentationen.
Anslut till Azure Event Hubs med en tjänsthuvudman
Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra ID.
Ansluta med autentiseringsuppgifter för Unity Catalog-tjänsten
Sedan versionen av Databricks Runtime 16.1 har Azure Databricks stöd för autentiseringsuppgifter för Unity Catalog-tjänsten för autentisering till Azure Event Hubs. Databricks rekommenderar den här metoden, särskilt när du kör Kafka-strömning på delade kluster eller serverlös beräkning.
Utför följande steg för att använda en autentiseringsuppgift för Unity Catalog-tjänsten:
- Skapa en ny autentiseringsuppgift för Unity Catalog-tjänsten. Om du inte är bekant med den här processen kan du läsa Skapa autentiseringsuppgifter för tjänsten för instruktioner om hur du skapar en.
- Kontrollera att åtkomstanslutningsappen som är kopplad till tjänstautentiseringsuppgifterna har de behörigheter som krävs för att ansluta till Azure Event Hubs.
- Ange namnet på autentiseringsuppgifterna för Unity Catalog-tjänsten som ett källalternativ i Kafka-konfigurationen. Ange alternativet
databricks.serviceCredentialtill namnet på tjänstens autentiseringsuppgifter.
I följande exempel konfigureras Kafka som källa med hjälp av en tjänstautentiseringsuppgift:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Obs! När du använder en autentiseringsuppgift för Unity Catalog-tjänsten för att ansluta till Kafka behövs inte längre följande alternativ:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
Ansluta med ett klient-ID och en hemlighet
Azure Databricks stöder Microsoft Entra ID autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:
- Databricks Runtime 12.2 LTS och senare för beräkningsresurser konfigurerade med dedikerat åtkomstläge (tidigare åtkomstläge för en enskild användare).
- Databricks Runtime 14.3 LTS och senare på beräkningsresurser konfigurerade i standardåtkomstläge (tidigare delat åtkomstläge).
- Lakeflow Spark deklarativa Pipelines, konfigurerade utan Unity Catalog.
Azure Databricks stöder inte Microsoft Entra ID autentisering med ett certifikat i någon beräkningsmiljö eller i Lakeflow Spark deklarativa pipelines som konfigurerats med Unity Catalog.
Den här autentiseringen fungerar inte på beräkning i standardåtkomstläge eller på Unity Catalog Lakeflow Spark deklarativa pipelines.
Om du vill utföra autentisering med Microsoft Entra ID måste du ha följande värden:
Ett hyresgäst-ID. Du hittar detta på fliken Microsoft Entra ID services.
Ett clientID (även kallat program-ID).
En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.
Ett EventHubs-ämne. Du hittar en lista med ämnen i avsnittet Event Hubs under avsnittet Entiteter på en specifik sida för Event Hubs-namnområde. Om du vill arbeta med flera ämnen kan du ange IAM-rollen på Event Hubs-nivå.
En EventHub-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:
För att kunna använda Entra ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett allmänt protokoll och OAuth är en typ av SASL-mekanism):
-
kafka.security.protocolbör varaSASL_SSL -
kafka.sasl.mechanismbör varaOAUTHBEARER -
kafka.sasl.login.callback.handler.classbör vara ett fullständigt kvalificerat namn på klassen Java med värdetkafkashadedför inloggningshanteraren för vår skuggade Kafka-klass. Se följande exempel för den exakta klassen.
I följande exempel konfigureras Kafka att ansluta till Azure Event Hubs med hjälp av Microsoft Entra ID autentisering med ett klient-ID och en hemlighet:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
Använd SASL/PLAIN för att autentisera
Konfigurera följande alternativ för att ansluta till Kafka med SASL-/PLAIN-autentisering (användarnamn och lösenord). Använd det skuggade PlainLoginModule klassnamnet:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks rekommenderar att du lagrar lösenordet som en hemlighet i stället för att inkludera det direkt i koden. Mer information finns i Hemlig hantering.
Använd SASL/SCRAM för att autentisera
Konfigurera följande alternativ för att ansluta till Kafka med SASL/SCRAM (SCRAM-SHA-256 eller SCRAM-SHA-512). Använd det skuggade ScramLoginModule klassnamnet:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Anmärkning
Ersätt SCRAM-SHA-512 med SCRAM-SHA-256 om ditt Kafka-kluster är konfigurerat att använda SCRAM-SHA-256.
Azure Databricks rekommenderar att du lagrar lösenordet som en hemlighet i stället för att inkludera det direkt i koden. Mer information finns i Hemlig hantering.
Använd SSL för att ansluta Azure Databricks till Kafka
Om du vill aktivera SSL/TLS-anslutningar till Kafka, ställer du in kafka.security.protocol till SSL och anger konfigurationsalternativen för tillitsarkivet och nyckelarkivet med prefixet kafka.. För SSL-anslutningar som endast kräver serverautentisering (enkelriktad TLS) behöver du ett förtroendearkiv. För ömsesidig TLS (mTLS) där Kafka-koordinatorn även autentiserar klienten behöver du både ett förtroendearkiv och ett nyckelarkiv.
Följande SSL/TLS-alternativ är tillgängliga. En fullständig lista över SSL-egenskaper finns i dokumentationen för Apache Kafka SSL-konfiguration och Kryptering och autentisering med SSL i Confluent-dokumentationen.
| Option | Beskrivning |
|---|---|
kafka.security.protocol |
Ställ in SSL för att aktivera TLS-kryptering. |
kafka.ssl.truststore.location |
Sökväg till förtroendearkivfilen som innehåller betrodda CA-certifikat. |
kafka.ssl.truststore.password |
Lösenord för förtroendelagringsfilen. |
kafka.ssl.truststore.type |
Filformat för betrodd lagring (standard: JKS). |
kafka.ssl.keystore.location |
Sökväg till nyckellagringsfilen som innehåller klientcertifikatet och den privata nyckeln (krävs för mTLS). |
kafka.ssl.keystore.password |
Lösenord för nyckellagringsfilen. |
kafka.ssl.key.password |
Lösenord för den privata nyckeln i nyckelarkivet. |
kafka.ssl.endpoint.identification.algorithm |
Verifieringsalgoritm för värdnamn. Standardinställningen är https. Ange en tom sträng som ska inaktiveras. |
Om du använder SSL rekommenderar Databricks att du:
- Lagra dina certifikat på en Unity Catalog-volym. Användare som har åtkomst till att läsa från volymen kan använda dina Kafka-certifikat. Mer information finns i Vad är Unity Catalog-volymer?.
- Lagra dina certifikatlösenord som hemligheter i ett hemligt omfång. Mer information finns under Hantera hemliga områden.
I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Ansluta Kafka på HDInsight till Azure Databricks
Skapa ett HDInsight Kafka-kluster.
Anvisningar finns i Anslut till Kafka på HDInsight via en Azure Virtual Network.
Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.
Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines, kontrollera att brokerkonfigurationen
advertised.listenersär inställd på värdarnas interna IP-adress.Skapa ett Azure Databricks kluster.
Peerkoppla Kafka-klustret till Azure Databricks-klustret.
Följ anvisningarna i peer-virtuella nätverk.
Använda Databricks-skuggade Kafka-klassnamn
Azure Databricks paketerar egna, skuggade versioner av Kafka-klientbiblioteken. Alla Kafka-klientklassnamn som du refererar till i konfigurationsalternativ för autentisering måste använda det skuggade klassnamnsprefixet i stället för standardnamnet för klassen öppen källkod. Detta gäller för alla klasser som refereras till i alternativ som kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classoch kafka.sasl.client.callback.handler.class.
Om du använder icke-markerade klassnamn resulterar det i ett RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED-fel. Mer information finns i Vanliga frågor och svar .
Hantera potentiella fel
Det gick inte att skapa en ny
KafkaAdminClientDet här interna Kafka-felet utlöses om något av följande autentiseringsalternativ är fel:
- Klient-ID (även kallat program-ID)
- Hyresgästens ID
- Event Hubs-server
För att lösa felet kontrollerar du att värdena är korrekta för de här alternativen. Dessutom kan det här felet visas om du ändrar de konfigurationsalternativ som anges som standard i exemplet (till exempel
kafka.security.protocol).Inga poster hittades
Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.
Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:
- Du hade fel i att ange EventHubs-ämne.
- Standardalternativet för Kafka-konfiguration för
startingOffsetsärlatest, och du tar inte för närvarande emot några data genom topic. Du kan sättastartingOffsetstillearliestför att börja läsa data från Kafkas tidigaste förskjutningar.