SimpleDataSourceStreamReader

Clase base para lectores simplificados de orígenes de datos de streaming.

En comparación con DataSourceStreamReader, SimpleDataSourceStreamReader no requiere planear particiones de datos. El read() método permite leer datos y planear el desplazamiento más reciente al mismo tiempo.

Dado SimpleDataSourceStreamReader que lee registros en el controlador de Spark para determinar el desplazamiento final de cada lote sin crear particiones, solo es adecuado para casos de uso ligeros en los que la velocidad de entrada y el tamaño del lote son pequeños. Se usa DataSourceStreamReader cuando el rendimiento de lectura es alto y no se puede controlar mediante un único proceso.

Agregado en Databricks Runtime 15.3

Sintaxis

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Methods

Método Descripción
initialOffset() Devuelve el desplazamiento inicial del origen de datos de streaming. Una nueva consulta de streaming comienza a leer desde este desplazamiento.
read(start) Lee todos los datos disponibles del desplazamiento inicial y devuelve una tupla de un iterador de registros y el desplazamiento final del siguiente intento de lectura.
readBetweenOffsets(start, end) Lee todos los datos disponibles entre desplazamientos de inicio y finalización específicos. Se invoca durante la recuperación de errores para volver a leer un lote de forma determinista.
commit(end) Informa al origen de que Spark ha completado el procesamiento de todos los datos para desplazamientos inferiores o iguales a end.

Ejemplos

Defina un lector de origen de datos de streaming simplificado personalizado:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()