Abonnieren Sie Google Pub/Sub

Verwenden Sie den integrierten Connector, um Google Pub/Sub zu abonnieren. Dieser Connector stellt genau einmal die Verarbeitungssemantik für Datensätze vom Abonnenten bereit.

Hinweis

Pub/Sub veröffentlicht möglicherweise doppelte Datensätze, oder Datensätze kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Schreiben Sie Code zum Behandeln von duplizierten und nicht sortierten Datensätzen.

Konfigurieren eines Pub/Sub-Streams

Im folgenden Codebeispiel wird die grundlegende Syntax zum Konfigurieren eines strukturierten Streaming-Lesevorgangs aus Pub/Sub veranschaulicht.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Weitere Konfigurationsoptionen finden Sie unter Konfigurationsoptionen für Pub/Sub-Streaming-Lesen.

Konfigurieren des Zugriffs auf Pub/Sub

Die von Ihnen konfigurierten Anmeldeinformationen müssen über die folgenden Rollen verfügen.

Rollen Erforderlich oder optional Wie die Rolle verwendet wird
roles/pubsub.viewer oder roles/viewer Erforderlich Überprüft, ob ein Abonnement vorhanden ist und ein Abonnement erhält.
roles/pubsub.subscriber Erforderlich Ruft Daten aus einem Abonnement ab.
roles/pubsub.editor oder roles/editor Wahlfrei Ermöglicht die Erstellung eines Abonnements, wenn keines vorhanden ist, und ermöglicht die Verwendung der deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Stream-Beendigung.

Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Grundlegendes zum Pub/Sub-Schema

Das Schema für den Datenstrom entspricht den Datensätzen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben.

Feld Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurieren von Optionen für den Pub/Sub-Streaming-Lesevorgang

In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")-Syntax konfiguriert.

Hinweis

Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies spiegelt interne Implementierungsdetails wider, und die Optionen funktionieren ähnlich wie die entsprechenden Optionen in anderen strukturierten Streaming-Connectors, mit der Ausnahme, dass die Datensätze abgerufen und dann verarbeitet werden.

Auswahlmöglichkeit Standardwert Beschreibung
numFetchPartitions Wird auf die Hälfte der Anzahl der bei der Stream-Initialisierung vorhandenen Executors gesetzt. Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen.
deleteSubscriptionOnStreamStop false Wenn true, wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht.
maxBytesPerTrigger none Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird.
maxRecordsPerFetch 1000 Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden.
maxFetchPeriod 10s Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Akzeptiert eine Dauerzeichenfolge, 1s z. B. 1 Sekunde oder 1m 1 Minute. Databricks empfiehlt die Verwendung des Standardwerts.

Verwenden der inkrementellen Batchverarbeitung mit Pub/Sub

Sie können Trigger.AvailableNow verwenden, um verfügbare Datensätze aus Pub/Sub-Quellen als inkrementellen Batch zu konsumieren.

Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow Einstellung beginnen. Datensätze, die vom Batch verarbeitet werden, umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Datensätze mit einem Zeitstempel, der kleiner als der aufgezeichnete Datenstrom-Startzeitstempel ist. Weitere Informationen finden Sie unter AvailableNow: Inkrementelle Batchverarbeitung.

Überwachen von Pub/Sub Streaming-Metriken

Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Datensätze, die Größe der abgerufenen und verarbeiteten Datensätze sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden. Im Folgenden sehen Sie ein Beispiel für diese Metriken:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Begrenzungen

Pub/Sub unterstützt keine spekulative Ausführung (spark.speculation).