Apache Pulsar からのストリーム

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Azure Databricks 上の Apache Pulsar からデータをストリーミングできます。

構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して 1 回だけ処理のセマンティクスを提供します。

構文の例

次に、構造化ストリーミングを使用して Pulsar から読み取る基本的な例を示します。

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Pulsar トピックから読むには、 service.url と次のいずれかのオプションを指定する必要があります。

  • topic
  • topics
  • topicsPattern

オプションの完全な一覧については、「Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。

Pulsar に対する認証

Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Databricks では、シークレットを使用して構成の詳細を格納することをお勧めします。

使用可能なストリーム構成オプションは次のとおりです。

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

ストリームで PulsarAdminを使用する場合は、次のオプションを設定する必要があります。

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

次の例では、認証オプションの構成を示します。

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar スキーマ

Pulsar から読み取る場合、行のスキーマはソースのトピックのスキーマによって異なります。

  • Avro スキーマまたは JSON スキーマを含むトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。
  • スキーマのないトピック、または Pulsar の単純なデータ型を使用するトピックの場合、ペイロードは value 列に読み込まれます。
  • スキーマが異なる複数のトピックを読み取るストリームを構成する場合は、生のコンテンツをallowDifferentTopicSchemas列に読み込むvalueを設定します。

Pulsar レコードには、次のメタデータ フィールドがあります。

タイプ
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Pulsar ストリーミング読み取り用オプションの設定

読み取りストリームの .option("<optionName>", "<optionValue>") 構文を使用して、次のすべてのオプションを構成します。 .options()を使用して認証を構成することもできます。 「Pulsar に対する認証」を参照してください。

次の表で、Pulsar に必要な構成について説明します。 topictopicstopicsPattern のいずれかのオプションのみを指定する必要があります。

オプション 既定値 説明
service.url なし Pulsar サービスの Pulsar serviceUrl 構成。
topic なし コンシュームするトピックのトピック名を表す文字列。
topics なし 受信するトピックのカンマ区切りのリスト。
topicsPattern なし 使用するトピックに一致する Java 正規表現文字列。

次の表で、Pulsar でサポートされているその他のオプションについて説明します。

オプション 既定値 説明
predefinedSubscription なし Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。
subscriptionPrefix なし Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。
pollTimeoutMs 120000 Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。
waitingForNonExistedTopic false コネクタは目的のトピックが作成されるまで待機する必要があるかどうか。
failOnDataLoss true データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。
allowDifferentTopicSchemas false スキーマが異なる複数のトピックを読み取る場合は、このオプションを使用して、スキーマベースのトピック値の自動逆シリアル化を無効にします。 これが true の場合は、生の値のみが返されます。
startingOffsets latest latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。 earliest の場合、リーダーは最も早いオフセットから読み取ります。 特定のオフセットの JSON 文字列を指定することもできます。
maxBytesPerTrigger なし マイクロバッチごとに処理する最大バイト数のソフト制限。 このオプションを指定する場合は、 admin.urlも指定する必要があります。
admin.url なし PulsarserviceHttpUrl の構成。 maxBytesPerTriggerが指定されている場合に必要です。

次のパターンを使用して、Pulsar クライアント、管理者、リーダーの構成を指定することもできます。

パターン 構成オプション
pulsar.client.* Pulsar クライアント構成
pulsar.admin.* Pulsar 管理者構成
pulsar.reader.* Pulsar リーダー構成

開始オフセット JSON を作成する

startingOffsets オプションでオフセットを JSON として指定するカスタム メッセージ ID を使用するには、次の例を参照してください。

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()