Autentisering

Azure Databricks Kafka-anslutningsprogrammet stöder flera autentiseringsmetoder för anslutning till Kafka. Den här artikeln beskriver några av de vanligaste autentiseringsmetoderna på Databricks. Den fullständiga listan över autentiseringsmetoder som stöds finns i Kafka-dokumentationen.

Anslut till Azure Event Hubs med en tjänsthuvudman

Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra ID.

AAD-autentiseringsdiagram

Ansluta med autentiseringsuppgifter för Unity Catalog-tjänsten

Sedan versionen av Databricks Runtime 16.1 har Azure Databricks stöd för autentiseringsuppgifter för Unity Catalog-tjänsten för autentisering till Azure Event Hubs. Databricks rekommenderar den här metoden, särskilt när du kör Kafka-strömning på delade kluster eller serverlös beräkning.

Utför följande steg för att använda en autentiseringsuppgift för Unity Catalog-tjänsten:

  • Skapa en ny autentiseringsuppgift för Unity Catalog-tjänsten. Om du inte är bekant med den här processen kan du läsa Skapa autentiseringsuppgifter för tjänsten för instruktioner om hur du skapar en.
    • Kontrollera att åtkomstanslutningsappen som är kopplad till tjänstautentiseringsuppgifterna har de behörigheter som krävs för att ansluta till Azure Event Hubs.
  • Ange namnet på autentiseringsuppgifterna för Unity Catalog-tjänsten som ett källalternativ i Kafka-konfigurationen. Ange alternativet databricks.serviceCredential till namnet på tjänstens autentiseringsuppgifter.

I följande exempel konfigureras Kafka som källa med hjälp av en tjänstautentiseringsuppgift:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Obs! När du använder en autentiseringsuppgift för Unity Catalog-tjänsten för att ansluta till Kafka behövs inte längre följande alternativ:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Ansluta med ett klient-ID och en hemlighet

Azure Databricks stöder Microsoft Entra ID autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:

  • Databricks Runtime 12.2 LTS och senare för beräkningsresurser konfigurerade med dedikerat åtkomstläge (tidigare åtkomstläge för en enskild användare).
  • Databricks Runtime 14.3 LTS och senare på beräkningsresurser konfigurerade i standardåtkomstläge (tidigare delat åtkomstläge).
  • Lakeflow Spark deklarativa Pipelines, konfigurerade utan Unity Catalog.

Azure Databricks stöder inte Microsoft Entra ID autentisering med ett certifikat i någon beräkningsmiljö eller i Lakeflow Spark deklarativa pipelines som konfigurerats med Unity Catalog.

Den här autentiseringen fungerar inte på beräkning i standardåtkomstläge eller på Unity Catalog Lakeflow Spark deklarativa pipelines.

Om du vill utföra autentisering med Microsoft Entra ID måste du ha följande värden:

  • Ett hyresgäst-ID. Du hittar detta på fliken Microsoft Entra ID services.

  • Ett clientID (även kallat program-ID).

  • En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.

  • Ett EventHubs-ämne. Du hittar en lista med ämnen i avsnittet Event Hubs under avsnittet Entiteter på en specifik sida för Event Hubs-namnområde. Om du vill arbeta med flera ämnen kan du ange IAM-rollen på Event Hubs-nivå.

  • En EventHub-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:

    Event Hubs namnområde

För att kunna använda Entra ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett allmänt protokoll och OAuth är en typ av SASL-mekanism):

  • kafka.security.protocol bör vara SASL_SSL
  • kafka.sasl.mechanism bör vara OAUTHBEARER
  • kafka.sasl.login.callback.handler.class bör vara ett fullständigt kvalificerat namn på klassen Java med värdet kafkashaded för inloggningshanteraren för vår skuggade Kafka-klass. Se följande exempel för den exakta klassen.

I följande exempel konfigureras Kafka att ansluta till Azure Event Hubs med hjälp av Microsoft Entra ID autentisering med ett klient-ID och en hemlighet:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Använd SASL/PLAIN för att autentisera

Konfigurera följande alternativ för att ansluta till Kafka med SASL-/PLAIN-autentisering (användarnamn och lösenord). Använd det skuggade PlainLoginModule klassnamnet:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks rekommenderar att du lagrar lösenordet som en hemlighet i stället för att inkludera det direkt i koden. Mer information finns i Hemlig hantering.

Använd SASL/SCRAM för att autentisera

Konfigurera följande alternativ för att ansluta till Kafka med SASL/SCRAM (SCRAM-SHA-256 eller SCRAM-SHA-512). Använd det skuggade ScramLoginModule klassnamnet:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Anmärkning

