Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Uma classe base para leitores simplificados da fonte de dados de streaming.
Em comparação com DataSourceStreamReader, SimpleDataSourceStreamReader não requer o planejamento de partições de dados. O read() método permite ler dados e planejar o deslocamento mais recente ao mesmo tempo.
Como SimpleDataSourceStreamReader lê registros no driver spark para determinar o deslocamento final de cada lote sem particionamento, ele é adequado apenas para casos de uso leve em que a taxa de entrada e o tamanho do lote são pequenos. Use DataSourceStreamReader quando a taxa de transferência de leitura for alta e não puder ser tratada por um único processo.
Adicionado no Databricks Runtime 15.3
Sintaxe
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| Método | Descrição |
|---|---|
initialOffset() |
Retorna o deslocamento inicial da fonte de dados de streaming. Uma nova consulta de streaming inicia a leitura desse deslocamento. |
read(start) |
Lê todos os dados disponíveis do deslocamento inicial e retorna uma tupla de um iterador de registros e o deslocamento final para a próxima tentativa de leitura. |
readBetweenOffsets(start, end) |
Lê todos os dados disponíveis entre deslocamentos de início e término específicos. Invocado durante a recuperação de falha para ler novamente um lote deterministicamente. |
commit(end) |
Informa à fonte que o Spark concluiu o processamento de todos os dados para deslocamentos menores ou iguais a end. |
Exemplos
Defina um leitor de fonte de dados de streaming simplificado personalizado:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()