Aangepaste gegevensbronnen van PySpark

Aangepaste pySpark-gegevensbronnen worden gemaakt met behulp van de Python DataSource-API (PySpark), waarmee u kunt lezen uit aangepaste gegevensbronnen en schrijven naar aangepaste gegevenssinks in Apache Spark met behulp van Python. U kunt aangepaste gegevensbronnen van PySpark gebruiken om aangepaste verbindingen met gegevenssystemen te definiëren en extra functionaliteit te implementeren om herbruikbare gegevensbronnen uit te bouwen.

Notitie

Voor aangepaste pySpark-gegevensbronnen is Databricks Runtime 15.4 LTS en hoger of serverloze omgevingsversie 2 vereist.

Gegevensbronklasse

PySpark DataSource is een basisklasse die methoden biedt voor het maken van gegevenslezers en schrijvers.

De subklasse van de gegevensbron implementeren

Afhankelijk van uw gebruiksscenario moet het volgende worden geïmplementeerd door een subklasse om een gegevensbron leesbaar, beschrijfbaar of beide te maken:

Eigenschap of methode Beschrijving
name Vereist. De naam van de gegevensbron
schema Vereist. Het schema van de gegevensbron die moet worden gelezen of geschreven
reader() Moet een DataSourceReader retourneren om de gegevensbron leesbaar te maken (batch)
writer() Om de gegevenssink schrijfbaar te maken, moet een DataSourceWriter worden geretourneerd (batch).
streamReader() of simpleStreamReader() Moet een DataSourceStreamReader retourneren om de gegevensstroom leesbaar te maken (streaming)
streamWriter() Moet een DataSourceStreamWriter retourneren om de gegevensstroom beschrijfbaar te maken (streaming)

Notitie

De door de gebruiker gedefinieerdeDataSource, DataSourceReader, DataSourceWriteren DataSourceStreamReaderDataSourceStreamWriterhun methoden moeten serialiseerbaar zijn. Met andere woorden, ze moeten een woordenboek of een genest woordenboek zijn die een primitief type bevatten.

De gegevensbron registreren

Nadat u de interface hebt geïmplementeerd, moet u deze registreren en kunt u deze laden of anderszins gebruiken, zoals wordt weergegeven in het volgende voorbeeld:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

Voorbeeld 1: Een PySpark-gegevensbron maken voor batchquery

Als u de mogelijkheden van PySpark DataSource-lezer wilt demonstreren, maakt u een gegevensbron waarmee voorbeeldgegevens worden gegenereerd met behulp van het faker Python-pakket. Raadpleeg de faker voor meer informatie over .

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De lezer voor een batchquery implementeren

Implementeer eerst de lezerlogica om voorbeeldgegevens te genereren. Gebruik de geïnstalleerde faker-bibliotheek om elk veld in het schema te vullen.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Stap 2: De voorbeeldgegevensbron definiëren

Definieer vervolgens uw nieuwe PySpark DataSource als subklasse DataSource met een naam, schema en lezer. De reader() methode moet worden gedefinieerd om te lezen uit een gegevensbron in een batchquery.

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Stap 3: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. De FakeDataSource heeft standaard drie rijen en het schema bevat de volgende string velden: name, date, zipcode, state. In het volgende voorbeeld wordt de voorbeeldgegevensbron geregistreerd, geladen en uitgevoerd met de standaardinstellingen:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Alleen string velden worden ondersteund, maar u kunt een schema opgeven met velden die overeenkomen met faker de velden van pakketproviders om willekeurige gegevens te genereren voor testen en ontwikkelen. In het volgende voorbeeld wordt de gegevensbron geladen met name en company velden:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Als u de gegevensbron wilt laden met een aangepast aantal rijen, geeft u de numRows optie op. In het volgende voorbeeld worden vijf rijen opgegeven:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Voorbeeld 2: Een PySpark GitHub DataSource maken met behulp van varianten

Om het gebruik van varianten in een PySpark DataSource te demonstreren, wordt in dit voorbeeld een gegevensbron gemaakt die pull-aanvragen van GitHub leest.

Notitie

Varianten worden ondersteund met aangepaste PySpark-gegevensbronnen in Databricks Runtime 17.1 en hoger.

Zie Queryvariantgegevens voor meer informatie over varianten.

