Apache Kafka に接続する

このページでは、Azure Databricksで構造化ストリーミング ワークロードを実行するときに、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafka の詳細については、 Apache Kafka のドキュメントを参照してください

Kafka からデータを読み取る

kafka への接続を構成するには、 kafka 形式を使用します。 ストリーミング読み取りの例を次に示します。

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricksでは、次の例のように、Kafka からのバッチ読み取りもサポートされています。

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

増分バッチ読み込みの場合、Databricks では、Trigger.AvailableNow で Kafka を使用することをお勧めします。 AvailableNow: 増分バッチ処理」を参照してください。

Databricks Runtime 13.3 LTS 以降では、Azure Databricksは Kafka データを読み取るための SQL 関数も提供します。 SQL を使用したストリーミングは、Lakeflow Spark 宣言パイプラインまたは Databricks SQL のストリーミング テーブルでのみサポートされます。 テーブル値関数read_kafka参照してください。

Kafka Structured Streaming リーダーを構成する

バッチ クエリとストリーミング クエリの両方で、次のオプションを使用して Kafka ソースのブートストラップ サーバーを設定する必要があります。

Key 価値 説明
kafka.bootstrap.servers host:port のコンマ区切りのリスト Kafka クラスター ブートストラップ サーバー

サブスクリプション トピックを設定するには、次のいずれかのオプションを指定する必要があります。

オプション 価値 説明
subscribe トピックのコンマ区切りの一覧。 購読するトピックの一覧。
subscribePattern Javaの正規表現文字列。 トピックのサブスクライブに使用するパターン。
assign JSON 文字列 {"topicA":[0,1],"topic":[2,4]} 消費する特定の topicPartitions

使用可能な オプション の完全な一覧については、[オプション] ページを参照してください。

Kafka の行スキーマ

Kafka 構造化ストリーミング リーダーは、次のスキーマを持つ行を返します。

コラム タイプ
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

keyvalue は、ByteArrayDeserializer を使用して常にバイト配列として逆シリアル化されます。 DataFrame 操作 ( cast("string")from_avroなど) を使用して、キーと値を明示的に逆シリアル化します。

Kafka にデータを書き込む

Kafka へのストリーミング書き込みの例を次に示します。

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricksでは、次の例に示すように、Kafka データ シンクへのバッチ書き込みセマンティクスもサポートされています。

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Kafka 構造化ストリーミング ライターを構成する

Important

Databricks Runtime 13.3 LTS 以降には、既定でべき等書き込みを有効にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクがバージョン 2.8.0 以下を使用しており、ACL が構成されているものの IDEMPOTENT_WRITE が有効になっていない場合、書き込みは失敗し、エラー メッセージ org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state が表示されます。

このエラーを解決するには、Kafka バージョン 2.8.0 以上にアップグレードするか、Structured Streaming ライターを構成するときに .option(“kafka.enable.idempotence”, “false”) を設定します。

Kafka への書き込みの一般的なオプションを次に示します。

Key 価値 既定値 説明
kafka.boostrap.servers <host:port> のコンマ区切りリスト なし Required. Kafka bootstrap.servers 構成。
topic STRING 設定しない オプション。 書き込むすべての行のトピックを設定します。 このオプションは、データに存在するすべてのトピック列をオーバーライドします。
includeHeaders BOOLEAN false オプション。 行に Kafka ヘッダーを含めるかどうか。

使用可能な オプション の完全な一覧については、[オプション] ページを参照してください。

Kafka ライター用スキーマ

Kafka にデータを書き込む場合、指定された DataFrame には次のフィールドが含まれる場合があります。

列名 必須またはオプション タイプ
key 任意 STRING または BINARY
value required STRING または BINARY
headers 任意 ARRAY
topic 省略可能 ( topic がライター オプションとして設定されている場合は無視されます) STRING
partition 任意 INT

認証

Azure Databricksでは、Unity カタログ サービスの資格情報、SASL/SSL、AWS MSK、Azure Event Hubs、Google Cloud Managed Kafka のクラウド固有のオプションなど、Kafka の複数の認証方法がサポートされています。 認証に関するページを参照してください。

Kafka メトリックを取得する

ストリーミング クエリの Kafka の遅延を監視するには、 avgOffsetsBehindLatestmaxOffsetsBehindLatest、および minOffsetsBehindLatest メトリックを使用します。 これらのメトリックは、Kafka の最新のオフセットを基準にして、サブスクライブされているすべてのトピック パーティションの平均、最大、最小オフセットラグを報告します。 「対話形式によるメトリックの読み取り」を参照してください。

Databricks Runtime 17.1 以降では、マイクロバッチが完了するたびに最新の Kafka オフセットがフェッチされます。 データを継続的に受信するトピックでは、バックログ メトリックに小さい永続的な 0 以外の値が表示される場合があります。 これは予期される動作であり、ストリームが遅れていることを示すものではありません。

Databricks Runtime 17.0 以降では、最新の Kafka オフセットはマイクロバッチの開始時刻にフェッチされます。 バックログ メトリックは、ストリーミング クエリがマイクロバッチの開始時に使用可能なすべてのレコードを一貫して使用する場合に、 0 を返す場合があります。

読み取るクエリの残りのデータを推定するには、 estimatedTotalBytesBehindLatest メトリックを使用します。 このメトリックは、過去 300 秒間に処理されたバッチに基づいて、サブスクライブされているすべてのパーティションに残っている合計バイト数を見積もります。 この見積もりに使用する時間枠は、 bytesEstimateWindowLength オプションを設定することで変更できます。

たとえば、ウィンドウの長さを 10 分に設定するには、次のようにします。

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

ノートブックでストリームを実行している場合、これらのメトリックは、ストリーミング クエリの進行状況ダッシュボードの [生データ] タブに表示されます。

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

詳細については、 Azure Databricksを参照してください。

Kafka から Delta Lake への例

次の例は、Kafka から Delta Lake テーブルにデータを継続的にストリーミングするための完全なワークフローを示しています。 このアプローチは、ほぼリアルタイムのデータ インジェスト ワークロードに使用できます。

この例では、固定 JSON スキーマを使用します。 Avro や Protobuf などの他の形式の場合は、 from_avro または from_protobufを使用します。 スキーマ レジストリと統合することもできます。 スキーマ レジストリの例を参照してください。

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);