このページでは、Azure Databricks上の Kafka コネクタの最も一般的な認証方法を示します。
サポートされている認証方法の完全な一覧については、 Kafka のドキュメントを参照してください。
サービス プリンシパルを使用してAzure Event Hubsに接続する
Azure Databricksでは、Microsoft Entra IDで OAuth を使用した Event Hubs サービスでの Spark ジョブの認証がサポートされます。
Unity カタログ サービスの資格情報を使用して接続する
Databricks Runtime 16.1 以降では、Azure Databricksは、Azure Event Hubsに対する認証のための Unity カタログ サービス資格情報をサポートしています。 共有クラスターまたはサーバーレス コンピューティングで Kafka ストリーミングを実行する場合は、Databricks がこの方法をお勧めします。
認証に Unity カタログ サービスの資格情報を使用するには、次の操作を行います。
- 新しい Unity カタログ サービス資格情報を作成します。
「サービス資格情報の作成」を参照してください。
- サービス資格情報に接続されているアクセス コネクタに、Azure Event Hubsに接続するための適切なアクセス許可があることを確認します。
- ソース オプション
databricks.serviceCredentialをサービス資格情報の名前に設定します。
次の例では、サービス資格情報を使用して Kafka をソースとして構成します。
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
注
Unity カタログ サービスの資格情報を使用して Kafka に接続する場合は、次のオプションを使用しないでください。
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
クライアント ID とシークレットを使用して接続する
Azure Databricksでは、次のコンピューティング環境でクライアント ID とシークレットを使用したMicrosoft Entra ID認証がサポートされています。
- 専用アクセス モードで構成されたコンピューティング上の Databricks Runtime 12.2 LTS 以降。
- 標準アクセス モードに設定されたコンピュートでの Databricks Runtime 14.3 LTS 以降。
- Unity カタログなしで構成された Lakeflow Spark 宣言パイプライン。
Azure Databricksでは、どのコンピューティング環境でも、Unity カタログで構成された Lakeflow Spark 宣言パイプラインでも、証明書を使用したMicrosoft Entra ID認証はサポートされていません。
この認証は、標準アクセス モードのコンピューティングや Unity Catalog Lakeflow Spark 宣言パイプラインでは機能しません。
Microsoft Entra IDで認証を実行するには、次の値が必要です。
テナント ID。 これは、Microsoft Entra ID サービス タブにあります。
clientID (アプリケーション ID とも呼ばれます)。
クライアント シークレット。 これをシークレットとして Databricks ワークスペースに追加します。 「シークレットの管理」を参照してください。
EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの [ エンティティ ] セクションの Event Hubs セクションにあります。 複数のトピックを操作するには、Event Hubs レベルで IAM ロールを設定します。
EventHubs サーバー。 これは、特定の Event Hubs 名前空間の概要ページにあります。
Entra IDを使用するには、OAuth SASL を使用するように Kafka を構成する必要があります。
-
kafka.security.protocolをSASL_SSLに設定します -
kafka.sasl.mechanismをOAUTHBEARERに設定します -
kafka.sasl.login.callback.handler.classをJava クラスの完全修飾名に設定します。 修飾名はkafkashadedであり、Databricks のシェーディング Kafka クラスのログイン コールバック ハンドラーです。 正確なクラスについては、次の例を参照してください。
SASL は汎用認証プロトコルであり、OAuth は SASL メカニズムです。
次の例では、クライアント ID とシークレットを使用したMicrosoft Entra ID認証を使用してAzure Event Hubsに接続するように Kafka を構成します。
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
SASL/PLAIN を使用して認証する
SASL/PLAIN (ユーザー名とパスワード) 認証を使用して Kafka に接続するには、次のオプションを構成します。 シェーディングされた PlainLoginModule クラス名を使用します。
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricksは、パスワードをコードに直接含めるのではなく、シークレットとして保存することをお勧めします。 詳細については、「 Secret management」を参照してください。
SASL/SCRAM を使用して認証する
SASL/SCRAM (SCRAM-SHA-256 または SCRAM-SHA-512) を使用して Kafka に接続するには、次のオプションを構成します。 シェーディングされた ScramLoginModule クラス名を使用します。
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
注
Kafka クラスターが SCRAM-SHA-256 を使用するように構成されている場合は、 SCRAM-SHA-512 を SCRAM-SHA-256 に置き換えます。
Azure Databricksは、パスワードをコードに直接含めるのではなく、シークレットとして保存することをお勧めします。 詳細については、「 Secret management」を参照してください。
Azure Databricksを Kafka に接続するには SSL を使用します
Kafka への SSL/TLS 接続を有効にするには、 kafka.security.protocol を SSL に設定し、トラスト ストアとキー ストアの構成オプションにプレフィックスとして kafka. を付けます。 サーバー認証のみを必要とする SSL 接続 (一方向 TLS) の場合は、信頼ストアを使用する必要があります。 Kafka ブローカーもクライアントを認証する相互 TLS (mTLS) の場合は、トラスト ストアとキー ストアの両方を使用する必要があります。
次の SSL/TLS オプションを使用できます。 SSL プロパティの完全な一覧については、 Apache Kafka SSL 構成のドキュメント と Confluent ドキュメントの SSL による暗号化と認証 を参照してください。
| オプション | 説明 |
|---|---|
kafka.security.protocol |
TLS 暗号化を有効にするには、 SSL に設定します。 |
kafka.ssl.truststore.location |
信頼された CA 証明書を含む信頼ストア ファイルへのパス。 |
kafka.ssl.truststore.password |
信頼ストア ファイルのパスワード。 |
kafka.ssl.truststore.type |
トラストストアファイル形式 (既定値: JKS)。 |
kafka.ssl.keystore.location |
クライアント証明書と秘密キーを含むキー ストア ファイルへのパス (mTLS に必要)。 |
kafka.ssl.keystore.password |
キー ストア ファイルのパスワード。 |
kafka.ssl.key.password |
キー ストア内の秘密キーのパスワード。 |
kafka.ssl.endpoint.identification.algorithm |
ホスト名検証アルゴリズム。 既定値は https です。 空の文字列に設定すると無効になります。 |
SSL を使用する場合、Databricks では次のことをお勧めします。
- 証明書を Unity カタログ ボリュームに格納します。 ボリュームから読み取るアクセス権を持つユーザーは、Kafka 証明書を使用できます。 詳細については、「Unity Catalog ボリュームとは」を参照してください。
- 証明書パスワードをシークレット スコープにシークレットとして格納します。 詳細については、「 シークレット スコープの管理」を参照してください。
次の例では、オブジェクト ストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
HDInsight 上の Kafka を Azure Databricks に接続する
HDInsight Kafka クラスターを作成します。
手順については、「connect to Kafka on HDInsight through an Azure Virtual Network」を参照してください。
正しいアドレスをアドバタイズするように Kafka ブローカーを構成します。
「IP をアドバタイズするように Kafka を構成する」の手順に従います。 Azure 仮想マシンで Kafka を自分で管理する場合は、ブローカーの
advertised.listeners構成がホストの内部 IP に設定されていることを確認します。Azure Databricks クラスターを作成します。
Kafka クラスターを Azure Databricks クラスターにピアリングします。
「仮想ネットワークをピアリングする」の手順に従います。
Databricks のシェーディングされた Kafka クラス名を使用する
Azure Databricksは、Kafka クライアント ライブラリの独自のシェーディング バージョンをバンドルします。 認証構成オプションで参照するすべての Kafka クライアント クラス名は、標準のオープン ソース クラス名ではなく、網かけのクラス名プレフィックスを使用する必要があります。 これは、 kafka.sasl.jaas.config、 kafka.sasl.login.callback.handler.class、 kafka.sasl.client.callback.handler.classなどのオプションで参照されるすべてのクラスに適用されます。
シェーダーなしのクラス名を使用すると、コードによって RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED エラーが発生します。 詳細については、 FAQ を参照してください。
潜在的なエラーの処理
新しい作成に失敗しました
KafkaAdminClientこの内部Kafkaエラーは、以下のいずれかの認証オプションに誤りがある場合にスローされます。
- クライアント ID (アプリケーション ID とも呼ばれます)
- テナント ID
- Event Hubs サーバー
エラーを解決するには、これらのオプションの値が正しいことを確認します。 また、この例で既定で提供されている構成オプション (
kafka.security.protocolなど) を変更すると、このエラーが表示されることがあります。レコードが返されない
DataFrame を表示または処理しようとしても結果が得られない場合は、UI に次の情報が表示されます。
このメッセージは、認証が成功したが、EventHubs がデータを返さなかったことを意味します。 次のような理由が考えられます (ただし、すべてを網羅しているわけではありません)。
- 正しくない EventHubs トピックを指定しました。
-
startingOffsetsの既定の Kafka 構成オプションはlatestであり、現在、このトピックを通じてデータを受信していません。startingOffsetsをearliestに設定すると、Kafka の最も古いオフセットからデータの読み取りを開始できます。