Abonneren op Google Pub/Sub

Gebruik de ingebouwde connector om u te abonneren op Google Pub/Sub. Deze connector biedt exactly-once verwerkingssemantiek voor records van de abonnee.

Notitie

Pub/Sub kan mogelijk dubbele gegevens publiceren of gegevens kunnen mogelijk in de verkeerde volgorde bij de abonnee aankomen. Schrijf code om dubbele en niet-volgordegegevens af te handelen.

Een Pub/Substream configureren

In het volgende codevoorbeeld ziet u de basissyntaxis voor het configureren van een Structured Streaming-leesbewerking uit Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Voor meer configuratieopties, zie Opties configureren voor Pub/Sub-streamingleesbewerkingen.

Toegang tot Pub/Sub configureren

De referenties die u configureert, moeten de volgende rollen hebben.

Rollen Vereist of optioneel Hoe rol wordt gebruikt
roles/pubsub.viewer of roles/viewer Vereist Controleert of het abonnement bestaat en het abonnement ophaalt.
roles/pubsub.subscriber Vereist Hiermee worden gegevens opgehaald uit een abonnement.
roles/pubsub.editor of roles/editor Optioneel Hiermee kunt u een abonnement aanmaken als er nog geen bestaat en kunt u het deleteSubscriptionOnStreamStop gebruiken om abonnementen te verwijderen bij het beƫindigen van een stream.

Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. De volgende opties zijn vereist om een verbinding te autoriseren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Inzicht in het pub-/subschema

Het schema voor de stream komt overeen met de records die worden opgehaald uit Pub/Sub, zoals beschreven in de volgende tabel.

Veld Typologie
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Opties configureren voor Pub/Sub-streaminglezen

In de volgende tabel worden de opties beschreven die worden ondersteund voor Pub/Sub. Alle opties worden geconfigureerd als onderdeel van een Structured Streaming-leesbewerking met behulp van .option("<optionName>", "<optionValue>") syntaxis.

Notitie

Bepaalde Pub/Sub-configuratieopties gebruiken het concept van ophalen in plaats van microbatches. Dit weerspiegelt de details van de interne implementatie en opties werken op dezelfde manier als bij andere Structured Streaming-connectors, behalve dat records worden opgehaald en vervolgens verwerkt.

Optie Standaardwaarde Beschrijving
numFetchPartitions Ingesteld op de helft van het aantal uitvoerders dat aanwezig is bij de initialisatie van de stream. Het aantal parallelle Spark-taken waarmee records uit een abonnement worden opgehaald.
deleteSubscriptionOnStreamStop false Als true wordt voldaan, wordt het abonnement dat aan de stream wordt gekoppeld verwijderd wanneer de streamingjob eindigt.
maxBytesPerTrigger none Een zachte limiet voor de batch-grootte die moet worden verwerkt tijdens elke getriggerde microbatch.
maxRecordsPerFetch 1000 Het aantal records dat per taak moet worden opgehaald voordat records worden verwerkt.
maxFetchPeriod 10s De tijdsduur voor elke taak die moet worden opgehaald voordat records worden verwerkt. Accepteert een duurtekenreeks, bijvoorbeeld 1s voor 1 seconde of 1m voor 1 minuut. Databricks raadt aan de standaardwaarde te gebruiken.

Incrementele batchverwerking gebruiken met Pub/Sub

U kunt Trigger.AvailableNow gebruiken om beschikbare records uit de Pub/Sub-bronnen als een incrementele batch te consumeren.

Azure Databricks registreert de tijdstempel wanneer u begint met lezen met de Trigger.AvailableNow instelling. Records die door de batch worden verwerkt, bevatten alle eerder opgehaalde gegevens en alle nieuw gepubliceerde records met een tijdstempel kleiner dan de vastgelegde begintijdstempel van de stream. Zie AvailableNowvoor meer informatie: Incrementele batchverwerking.

Pub/Sub-streamingmetrieken bewaken

Metrische gegevens over de voortgang van gestructureerde streaming rapporteren het aantal records dat is opgehaald en gereed voor verwerking, de grootte van de records die zijn opgehaald en gereed voor verwerking, en het aantal duplicaten dat is gezien sinds de stream wordt gestart. Hier volgt een voorbeeld van deze metrische gegevens:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Beperkingen

Pub/Sub biedt geen ondersteuning voor speculatieve uitvoering (spark.speculation).