SimpleDataSourceStreamReader

En basklass för förenklade läsare av strömmande datakällor.

Jämfört DataSourceStreamReadermed kräver SimpleDataSourceStreamReader inte planering av datapartitioner. Metoden read() gör det möjligt att läsa data och planera den senaste förskjutningen samtidigt.

Eftersom SimpleDataSourceStreamReader läser poster i Spark-drivrutinen för att fastställa slutförskjutningen för varje batch utan partitionering, passar den bara för enkla användningsfall där indatahastigheten och batchstorleken är små. Använd DataSourceStreamReader när läsdataflödet är högt och inte kan hanteras av en enda process.

Har lagts till i Databricks Runtime 15.3

Syntax

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Metod Beskrivning
initialOffset() Returnerar den första förskjutningen av den strömmande datakällan. En ny direktuppspelningsfråga börjar läsas från den här förskjutningen.
read(start) Läser alla tillgängliga data från startförskjutningen och returnerar en tupplare av en iterator med poster och slutförskjutningen för nästa läsförsök.
readBetweenOffsets(start, end) Läser alla tillgängliga data mellan specifika start- och slutförskjutningar. Anropas under misslyckad återställning för att läsa om en batch deterministiskt.
commit(end) Informerar källan om att Spark har slutfört bearbetningen av alla data för förskjutningar som är mindre än eller lika med end.

Exempel

Definiera en anpassad förenklad strömmande datakällaläsare:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()