Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Verwenden Sie den integrierten Connector, um Google Pub/Sub zu abonnieren. Dieser Connector stellt genau einmal die Verarbeitungssemantik für Datensätze vom Abonnenten bereit.
Hinweis
Pub/Sub veröffentlicht möglicherweise doppelte Datensätze, oder Datensätze kommen möglicherweise außerhalb der Reihenfolge an den Abonnenten. Schreiben Sie Code zum Behandeln von duplizierten und nicht sortierten Datensätzen.
Konfigurieren eines Pub/Sub-Streams
Im folgenden Codebeispiel wird die grundlegende Syntax zum Konfigurieren eines strukturierten Streaming-Lesevorgangs aus Pub/Sub veranschaulicht.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Weitere Konfigurationsoptionen finden Sie unter Konfigurationsoptionen für Pub/Sub-Streaming-Lesen.
Konfigurieren des Zugriffs auf Pub/Sub
Die von Ihnen konfigurierten Anmeldeinformationen müssen über die folgenden Rollen verfügen.
| Rollen | Erforderlich oder optional | Wie die Rolle verwendet wird |
|---|---|---|
roles/pubsub.viewer oder roles/viewer |
Erforderlich | Überprüft, ob ein Abonnement vorhanden ist und ein Abonnement erhält. |
roles/pubsub.subscriber |
Erforderlich | Ruft Daten aus einem Abonnement ab. |
roles/pubsub.editor oder roles/editor |
Wahlfrei | Ermöglicht die Erstellung eines Abonnements, wenn keines vorhanden ist, und ermöglicht die Verwendung der deleteSubscriptionOnStreamStop zum Löschen von Abonnements bei der Stream-Beendigung. |
Databricks empfiehlt die Verwendung von Geheimnissen beim Bereitstellen von Autorisierungsoptionen. Die folgenden Optionen sind erforderlich, um eine Verbindung zu autorisieren:
clientEmailclientIdprivateKeyprivateKeyId
Grundlegendes zum Pub/Sub-Schema
Das Schema für den Datenstrom entspricht den Datensätzen, die aus Pub/Sub abgerufen werden, wie in der folgenden Tabelle beschrieben.
| Feld | Typ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurieren von Optionen für den Pub/Sub-Streaming-Lesevorgang
In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pub/Sub unterstützt werden. Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")-Syntax konfiguriert.
Hinweis
Einige Pub/Sub-Konfigurationsoptionen verwenden das Konzept der Abrufe anstelle von Mikrobatches. Dies spiegelt interne Implementierungsdetails wider, und die Optionen funktionieren ähnlich wie die entsprechenden Optionen in anderen strukturierten Streaming-Connectors, mit der Ausnahme, dass die Datensätze abgerufen und dann verarbeitet werden.
| Auswahlmöglichkeit | Standardwert | Beschreibung |
|---|---|---|
numFetchPartitions |
Wird auf die Hälfte der Anzahl der bei der Stream-Initialisierung vorhandenen Executors gesetzt. | Die Anzahl der parallelen Spark-Aufgaben, die Datensätze aus einem Abonnement abrufen. |
deleteSubscriptionOnStreamStop |
false |
Wenn true, wird das an den Datenstrom übergebene Abonnement beim Beenden des Streamingauftrags gelöscht. |
maxBytesPerTrigger |
none |
Eine weiche Grenze für die Batchgröße, die bei jedem ausgelösten Mikrobatch verarbeitet wird. |
maxRecordsPerFetch |
1000 |
Die Anzahl der Datensätze, die pro Aufgabe abgerufen werden sollen, bevor Datensätze verarbeitet werden. |
maxFetchPeriod |
10s |
Die Zeitdauer für jeden Vorgang, der vor der Verarbeitung von Datensätzen abgerufen werden soll. Akzeptiert eine Dauerzeichenfolge, 1s z. B. 1 Sekunde oder 1m 1 Minute. Databricks empfiehlt die Verwendung des Standardwerts. |
Verwenden der inkrementellen Batchverarbeitung mit Pub/Sub
Sie können Trigger.AvailableNow verwenden, um verfügbare Datensätze aus Pub/Sub-Quellen als inkrementellen Batch zu konsumieren.
Azure Databricks zeichnet den Zeitstempel auf, wenn Sie einen Lesevorgang mit der Trigger.AvailableNow Einstellung beginnen. Datensätze, die vom Batch verarbeitet werden, umfassen alle zuvor abgerufenen Daten und alle neu veröffentlichten Datensätze mit einem Zeitstempel, der kleiner als der aufgezeichnete Datenstrom-Startzeitstempel ist. Weitere Informationen finden Sie unter AvailableNow: Inkrementelle Batchverarbeitung.
Überwachen von Pub/Sub Streaming-Metriken
Metriken für strukturiertes Streaming melden die Anzahl der abgerufenen und verarbeiteten Datensätze, die Größe der abgerufenen und verarbeiteten Datensätze sowie die Anzahl der Duplikate, die seit dem Start des Datenstroms angezeigt werden. Im Folgenden sehen Sie ein Beispiel für diese Metriken:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Begrenzungen
Pub/Sub unterstützt keine spekulative Ausführung (spark.speculation).