Stap 1: De lezer implementeren om pull-aanvragen op te halen

Implementeer eerst de lezerlogica om pull-aanvragen op te halen uit de opgegeven GitHub-opslagplaats.

class GithubVariantPullRequestReader(DataSourceReader):
    def __init__(self, options):
        self.token = options.get("token")
        self.repo = options.get("path")
        if self.repo is None:
            raise Exception(f"Must specify a repo in `.load()` method.")
        # Every value in this `self.options` dictionary is a string.
        self.num_rows = int(options.get("numRows", 10))

    def read(self, partition):
        header = {
            "Accept": "application/vnd.github+json",
        }
        if self.token is not None:
            header["Authorization"] = f"Bearer {self.token}"
        url = f"https://api.github.com/repos/{self.repo}/pulls"
        response = requests.get(url, headers=header)
        response.raise_for_status()
        prs = response.json()
        for pr in prs[:self.num_rows]:
            yield Row(
                id = pr.get("number"),
                title = pr.get("title"),
                user = VariantVal.parseJson(json.dumps(pr.get("user"))),
                created_at = pr.get("created_at"),
                updated_at = pr.get("updated_at")
            )

Stap 2: De GitHub-gegevensbron definiëren

Definieer vervolgens uw nieuwe PySpark GitHub DataSource als een subklasse van DataSource met een naam, schema en methode reader(). Het schema bevat de volgende velden: id, title, user, created_at, . updated_at Het user veld wordt gedefinieerd als een variant.

import json
import requests

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import VariantVal

class GithubVariantDataSource(DataSource):
    @classmethod
    def name(self):
        return "githubVariant"
    def schema(self):
        return "id int, title string, user variant, created_at string, updated_at string"
    def reader(self, schema):
        return GithubVariantPullRequestReader(self.options)

Stap 3: de gegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. In het volgende voorbeeld wordt de gegevensbron geregistreerd, vervolgens geladen, en worden drie rijen met de PR-gegevens van de GitHub-opslagplaats uitgevoerd.

spark.dataSource.register(GithubVariantDataSource)
spark.read.format("githubVariant").option("numRows", 3).load("apache/spark").display()
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+
| id      | title                                               | user                | created_at           | updated_at           |
+---------+---------------------------------------------------- +---------------------+----------------------+----------------------+
|   51293 |[SPARK-52586][SQL] Introduce AnyTimeType             |  {"avatar_url":...} | 2025-06-26T09:20:59Z | 2025-06-26T15:22:39Z |
|   51292 |[WIP][PYTHON] Arrow UDF for aggregation              |  {"avatar_url":...} | 2025-06-26T07:52:27Z | 2025-06-26T07:52:37Z |
|   51290 |[SPARK-50686][SQL] Hash to sort aggregation fallback |  {"avatar_url":...} | 2025-06-26T06:19:58Z | 2025-06-26T06:20:07Z |
+---------+-----------------------------------------------------+---------------------+----------------------+----------------------+

Voorbeeld 3: PySpark DataSource maken voor het streamen van lezen en schrijven

Maak een voorbeeldgegevensbron waarmee twee rijen in elke microbatch worden gegenereerd met behulp van het faker Python-pakket om pySpark DataSource-streamlezer- en schrijfmogelijkheden te demonstreren. Raadpleeg de faker voor meer informatie over .

Installeer het faker pakket met behulp van de volgende opdracht:

%pip install faker

Stap 1: De streamlezer implementeren

Implementeer eerst de voorbeeldlezer voor streaminggegevens waarmee twee rijen in elke microbatch worden gegenereerd. U kunt in plaats daarvan implementeren DataSourceStreamReaderof als de gegevensbron een lage doorvoer heeft en geen partitionering vereist, kunt u in plaats daarvan implementeren SimpleDataSourceStreamReader . simpleStreamReader() of streamReader() moet worden geïmplementeerd en simpleStreamReader() wordt alleen aangeroepen wanneer streamReader() niet is geïmplementeerd.

DataSourceStreamReader-implementatie

Het streamReader exemplaar heeft een gehele offset die met 2 toeneemt in elke microbatch, geïmplementeerd via de DataSourceStreamReader interface.