Ersätt SCRAM-SHA-512 med SCRAM-SHA-256 om ditt Kafka-kluster är konfigurerat att använda SCRAM-SHA-256.

Azure Databricks rekommenderar att du lagrar lösenordet som en hemlighet i stället för att inkludera det direkt i koden. Mer information finns i Hemlig hantering.

Använd SSL för att ansluta Azure Databricks till Kafka

Om du vill aktivera SSL/TLS-anslutningar till Kafka, ställer du in kafka.security.protocol till SSL och anger konfigurationsalternativen för tillitsarkivet och nyckelarkivet med prefixet kafka.. För SSL-anslutningar som endast kräver serverautentisering (enkelriktad TLS) behöver du ett förtroendearkiv. För ömsesidig TLS (mTLS) där Kafka-koordinatorn även autentiserar klienten behöver du både ett förtroendearkiv och ett nyckelarkiv.

Följande SSL/TLS-alternativ är tillgängliga. En fullständig lista över SSL-egenskaper finns i dokumentationen för Apache Kafka SSL-konfiguration och Kryptering och autentisering med SSL i Confluent-dokumentationen.

Option Beskrivning
kafka.security.protocol Ställ in SSL för att aktivera TLS-kryptering.
kafka.ssl.truststore.location Sökväg till förtroendearkivfilen som innehåller betrodda CA-certifikat.
kafka.ssl.truststore.password Lösenord för förtroendelagringsfilen.
kafka.ssl.truststore.type Filformat för betrodd lagring (standard: JKS).
kafka.ssl.keystore.location Sökväg till nyckellagringsfilen som innehåller klientcertifikatet och den privata nyckeln (krävs för mTLS).
kafka.ssl.keystore.password Lösenord för nyckellagringsfilen.
kafka.ssl.key.password Lösenord för den privata nyckeln i nyckelarkivet.
kafka.ssl.endpoint.identification.algorithm Verifieringsalgoritm för värdnamn. Standardinställningen är https. Ange en tom sträng som ska inaktiveras.

Om du använder SSL rekommenderar Databricks att du:

  • Lagra dina certifikat på en Unity Catalog-volym. Användare som har åtkomst till att läsa från volymen kan använda dina Kafka-certifikat. Mer information finns i Vad är Unity Catalog-volymer?.
  • Lagra dina certifikatlösenord som hemligheter i ett hemligt omfång. Mer information finns under Hantera hemliga områden.

I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Ansluta Kafka på HDInsight till Azure Databricks

  1. Skapa ett HDInsight Kafka-kluster.

    Anvisningar finns i Anslut till Kafka på HDInsight via en Azure Virtual Network.

  2. Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.

    Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines, kontrollera att brokerkonfigurationen advertised.listeners är inställd på värdarnas interna IP-adress.

  3. Skapa ett Azure Databricks kluster.

  4. Peerkoppla Kafka-klustret till Azure Databricks-klustret.

    Följ anvisningarna i peer-virtuella nätverk.

Använda Databricks-skuggade Kafka-klassnamn

Azure Databricks paketerar egna, skuggade versioner av Kafka-klientbiblioteken. Alla Kafka-klientklassnamn som du refererar till i konfigurationsalternativ för autentisering måste använda det skuggade klassnamnsprefixet i stället för standardnamnet för klassen öppen källkod. Detta gäller för alla klasser som refereras till i alternativ som kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classoch kafka.sasl.client.callback.handler.class.

Om du använder icke-markerade klassnamn resulterar det i ett RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED-fel. Mer information finns i Vanliga frågor och svar .

Hantera potentiella fel

  • Det gick inte att skapa en ny KafkaAdminClient

    Det här interna Kafka-felet utlöses om något av följande autentiseringsalternativ är fel:

    • Klient-ID (även kallat program-ID)
    • Hyresgästens ID
    • Event Hubs-server

    För att lösa felet kontrollerar du att värdena är korrekta för de här alternativen. Dessutom kan det här felet visas om du ändrar de konfigurationsalternativ som anges som standard i exemplet (till exempel kafka.security.protocol).

  • Inga poster hittades

    Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.

    Inget resultatmeddelande

    Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:

    • Du hade fel i att ange EventHubs-ämne.
    • Standardalternativet för Kafka-konfiguration för startingOffsets är latest, och du tar inte för närvarande emot några data genom topic. Du kan sätta startingOffsets till earliest för att börja läsa data från Kafkas tidigaste förskjutningar.