Partilhar via


Autenticação

O conector Azure Databricks Kafka suporta múltiplos métodos de autenticação para ligação ao Kafka. Este artigo aborda alguns dos métodos de autenticação mais comuns nos Databricks. A lista completa de métodos de autenticação suportados pode ser encontrada na documentação Kafka.

Liga-te a Hubs de Eventos do Azure com um principal de serviço

O Azure Databricks suporta a autenticação de trabalhos Spark com serviços Event Hubs. Esta autenticação é feita via OAuth com Microsoft Entra ID.

Diagrama de autenticação do AAD

Ligue-se com credenciais de serviço do Catálogo Unity

Desde o lançamento do Databricks Runtime 16.1, Azure Databricks suporta credenciais de serviço Unity Catalog para autenticação a Hubs de Eventos do Azure. A Databricks recomenda esta abordagem, especialmente ao executar streaming Kafka em clusters partilhados ou computação serverless.

Para usar uma credencial de serviço do Unity Catalog para autenticação, execute as seguintes etapas:

  • Crie uma nova credencial de serviço do Catálogo Unity. Se você não estiver familiarizado com esse processo, consulte Criar credenciais de serviço para obter instruções sobre como criar uma.
    • Certifique-se de que o conector de acesso associado à sua credencial de serviço tem as permissões necessárias para se ligar ao Hubs de Eventos do Azure.
  • Forneça o nome de sua credencial de serviço do Catálogo Unity como uma opção de origem em sua configuração do Kafka. Defina a opção databricks.serviceCredential como o nome da sua credencial de serviço.

O exemplo seguinte configura o Kafka como fonte usando uma credencial de serviço:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Nota: Quando utiliza uma credencial de serviço do Unity Catalog para se ligar ao Kafka, as seguintes opções deixam de ser necessárias:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Liga-te com um ID de cliente e um segredo

O Azure Databricks suporta autenticação Microsoft Entra ID com um ID de cliente e secreto nos seguintes ambientes de computação:

  • Databricks Runtime 12.2 LTS e superior em computação configurada com modo de acesso dedicado (anteriormente modo de acesso de usuário único).
  • Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso padrão (anteriormente modo de acesso compartilhado).
  • Lakeflow Spark Declarative Pipelines configurados sem Unity Catalog.

O Azure Databricks não suporta autenticação Microsoft Entra ID com certificado em qualquer ambiente de computação, nem em Lakeflow Spark Declarative Pipelines configurados com o Unity Catalog.

Essa autenticação não funciona na computação com o modo de acesso padrão ou nos pipelines declarativos do Unity Catalog Lakeflow Spark.

Para realizar autenticação com o Microsoft Entra ID, deve ter os seguintes valores:

  • Um ID de locatário. Pode encontrar isto no separador de serviços Microsoft Entra ID.

  • Um clientID (também conhecido como ID do aplicativo).

  • Um segredo do cliente. Depois de ter isso, você deve adicioná-lo como um segredo ao seu espaço de trabalho Databricks. Para adicionar esse segredo, consulte Gerenciamento secreto.

  • Um tópico do EventHubs. Pode encontrar uma lista de tópicos na secção Hubs de Eventos na secção Entidades numa página específica de Namespace de Hubs de Eventos. Para trabalhar com vários tópicos, você pode definir a função do IAM no nível dos Hubs de Eventos.

  • Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico de Hubs de Eventos:

    Espaço de nomes dos Event Hubs

Além disso, para usar o Entra ID, precisamos informar o Kafka para usar o mecanismo OAuth SASL (SASL é um protocolo genérico, e OAuth é um tipo de mecanismo SASL):

  • kafka.security.protocol deve ser SASL_SSL
  • kafka.sasl.mechanism deve ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com o valor de kafkashaded para o handler de retorno de login da nossa classe Kafka sombreada. Veja o exemplo a seguir para a classe exata.

O exemplo seguinte configura o Kafka para se ligar ao Hubs de Eventos do Azure usando autenticação Microsoft Entra ID com um ID de cliente e segredo:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Use SASL/PLAIN para autenticar

Para se ligar ao Kafka usando autenticação SASL/PLAIN (nome de utilizador e palavra-passe), configure as seguintes opções. Use o nome da classe sombreado PlainLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

O Azure Databricks recomenda guardar a sua palavra-passe como segredo em vez de a incluir diretamente no seu código. Para obter mais informações, consulte Gerenciamento secreto.

Use SASL/SCRAM para autenticar

Para se ligar ao Kafka usando SASL/SCRAM (SCRAM-SHA-256 ou SCRAM-SHA-512), configure as seguintes opções. Use o nome da classe sombreado ScramLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Observação