from pyspark.sql.datasource import InputPartition
from typing import Iterator, Tuple
import os
import json

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

Implementatie van SimpleDataSourceStreamReader

Het SimpleStreamReader exemplaar is hetzelfde als het FakeStreamReader exemplaar dat twee rijen in elke batch genereert, maar geïmplementeerd met de SimpleDataSourceStreamReader interface zonder partitionering.

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

Stap 2: De stream writer implementeren

Implementeer vervolgens de streamingschrijver. Deze streaminggegevensschrijver schrijft de metagegevens van elke microbatch naar een lokaal pad.

from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

class SimpleCommitMessage(WriterCommitMessage):
   def __init__(self, partition_id: int, count: int):
       self.partition_id = partition_id
       self.count = count

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data and then returns the commit message for that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

Stap 3: De voorbeeldgegevensbron definiëren

Definieer nu uw nieuwe PySpark DataSource als een subklasse van DataSource met een naam, schema en methoden streamReader() en streamWriter().

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

Stap 4: De voorbeeldgegevensbron registreren en gebruiken

Als u de gegevensbron wilt gebruiken, moet u deze registreren. Nadat deze is geregistreerd, kunt u deze gebruiken in streamingquery's als bron of sink door een korte naam of volledige naam door te geven aan format(). In het volgende voorbeeld wordt eerst de gegevensbron geregistreerd, waarna een query wordt gestart die leest uit de gegevensbron in het voorbeeld en uitvoer naar de console genereert.

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

De volgende code maakt ook gebruik van de voorbeeldstroom als sink en geeft een uitvoerpad op:

spark.dataSource.register(FakeStreamDataSource)

# Make sure the output directory exists and is writable
output_path = "/output_path"
dbutils.fs.mkdirs(output_path)
checkpoint_path = "/output_path/checkpoint"

query = (
    spark.readStream
    .format("fakestream")
    .load()
    .writeStream
    .format("fakestream")
    .option("path", output_path)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

Voorbeeld 4: Een Google BigQuery-streamingconnector maken

In het volgende voorbeeld ziet u hoe u een aangepaste streamingconnector bouwt voor Google BigQuery (BQ) met behulp van een PySpark DataSource. Databricks biedt een Spark-connector voor opname van BigQuery-batches en Lakehouse Federation kan ook op afstand verbinding maken met elke BigQuery-gegevensset en gegevens ophalen via het maken van een externe catalogus, maar biedt geen volledige ondersteuning voor incrementele of continue streamingwerkstromen. Deze connector maakt stapsgewijze incrementele gegevensmigratie mogelijk en bijna realtime migratie van BigQuery-tabellen die worden ingevoerd door streamingbronnen met permanente controlepunten.

Deze aangepaste connector heeft de volgende functies:

  • Compatibel met structured streaming en Lakeflow Spark-declaratieve pijplijnen.
  • Ondersteunt incrementele recordtracking en continue streamingopname en volgt semantiek voor gestructureerd streamen.
  • Maakt gebruik van de BigQuery Storage-API met een RPC-protocol voor snellere, goedkopere gegevensoverdracht.
  • Hiermee worden gemigreerde tabellen rechtstreeks naar Unity Catalog geschreven.
  • Hiermee worden controlepunten automatisch beheerd met behulp van een incrementeel veld op basis van een datum of tijdstempel.
  • Ondersteunt batchverwerking met Trigger.AvailableNow().
  • Hiervoor is geen tussenliggende cloudopslag vereist.
  • Serialiseert BigQuery-gegevensoverdracht met behulp van de Arrow- of Avro-indeling.
  • Verwerkt automatisch parallelisme en distribueert het werk over Spark-workers op basis van het gegevensvolume.
  • Geschikt voor migratie van raw- en bronzen lagen vanuit BigQuery, met ondersteuning voor migraties van Silver- en Gold-lagen met behulp van SCD Type 1- of Type 2-patronen.

Vereiste voorwaarden

Voordat u de aangepaste connector implementeert, moet u de vereiste pakketten installeren:

%pip install faker google.cloud google.cloud.bigquery google.cloud.bigquery_storage

Stap 1: De streamlezer implementeren

Implementeer eerst de streaminggegevenslezer. De DataSourceStreamReader subklasse moet de volgende methoden implementeren:

  • initialOffset(self) -> dict
  • latestOffset(self) -> dict
  • partitions(self, start: dict, end: dict) -> Sequence[InputPartition]
  • read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]
  • commit(self, end: dict) -> None
  • stop(self) -> None

