Compartilhar via


SimpleDataSourceStreamReader

Uma classe base para leitores simplificados da fonte de dados de streaming.

Em comparação com DataSourceStreamReader, SimpleDataSourceStreamReader não requer o planejamento de partições de dados. O read() método permite ler dados e planejar o deslocamento mais recente ao mesmo tempo.

Como SimpleDataSourceStreamReader lê registros no driver spark para determinar o deslocamento final de cada lote sem particionamento, ele é adequado apenas para casos de uso leve em que a taxa de entrada e o tamanho do lote são pequenos. Use DataSourceStreamReader quando a taxa de transferência de leitura for alta e não puder ser tratada por um único processo.

Adicionado no Databricks Runtime 15.3

Sintaxe

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Método Descrição
initialOffset() Retorna o deslocamento inicial da fonte de dados de streaming. Uma nova consulta de streaming inicia a leitura desse deslocamento.
read(start) Lê todos os dados disponíveis do deslocamento inicial e retorna uma tupla de um iterador de registros e o deslocamento final para a próxima tentativa de leitura.
readBetweenOffsets(start, end) Lê todos os dados disponíveis entre deslocamentos de início e término específicos. Invocado durante a recuperação de falha para ler novamente um lote deterministicamente.
commit(end) Informa à fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a end.

Exemplos

Defina um leitor de fonte de dados de streaming simplificado personalizado:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()