Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een basisklasse voor vereenvoudigde lezers van streaminggegevensbronnen.
DataSourceStreamReaderIn vergelijking met , SimpleDataSourceStreamReader is het plannen van gegevenspartities niet vereist. Met de read() methode kunt u gegevens lezen en tegelijkertijd de meest recente offset plannen.
Omdat SimpleDataSourceStreamReader records in het Spark-stuurprogramma worden gelezen om de eindverrekening van elke batch te bepalen zonder partitionering, is het alleen geschikt voor lichtgewicht gebruiksvoorbeelden waarbij de invoersnelheid en batchgrootte klein zijn. Gebruik DataSourceStreamReader deze functie wanneer leesdoorvoer hoog is en niet door één proces kan worden verwerkt.
Toegevoegd in Databricks Runtime 15.3
Syntaxis
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| Methode | Beschrijving |
|---|---|
initialOffset() |
Retourneert de eerste offset van de streaminggegevensbron. Een nieuwe streamingquery begint met lezen vanaf deze offset. |
read(start) |
Leest alle beschikbare gegevens van de begin offset en retourneert een tuple van een iterator van records en de eind offset voor de volgende leespoging. |
readBetweenOffsets(start, end) |
Leest alle beschikbare gegevens tussen specifieke begin- en eind offsets. Aangeroepen tijdens foutherstel om een batch deterministisch opnieuw te lezen. |
commit(end) |
Informeert de bron dat Spark alle gegevens heeft verwerkt voor offsets kleiner dan of gelijk aan end. |
Examples
Definieer een aangepaste vereenvoudigde lezer voor streaminggegevensbronnen:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()