Zie Methoden voor meer informatie over elke methode.

import os
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition
from pyspark.sql.datasource import DataSourceStreamWriter
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource
from pathlib import Path
from pyarrow.lib import TimestampScalar
from datetime import datetime
from typing import Iterator, Tuple, Any, Dict, List, Sequence
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession
from google.cloud import bigquery_storage
import pandas
import datetime
import uuid
import time, logging

start_time = time.time()


class RangePartition(InputPartition):
    def __init__(self, session: ReadSession, stream_idx: int):
        self.session = session
        self.stream_idx = stream_idx


class BQStreamReader(DataSourceStreamReader):

    def __init__(self, schema, options):
        self.project_id = options.get("project_id")
        self.dataset = options.get("dataset")
        self.table = options.get("table")
        self.json_auth_file = "/home/"+options.get("service_auth_json_file_name")
        self.max_parallel_conn = options.get("max_parallel_conn", 1000)
        self.incremental_checkpoint_field = options.get("incremental_checkpoint_field", "")

        self.last_offset = None

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        from datetime import datetime
        logging.info("Inside initialOffset!!!!!")
        # self.increment_latest_vals.append(datetime.strptime('1900-01-01 23:57:12', "%Y-%m-%d %H:%M:%S"))
        self.last_offset = '1900-01-01 23:57:12'

        return {"offset": str(self.last_offset)}

    def latestOffset(self):
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        from datetime import datetime
        from google.cloud import bigquery

        if (self.last_offset is None):
            self.last_offset = '1900-01-01 23:57:12'

        client = bigquery.Client.from_service_account_json(self.json_auth_file)
        # max_offset=start["offset"]
        logging.info(f"************************last_offset: {self.last_offset}***********************")
        f_sql_str = ''
        for x_str in self.incremental_checkpoint_field.strip().split(","):
            f_sql_str += f"{x_str}>'{self.last_offset}' or "
        f_sql_str = f_sql_str[:-3]
        job_query = client.query(
            f"select max({self.incremental_checkpoint_field}) from {self.project_id}.{self.dataset}.{self.table} where {f_sql_str}")
        for query in job_query.result():
            max_res = query[0]

        if (str(max_res).lower() != 'none'):
            return {"offset": str(max_res)}

        return {"offset": str(self.last_offset)}

    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:

        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        if (self.last_offset is None):
            self.last_offset = end['offset']

        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file

        # project_id = self.auth_project_id

        client = BigQueryReadClient()

        # This example reads baby name data from the public datasets.
        table = "projects/{}/datasets/{}/tables/{}".format(
            self.project_id, self.dataset, self.table
        )
        requested_session = bigquery_storage.ReadSession()
        requested_session.table = table
        if (self.incremental_checkpoint_field != ''):
            start_offset = start["offset"]
            end_offset = end["offset"]
            f_sql_str = ''
            for x_str in self.incremental_checkpoint_field.strip().split(","):
                f_sql_str += f"({x_str}>'{start_offset}' and {x_str}<='{end_offset}') or "
            f_sql_str = f_sql_str[:-3]
            requested_session.read_options.row_restriction = f"{f_sql_str}"

        # This example leverages Apache Avro.
        requested_session.data_format = bigquery_storage.DataFormat.AVRO

        parent = "projects/{}".format(self.project_id)
        session = client.create_read_session(
            request={
                "parent": parent,
                "read_session": requested_session,
                "max_stream_count": int(self.max_parallel_conn),
            },
        )
        self.last_offset = end['offset']
        return [RangePartition(session, i) for i in range(len(session.streams))]

    def read(self, partition) -> Iterator[List]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        from datetime import datetime
        session = partition.session
        stream_idx = partition.stream_idx
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.json_auth_file
        client_1 = BigQueryReadClient()
        # requested_session.read_options.selected_fields = ["census_tract", "clearance_date", "clearance_status"]
        reader = client_1.read_rows(session.streams[stream_idx].name)
        reader_iter = []

        for message in reader.rows():
            reader_iter_in = []
            for k, v in message.items():
                reader_iter_in.append(v)
            # yield(reader_iter)
            reader_iter.append(reader_iter_in)
            # yield (message['hash'], message['size'], message['virtual_size'], message['version'])
        # self.increment_latest_vals.append(max_incr_val)
        return iter(reader_iter)

    def commit(self, end):

        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

