重要
この機能はパブリック プレビュー段階にあります。
Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Azure Databricks 上の Apache Pulsar からデータをストリーミングできます。
構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して 1 回だけ処理のセマンティクスを提供します。
構文の例
次に、構造化ストリーミングを使用して Pulsar から読み取る基本的な例を示します。
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Pulsar トピックから読むには、 service.url と次のいずれかのオプションを指定する必要があります。
topictopicstopicsPattern
オプションの完全な一覧については、「Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。
Pulsar に対する認証
Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Databricks では、シークレットを使用して構成の詳細を格納することをお勧めします。
使用可能なストリーム構成オプションは次のとおりです。
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
ストリームで PulsarAdminを使用する場合は、次のオプションを設定する必要があります。
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Example
次の例では、認証オプションの構成を示します。
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar スキーマ
Pulsar から読み取る場合、行のスキーマはソースのトピックのスキーマによって異なります。
- Avro スキーマまたは JSON スキーマを含むトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。
- スキーマのないトピック、または Pulsar の単純なデータ型を使用するトピックの場合、ペイロードは
value列に読み込まれます。 - スキーマが異なる複数のトピックを読み取るストリームを構成する場合は、生のコンテンツを
allowDifferentTopicSchemas列に読み込むvalueを設定します。
Pulsar レコードには、次のメタデータ フィールドがあります。
| 列 | タイプ |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Pulsar ストリーミング読み取り用オプションの設定
読み取りストリームの .option("<optionName>", "<optionValue>") 構文を使用して、次のすべてのオプションを構成します。
.options()を使用して認証を構成することもできます。 「Pulsar に対する認証」を参照してください。
次の表で、Pulsar に必要な構成について説明します。
topic、topics、topicsPattern のいずれかのオプションのみを指定する必要があります。
| オプション | 既定値 | 説明 |
|---|---|---|
service.url |
なし | Pulsar サービスの Pulsar serviceUrl 構成。 |
topic |
なし | コンシュームするトピックのトピック名を表す文字列。 |
topics |
なし | 受信するトピックのカンマ区切りのリスト。 |
topicsPattern |
なし | 使用するトピックに一致する Java 正規表現文字列。 |
次の表で、Pulsar でサポートされているその他のオプションについて説明します。
| オプション | 既定値 | 説明 |
|---|---|---|
predefinedSubscription |
なし | Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。 |
subscriptionPrefix |
なし | Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。 |
pollTimeoutMs |
120000 | Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。 |
waitingForNonExistedTopic |
false |
コネクタは目的のトピックが作成されるまで待機する必要があるかどうか。 |
failOnDataLoss |
true |
データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。 |
allowDifferentTopicSchemas |
false |
スキーマが異なる複数のトピックを読み取る場合は、このオプションを使用して、スキーマベースのトピック値の自動逆シリアル化を無効にします。 これが true の場合は、生の値のみが返されます。 |
startingOffsets |
latest |
latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。
earliest の場合、リーダーは最も早いオフセットから読み取ります。 特定のオフセットの JSON 文字列を指定することもできます。 |
maxBytesPerTrigger |
なし | マイクロバッチごとに処理する最大バイト数のソフト制限。 このオプションを指定する場合は、 admin.urlも指定する必要があります。 |
admin.url |
なし | PulsarserviceHttpUrl の構成。
maxBytesPerTriggerが指定されている場合に必要です。 |
次のパターンを使用して、Pulsar クライアント、管理者、リーダーの構成を指定することもできます。
| パターン | 構成オプション |
|---|---|
pulsar.client.* |
Pulsar クライアント構成 |
pulsar.admin.* |
Pulsar 管理者構成 |
pulsar.reader.* |
Pulsar リーダー構成 |
開始オフセット JSON を作成する
startingOffsets オプションでオフセットを JSON として指定するカスタム メッセージ ID を使用するには、次の例を参照してください。
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()