次の方法で共有


DataSourceStreamReader

ストリーミング データ ソース リーダーの基本クラス。

データ ソース ストリーム リーダーは、ストリーミング データ ソースからデータを出力する役割を担います。 このクラスを実装し、データ ソースをストリーミング ソースとして読み取り可能にするために、 DataSource.streamReader() からインスタンスを返します。

構文

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

メソッド

メソッド 説明
initialOffset() ストリーミング データ ソースの初期オフセットを dictとして返します。 新しいストリーミング クエリは、このオフセットからの読み取りを開始します。 JSON 形式または dict 形式のプリミティブ型のオフセット キーと値のペアを返す必要があります。 実装されていない場合は PySparkNotImplementedError を発生させます。
latestOffset(start, limit) 開始オフセットと読み取り制限を指定して、 dictとして使用できる最新のオフセットを返します。 新しいデータがない場合、ソースは start と同じオフセットを返す場合があります。 ソースは常に指定された limitを尊重する必要があります。 JSON 形式または dict 形式のプリミティブ型のオフセット キーと値のペアを返す必要があります。 実装されていない場合は PySparkNotImplementedError を発生させます。
partitions(start, end) InputPartitionオフセットとstart オフセットの間のデータを表すend オブジェクトのシーケンスを返します。 startend と等しい場合は、空のシーケンスを返します。 各 InputPartition は、1 つの Spark タスクで処理できるデータ分割を表します。
read(partition) 特定のパーティションのデータを生成し、タプル、行、または PyArrow RecordBatch オブジェクトの反復子を返します。 各タプルまたは行は、最終的な DataFrame の行に変換されます。 このメソッドは抽象メソッドであり、実装する必要があります。
commit(end) end以下のオフセットのすべてのデータの処理が Spark によって完了したことをソースに通知します。 Spark は今後、 end より大きいオフセットのみを要求します。
stop() ソースを停止し、割り当てられているすべてのリソースを解放します。 ストリーミング クエリが終了したときに呼び出されます。

メモ

  • read() は静的でステートレスです。 変更可能なクラス メンバーにアクセスしたり、 read()のさまざまな呼び出し間でメモリ内の状態を保持したりしないでください。
  • partitions()によって返されるすべてのパーティション値は、picklable オブジェクトである必要があります。
  • オフセットは、キーと値がプリミティブ型 (整数、文字列、またはブール型) である dict または再帰 dict として表されます。

例示

インデックス付きレコードのシーケンスから読み取るストリーミング リーダーを実装します。

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")