Stap 2: De gegevensbron definiëren

Definieer vervolgens de aangepaste gegevensbron. De DataSource subklasse moet de volgende methoden implementeren:

  • name(cls) -> str
  • schema(self) -> Union[StructType, str]

Zie Methoden voor meer informatie over elke methode.

from pyspark.sql.datasource import DataSource
from pyspark.sql.types import StructType
from google.cloud import bigquery

class BQStreamDataSource(DataSource):
    """
    An example data source for streaming data from a public API containing users' comments.
    """

    @classmethod
    def name(cls):
        return "bigquery-streaming"

    def schema(self):
        type_map = {'integer': 'long', 'float': 'double', 'record': 'string'}
        json_auth_file = "/home/" + self.options.get("service_auth_json_file_name")
        client = bigquery.Client.from_service_account_json(json_auth_file)
        table_ref = self.options.get("project_id") + '.' + self.options.get("dataset") + '.' + self.options.get("table")
        table = client.get_table(table_ref)
        original_schema = table.schema
        result = []
        for schema in original_schema:
            col_attr_name = schema.name
            if (schema.mode != 'REPEATED'):
                col_attr_type = type_map.get(schema.field_type.lower(), schema.field_type.lower())
            else:
                col_attr_type = f"array<{type_map.get(schema.field_type.lower(), schema.field_type.lower())}>"
            result.append(col_attr_name + " " + col_attr_type)

        return ",".join(result)
        # return "census_tract double,clearance_date string,clearance_status string"

    def streamReader(self, schema: StructType):
        return BQStreamReader(schema, self.options)

Stap 3: de streamingquery configureren en starten

Registreer ten slotte de connector, configureer de streamingquery en start deze:

spark.dataSource.register(BQStreamDataSource)

# Ingests table data incrementally using the provided timestamp-based field.
# The latest value is checkpointed using offset semantics.
# Without the incremental input field, full table ingestion is performed.
# Service account JSON files must be available to every Spark executor worker
# in the /home folder using --files /home/<file_name>.json or an init script.

query = (
    spark.readStream.format("bigquery-streaming")
    .option("project_id", <bq_project_id>)
    .option("incremental_checkpoint_field", <table_incremental_ts_based_col>)
    .option("dataset", <bq_dataset_name>)
    .option("table", <bq_table_name>)
    .option("service_auth_json_file_name", <service_account_json_file_name>)
    .option("max_parallel_conn", <max_parallel_threads_to_pull_data>)  # defaults to max 1000
    .load()
)

(
    query.writeStream.trigger(processingTime="30 seconds")
    .option("checkpointLocation", "checkpoint_path")
    .foreachBatch(writeToTable)  # your target table write function
    .start()
)

Uitvoeringsorder

De uitvoeringsvolgorde van de functie van de aangepaste stream wordt hieronder beschreven.

Voor het laden van het DataFrame van de Spark-stream:

name(cls)
schema()

Voor een microbatch (n) van een nieuwe query of bij het opnieuw opstarten van een bestaande query (nieuw of bestaand controlepunt):

partitions(end_offset, end_offset)  # loads the last saved offset from the checkpoint at query restart
latestOffset()
partitions(start_offset, end_offset)  # plans partitions and distributes to Python workers
read()  # user’s source read definition, runs on each Python worker
commit()

Voor de volgende (n+1) microbatch van een actieve query op een bestaand controlepunt:

latestOffset()
partitions(start_offset, end_offset)
read()
commit()

Notitie

De latestOffset functie organiseert controlepunten. Deel een controlepuntvariabele van een primitief type tussen functies en retourneer deze als een woordenlijst. Bijvoorbeeld: return {"offset": str(self.last_offset)}

Probleemoplossing

Als de uitvoer de volgende fout is, biedt uw berekening geen ondersteuning voor aangepaste PySpark-gegevensbronnen. U moet Databricks Runtime 15.2 of hoger gebruiken.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000