Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
En basklass för förenklade läsare av strömmande datakällor.
Jämfört DataSourceStreamReadermed kräver SimpleDataSourceStreamReader inte planering av datapartitioner. Metoden read() gör det möjligt att läsa data och planera den senaste förskjutningen samtidigt.
Eftersom SimpleDataSourceStreamReader läser poster i Spark-drivrutinen för att fastställa slutförskjutningen för varje batch utan partitionering, passar den bara för enkla användningsfall där indatahastigheten och batchstorleken är små. Använd DataSourceStreamReader när läsdataflödet är högt och inte kan hanteras av en enda process.
Har lagts till i Databricks Runtime 15.3
Syntax
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class MyStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"offset": 0}
def read(self, start):
...
def readBetweenOffsets(self, start, end):
...
Methods
| Metod | Beskrivning |
|---|---|
initialOffset() |
Returnerar den första förskjutningen av den strömmande datakällan. En ny direktuppspelningsfråga börjar läsas från den här förskjutningen. |
read(start) |
Läser alla tillgängliga data från startförskjutningen och returnerar en tupplare av en iterator med poster och slutförskjutningen för nästa läsförsök. |
readBetweenOffsets(start, end) |
Läser alla tillgängliga data mellan specifika start- och slutförskjutningar. Anropas under misslyckad återställning för att läsa om en batch deterministiskt. |
commit(end) |
Informerar källan om att Spark har slutfört bearbetningen av alla data för förskjutningar som är mindre än eller lika med end. |
Exempel
Definiera en anpassad förenklad strömmande datakällaläsare:
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader
class MyStreamingDataSource(DataSource):
@classmethod
def name(cls):
return "my_streaming_source"
def schema(self):
return "value STRING"
def simpleStreamReader(self, schema):
return MySimpleStreamReader()
class MySimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
return {"partition-1": {"index": 0}}
def read(self, start):
end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
def records():
yield ("hello",)
return records(), end
def readBetweenOffsets(self, start, end):
def records():
yield ("hello",)
return records()
def commit(self, end):
pass
spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()