SimpleDataSourceStreamReader

Een basisklasse voor vereenvoudigde lezers van streaminggegevensbronnen.

DataSourceStreamReaderIn vergelijking met , SimpleDataSourceStreamReader is het plannen van gegevenspartities niet vereist. Met de read() methode kunt u gegevens lezen en tegelijkertijd de meest recente offset plannen.

Omdat SimpleDataSourceStreamReader records in het Spark-stuurprogramma worden gelezen om de eindverrekening van elke batch te bepalen zonder partitionering, is het alleen geschikt voor lichtgewicht gebruiksvoorbeelden waarbij de invoersnelheid en batchgrootte klein zijn. Gebruik DataSourceStreamReader deze functie wanneer leesdoorvoer hoog is en niet door één proces kan worden verwerkt.

Toegevoegd in Databricks Runtime 15.3

Syntaxis

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Methode Beschrijving
initialOffset() Retourneert de eerste offset van de streaminggegevensbron. Een nieuwe streamingquery begint met lezen vanaf deze offset.
read(start) Leest alle beschikbare gegevens van de begin offset en retourneert een tuple van een iterator van records en de eind offset voor de volgende leespoging.
readBetweenOffsets(start, end) Leest alle beschikbare gegevens tussen specifieke begin- en eind offsets. Aangeroepen tijdens foutherstel om een batch deterministisch opnieuw te lezen.
commit(end) Informeert de bron dat Spark alle gegevens heeft verwerkt voor offsets kleiner dan of gelijk aan end.

Examples

Definieer een aangepaste vereenvoudigde lezer voor streaminggegevensbronnen:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()