オプション

このページでは、Azure Databricks で Structured Streaming を使用して Apache Kafka を読み取り、書き込むための構成オプションについて説明します。

Azure Databricks Kafka コネクタは、Apache Spark Kafka コネクタの上に構築され、すべての標準的な Kafka 構成オプションをサポートします。 kafka.プレフィックスが付いたオプションは、基になる Kafka クライアントに直接渡されます。 たとえば、 .option("kafka.max.poll.records", "500") は Kafka コンシューマーの max.poll.records プロパティを設定します。 使用可能な Kafka プロパティの完全な一覧については、Kafka 構成ドキュメントを参照してください。

Structured Streaming のソースとシンクのオプションの完全な一覧については、 Kafka構造化ストリーミング + Kafka 統合ガイドを参照してください。

必須オプション

必要なオプションの詳細については、 Kafka を参照してください。

読み取りと書き込みの両方に次のオプションが必要です。

Key 説明
kafka.bootstrap.servers Kafka ブローカーの host:port アドレスのコンマ区切りのリスト。 Kafka クライアントの bootstrap.servers プロパティを設定します。
Kafka からのデータがない場合は、このブローカーのアドレス一覧で正しくないアドレスを確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 Kafka クライアントは、ブローカーが最終的に使用可能になると想定し、ネットワーク エラーを受信したときに永久に再試行します。

Kafka 読み取りの場合は、使用するトピックを特定するために、次のいずれかのオプションを正確に指定する必要もあります。

  • subscribe
  • subscribePattern
  • assign

Kafka に書き込むときに、必要に応じて topic オプションを設定して、すべての行の宛先トピックを指定できます。 設定しない場合、DataFrame には topic 列を含める必要があります。

一般的なリーダー オプション

Kafka から読み取るときに、次のオプションが一般的に使用されます。

Key 説明
minPartitions Kafka から読み取るパーティションの最小数。
maxRecordsPerPartition Spark パーティションあたりのレコードの最大数。
failOnDataLoss データが失われた可能性がある場合にクエリを失敗させるかどうか。
maxOffsetsPerTrigger トリガー間隔ごとに処理されるオフセットの最大数。
startingOffsets クエリが読み取りを開始するオフセット。
endingOffsets バッチ クエリの読み取りを停止する場所。
groupIdPrefix 自動生成されたコンシューマー グループ ID のカスタマイズされたプレフィックス。
kafka.group.id Kafka からの読み取り中に使用するグループ ID。
これは予期しない動作を引き起こす可能性があるため、注意して使用してください。 既定では、各クエリによって、データを読み取るための一意のグループ ID が生成されます。 これにより、各クエリに、他のコンシューマーからの干渉を回避する独自のコンシューマー グループが確実に含まれるようにし、各クエリがサブスクライブしたトピックのすべてのパーティションを読み取れるようにします。 Kafka グループベースの承認など、一部のシナリオでは、特定の承認済みグループ ID を使用してデータを読み取ることができます。
同じグループ ID を持つクエリは、相互に干渉し、部分的なデータのみを読み取る可能性があります。 コンカレント バッチとストリーミング ワークロードを実行するとき、またはクエリを連続して開始して再起動すると、干渉が発生する可能性があります。
問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を非常に小さく設定します。
includeHeaders 出力に Kafka メッセージ ヘッダーを含めるかどうか。
bytesEstimateWindowLength estimatedTotalBytesBehindLatest メトリックを使用して残りのバイトを見積もるために使用される時間枠。

Structured Streaming のソースとシンクのオプションの完全な一覧については、 Kafka構造化ストリーミング + Kafka 統合ガイドを参照してください。

一般的なライター オプション

Kafka への書き込み時には、次のオプションが一般的に使用されます。

Key 説明
topic すべての行のトピックを設定します。 これは、データ内の任意の topic 列よりも優先されます。
includeHeaders 行に Kafka ヘッダーを含めるかどうか。

Important

Databricks Runtime 13.3 LTS 以降には、既定でべき等書き込みを有効にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクで ACL が構成されたバージョン 2.8.0 以下が使用されているが、 IDEMPOTENT_WRITE が有効になっていない場合、書き込みは失敗します。 これを解決するには、Kafka 2.8.0 以降にアップグレードするか、 .option("kafka.enable.idempotence", "false")設定します。

Structured Streaming のソースとシンクのオプションの完全な一覧については、 Kafka構造化ストリーミング + Kafka 統合ガイドを参照してください。

認証オプション

Azure Databricks では、Unity カタログ サービスの資格情報、SASL/SSL、AWS MSK、Azure Event Hubs、Google Cloud Managed Kafka のクラウド固有のオプションなど、Kafka の複数の認証方法がサポートされています。

Azure Databricksでは、クラウドで管理される Kafka サービスへの認証に Unity カタログ サービスの資格情報を使用することをお勧めします。

オプション 説明
databricks.serviceCredential クラウドで管理される Kafka サービス (AWS MSK、Azure Event Hubs、または Google Cloud Managed Kafka) に対して認証するための Unity カタログ サービス資格情報 の名前。 Databricks Runtime 16.1 以降で使用できます。
databricks.serviceCredential.scope サービス資格情報の OAuth スコープ。 これは、Azure Databricks が Kafka サービスのスコープを自動的に推論できない場合にのみ設定します。

Unity カタログ サービスの資格情報を使用する場合、 kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolなどの SASL/SSL オプションを指定する必要はありません。

SASL/SSL の一般的なオプションは次のとおりです。

オプション 説明
kafka.security.protocol ブローカーとの通信に使用されるプロトコル ( SASL_SSLSSLPLAINTEXTなど)。
kafka.sasl.mechanism SASL メカニズム (たとえば、 PLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERAWS_MSK_IAM)。
kafka.sasl.jaas.config JAAS ログイン構成文字列。
kafka.sasl.login.callback.handler.class SASL 認証のログイン コールバック ハンドラーの完全修飾クラス名。
kafka.sasl.client.callback.handler.class SASL 認証用のクライアント コールバック ハンドラーの完全修飾クラス名。
kafka.ssl.truststore.location SSL 信頼ストア ファイルの場所。
kafka.ssl.truststore.password SSL 信頼ストア ファイルのパスワード。
kafka.ssl.keystore.location SSL キー ストア ファイルの場所。
kafka.ssl.keystore.password SSL キー ストア ファイルのパスワード。

認証のセットアップ手順の詳細については、「 認証」を参照してください。

その他のリソース