Substitui SCRAM-SHA-512 por SCRAM-SHA-256 se o teu cluster Kafka estiver configurado para usar SCRAM-SHA-256.

O Azure Databricks recomenda guardar a sua palavra-passe como segredo em vez de a incluir diretamente no seu código. Para obter mais informações, consulte Gerenciamento secreto.

usa SSL para ligar Azure Databricks ao Kafka

Para permitir ligações SSL/TLS ao Kafka, defina kafka.security.protocol para SSL e forneça as opções de configuração de armazenamento de confiança e armazenamento de chaves com o prefixo kafka.. Para conexões SSL que requerem apenas autenticação de servidor (TLS unidirecional), é necessário um repositório de confiança. Para TLS mútuo (mTLS), onde o corretor Kafka também autentica o cliente, precisas tanto de um trust store como de um key store.

As seguintes opções SSL/TLS estão disponíveis. Para a lista completa de propriedades SSL, consulte a documentação de configuração SSL do Apache Kafka e Encriptação e Autenticação com SSL na documentação do Confluent.

Option Descrição
kafka.security.protocol Definir para SSL ativar a encriptação TLS.
kafka.ssl.truststore.location Caminho para o ficheiro de armazenamento de confiança que contém certificados CA confiáveis.
kafka.ssl.truststore.password Palavra-passe para o ficheiro do repositório de confiança.
kafka.ssl.truststore.type Formato de ficheiro Trust store (predefinido: JKS).
kafka.ssl.keystore.location Caminho para o ficheiro de armazenamento de chaves que contém o certificado do cliente e a chave privada (necessária para o mTLS).
kafka.ssl.keystore.password Palavra-passe para o ficheiro da loja de chaves.
kafka.ssl.key.password Palavra-passe para a chave privada na loja de chaves.
kafka.ssl.endpoint.identification.algorithm Algoritmo de verificação do nome do anfitrião. O padrão é https. Defina como uma cadeia de caracteres vazia para desativar.

Se usar SSL, o Databricks recomenda que:

  • Guarde os seus certificados num volume do Catálogo Unity. Os utilizadores que têm acesso a ler a partir do volume podem usar os seus certificados Kafka. Para obter mais informações, consulte O que são volumes do Catálogo Unity?.
  • Guarda as passwords dos teus certificados como segredos num âmbito secreto. Para obter mais informações, consulte Gerenciar escopos secretos.

O exemplo a seguir usa locais de armazenamento de objetos e segredos do Databricks para habilitar uma conexão SSL:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Conectar o Kafka no HDInsight ao Azure Databricks

  1. Crie um cluster HDInsight Kafka.

    Consulte Conectar-se ao Kafka no HDInsight por meio de uma Rede Virtual do Azure para obter instruções.

  2. Configure os corretores Kafka para anunciar o endereço correto.

    Siga as instruções em Configurar o Kafka para anunciar o IP. Se gerir o Kafka você próprio no Máquinas Virtuais do Azure, certifique-se de que a configuração advertised.listeners dos brokers está definida para o IP interno dos hosts.

  3. Crie um cluster no Azure Databricks.

  4. Ligue o cluster Kafka ao cluster Azure Databricks.

    Siga as instruções em Redes virtuais entre pares.

Use os nomes de classes do Kafka sombreados do Databricks

Azure Databricks agrupa versões proprietárias e sombreadas das bibliotecas cliente Kafka. Todos os nomes de classes de cliente Kafka a que te referes nas configurações de autenticação devem usar o prefixo de nome de classe sombreado em vez do nome de classe de código aberto padrão. Isto aplica-se a qualquer classe referenciada em opções como kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class, e kafka.sasl.client.callback.handler.class.

Usar nomes de classes sem sombreamento resulta num RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED erro. Consulte o FAQ para mais detalhes.

Tratamento de possíveis erros

  • Não foi possível criar um novo KafkaAdminClient

    Este erro interno de Kafka é lançado se alguma das seguintes opções de autenticação estiver incorreta:

    • ID do cliente (também conhecido como ID do aplicativo)
    • ID do inquilino
    • Servidor Event Hubs

    Para resolver o erro, verifique se os valores estão corretos para essas opções. Além disso, pode ver este erro se modificar as opções de configuração fornecidas por defeito no exemplo (como kafka.security.protocol).

  • Nenhum registo encontrado

    Se você estiver tentando exibir ou processar seu DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:

    • Você especificou o tópico EventHubs errado.
    • A opção de configuração padrão do Kafka para startingOffsets é latest, e você ainda não está recebendo nenhum dado através do tópico. Você pode definir startingOffsets para earliest, para começar a ler dados a partir dos primeiros deslocamentos de Kafka.