Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Använd den inbyggda anslutningsappen för att prenumerera på Google Pub/Sub. Den här anslutningen tillhandahåller semantik för exakt-en-gång bearbetning av poster från prenumeranten.
Anteckning
Pub/Sub kan publicera dubblettposter, eller så kan poster komma till prenumeranten i fel ordning. Skriv kod för att hantera duplicerade och oordnade poster.
Konfigurera en Pub/Sub-ström
I följande kodexempel visas den grundläggande syntaxen för att konfigurera en strukturerad direktuppspelning från Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Fler konfigurationsalternativ finns i Konfigurera alternativ för pub-/underströmningsläsning.
Konfigurera åtkomst till Pub/Sub
De autentiseringsuppgifter som du konfigurerar måste ha följande roller.
| Roller | Obligatorisk eller valfri | Så här används rollen |
|---|---|---|
roles/pubsub.viewer eller roles/viewer |
Obligatoriskt | Kontrollerar om prenumerationen finns och hämtar prenumerationen. |
roles/pubsub.subscriber |
Obligatoriskt | Hämtar data från en prenumeration. |
roles/pubsub.editor eller roles/editor |
Valfritt | Möjliggör skapandet av en prenumeration om ingen sådan finns och möjliggör användning av deleteSubscriptionOnStreamStop för att radera prenumerationer vid strömavslutning. |
Databricks rekommenderar att du använder hemligheter när du tillhandahåller auktoriseringsalternativ. Följande alternativ krävs för att auktorisera en anslutning:
clientEmailclientIdprivateKeyprivateKeyId
Förstå Pub/Sub-schemat
Schemat för strömmen matchar de poster som hämtas från Pub/Sub, som beskrivs i följande tabell.
| Fält | Typ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurera alternativ för Pub/Sub strömmande läsning
I följande tabell beskrivs de alternativ som stöds för Pub/Sub. Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>") syntax.
Anteckning
Vissa Pub/Sub-konfigurationsalternativ använder begreppet hämtningar i stället för mikropartier. Detta återspeglar interna implementeringsdetaljer och alternativen fungerar på samma sätt som korollar i andra anslutningar för strukturerad strömning, förutom att poster hämtas och sedan bearbetas.
| Alternativ | Standardvärde | beskrivning |
|---|---|---|
numFetchPartitions |
Ange till hälften av antalet utförare som finns vid initieringen av dataströmmen. | Antalet parallella Spark-jobb som hämtar dataposter från en prenumeration. |
deleteSubscriptionOnStreamStop |
false |
Om true är uppfyllt, tas prenumerationen som skickas till dataströmmen bort när strömningsjobbet slutar. |
maxBytesPerTrigger |
none |
En mjuk gräns för batchstorleken som ska bearbetas under varje utlöst mikrobatch. |
maxRecordsPerFetch |
1000 |
Antalet poster som ska hämtas per aktivitet innan poster bearbetas. |
maxFetchPeriod |
10s |
Tidsåtgången för varje uppgift att hämta innan bearbetning av poster. Accepterar en varaktighetssträng, till exempel 1s i 1 sekund eller 1m i 1 minut. Databricks rekommenderar att du använder standardvärdet. |
Använda inkrementell batchbearbetning med Pub/Sub
Du kan använda Trigger.AvailableNow för att hämta tillgängliga poster från Pub/Sub-källorna som en inkrementell batch.
Azure Databricks registrerar tidsstämpeln när du börjar läsa med inställningen Trigger.AvailableNow . Poster som bearbetas av batchen innehåller alla tidigare hämtade data och eventuella nyligen publicerade poster med en tidsstämpel som är mindre än den inspelade starttidsstämpeln för dataströmmen. Mer information finns i AvailableNow: Inkrementell batchbearbetning.
Övervaka Pub/Sub-strömningsmått
Förloppsmått för strukturerad direktuppspelning rapporterar antalet poster som hämtats och är redo att bearbetas, storleken på de poster som hämtats och är redo att bearbetas samt antalet dubbletter som setts sedan strömmen startade. Följande är ett exempel på dessa mått:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Begränsningar
Pub/Sub stöder inte spekulativ körning (spark.speculation).