Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Clase base para lectores simplificados de orígenes de datos de streaming.
En comparación con DataSourceStreamReader, SimpleDataSourceStreamReader no requiere planear particiones de datos. El read() método permite leer datos y planear el desplazamiento más reciente al mismo tiempo.
Dado SimpleDataSourceStreamReader que lee registros en el controlador de Spark para determinar el desplazamiento final de cada lote sin crear particiones, solo es adecuado para casos de uso ligeros en los que la velocidad de entrada y el tamaño del lote son pequeños. Se usa DataSourceStreamReader cuando el rendimiento de lectura es alto y no se puede controlar mediante un único proceso.
Agregado en Databricks Runtime 15.3
Sintaxis
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| Método | Descripción |
|---|---|
initialOffset() |
Devuelve el desplazamiento inicial del origen de datos de streaming. Una nueva consulta de streaming comienza a leer desde este desplazamiento. |
read(start) |
Lee todos los datos disponibles del desplazamiento inicial y devuelve una tupla de un iterador de registros y el desplazamiento final del siguiente intento de lectura. |
readBetweenOffsets(start, end) |
Lee todos los datos disponibles entre desplazamientos de inicio y finalización específicos. Se invoca durante la recuperación de errores para volver a leer un lote de forma determinista. |
commit(end) |
Informa al origen de que Spark ha completado el procesamiento de todos los datos para desplazamientos inferiores o iguales a end. |
Ejemplos
Defina un lector de origen de datos de streaming simplificado personalizado:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()