SimpleDataSourceStreamReader

Classe de base pour les lecteurs de sources de données de diffusion en continu simplifiés.

Par rapport à DataSourceStreamReader, SimpleDataSourceStreamReader ne nécessite pas de partitions de données de planification. La read() méthode permet de lire les données et de planifier le dernier décalage en même temps.

Étant donné que SimpleDataSourceStreamReader les enregistrements lisent dans le pilote Spark pour déterminer le décalage de fin de chaque lot sans partitionnement, il est uniquement adapté aux cas d’usage légers où le taux d’entrée et la taille du lot sont petits. Utilisez DataSourceStreamReader quand le débit de lecture est élevé et ne peut pas être géré par un seul processus.

Ajouté dans Databricks Runtime 15.3

Syntaxe

from pyspark.sql.datasource import SimpleDataSourceStreamReader

class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"offset": 0}

    def read(self, start):
        ...

    def readBetweenOffsets(self, start, end):
        ...

Méthodes

Méthode Description
initialOffset() Retourne le décalage initial de la source de données de streaming. Une nouvelle requête de streaming commence à lire à partir de ce décalage.
read(start) Lit toutes les données disponibles à partir du décalage de début et retourne un tuple d’un itérateur d’enregistrements et le décalage de fin pour la prochaine tentative de lecture.
readBetweenOffsets(start, end) Lit toutes les données disponibles entre des décalages de début et de fin spécifiques. Appelé lors de la récupération d’échec pour relire un lot de manière déterministe.
commit(end) Informe la source que Spark a terminé de traiter toutes les données pour les décalages inférieurs ou égaux à end.

Exemples

Définissez un lecteur de source de données de streaming simplifié personnalisé :

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

class MyStreamingDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_streaming_source"

    def schema(self):
        return "value STRING"

    def simpleStreamReader(self, schema):
        return MySimpleStreamReader()

class MySimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"partition-1": {"index": 0}}

    def read(self, start):
        end = {"partition-1": {"index": start["partition-1"]["index"] + 1}}
        def records():
            yield ("hello",)
        return records(), end

    def readBetweenOffsets(self, start, end):
        def records():
            yield ("hello",)
        return records()

    def commit(self, end):
        pass

spark.dataSource.register(MyStreamingDataSource)
df = spark.readStream.format("my_streaming_source").load()