Prenumerera på Google Pub/Sub

Använd den inbyggda anslutningsappen för att prenumerera på Google Pub/Sub. Den här anslutningen tillhandahåller semantik för exakt-en-gång bearbetning av poster från prenumeranten.

Anteckning

Pub/Sub kan publicera dubblettposter, eller så kan poster komma till prenumeranten i fel ordning. Skriv kod för att hantera duplicerade och oordnade poster.

Konfigurera en Pub/Sub-ström

I följande kodexempel visas den grundläggande syntaxen för att konfigurera en strukturerad direktuppspelning från Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Fler konfigurationsalternativ finns i Konfigurera alternativ för pub-/underströmningsläsning.

Konfigurera åtkomst till Pub/Sub

De autentiseringsuppgifter som du konfigurerar måste ha följande roller.

Roller Obligatorisk eller valfri Så här används rollen
roles/pubsub.viewer eller roles/viewer Obligatoriskt Kontrollerar om prenumerationen finns och hämtar prenumerationen.
roles/pubsub.subscriber Obligatoriskt Hämtar data från en prenumeration.
roles/pubsub.editor eller roles/editor Valfritt Möjliggör skapandet av en prenumeration om ingen sådan finns och möjliggör användning av deleteSubscriptionOnStreamStop för att radera prenumerationer vid strömavslutning.

Databricks rekommenderar att du använder hemligheter när du tillhandahåller auktoriseringsalternativ. Följande alternativ krävs för att auktorisera en anslutning:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Förstå Pub/Sub-schemat

Schemat för strömmen matchar de poster som hämtas från Pub/Sub, som beskrivs i följande tabell.

Fält Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurera alternativ för Pub/Sub strömmande läsning

I följande tabell beskrivs de alternativ som stöds för Pub/Sub. Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>") syntax.

Anteckning

Vissa Pub/Sub-konfigurationsalternativ använder begreppet hämtningar i stället för mikropartier. Detta återspeglar interna implementeringsdetaljer och alternativen fungerar på samma sätt som korollar i andra anslutningar för strukturerad strömning, förutom att poster hämtas och sedan bearbetas.

Alternativ Standardvärde beskrivning
numFetchPartitions Ange till hälften av antalet utförare som finns vid initieringen av dataströmmen. Antalet parallella Spark-jobb som hämtar dataposter från en prenumeration.
deleteSubscriptionOnStreamStop false Om true är uppfyllt, tas prenumerationen som skickas till dataströmmen bort när strömningsjobbet slutar.
maxBytesPerTrigger none En mjuk gräns för batchstorleken som ska bearbetas under varje utlöst mikrobatch.
maxRecordsPerFetch 1000 Antalet poster som ska hämtas per aktivitet innan poster bearbetas.
maxFetchPeriod 10s Tidsåtgången för varje uppgift att hämta innan bearbetning av poster. Accepterar en varaktighetssträng, till exempel 1s i 1 sekund eller 1m i 1 minut. Databricks rekommenderar att du använder standardvärdet.

Använda inkrementell batchbearbetning med Pub/Sub

Du kan använda Trigger.AvailableNow för att hämta tillgängliga poster från Pub/Sub-källorna som en inkrementell batch.

Azure Databricks registrerar tidsstämpeln när du börjar läsa med inställningen Trigger.AvailableNow . Poster som bearbetas av batchen innehåller alla tidigare hämtade data och eventuella nyligen publicerade poster med en tidsstämpel som är mindre än den inspelade starttidsstämpeln för dataströmmen. Mer information finns i AvailableNow: Inkrementell batchbearbetning.

Övervaka Pub/Sub-strömningsmått

Förloppsmått för strukturerad direktuppspelning rapporterar antalet poster som hämtats och är redo att bearbetas, storleken på de poster som hämtats och är redo att bearbetas samt antalet dubbletter som setts sedan strömmen startade. Följande är ett exempel på dessa mått:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Begränsningar

Pub/Sub stöder inte spekulativ körning (spark.speculation).