Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een streamingtabel is een Delta-tabel met extra ondersteuning voor streaming of incrementele gegevensverwerking. Een streamingtabel kan worden gericht op een of meer stromen in een pijplijn.
Streamingtabellen zijn een goede keuze voor gegevensopname om de volgende redenen:
- Elke invoerrij wordt slechts één keer verwerkt, waarmee het overgrote deel van de gegevensinvoertaken wordt gemodelleerd (dat wil zeggen door rijen aan een tabel toe te voegen of bij te werken).
- Ze kunnen grote hoeveelheden alleen toevoegbare gegevens verwerken.
Streamingtabellen zijn ook een goede keuze voor streamingtransformaties met lage latentie, omdat ze kunnen redeneren over rijen en tijdvensters, grote hoeveelheden gegevens kunnen verwerken en verwerking met lage latentie kunnen bieden.
In het volgende diagram ziet u hoe stromen uit streamingbronnen lezen en incrementeel naar een streamingtabel in een pijplijn schrijven.
Bij elke update lezen de stromen die zijn gekoppeld aan een streamingtabel de gewijzigde informatie in een streamingbron en voegen nieuwe gegevens toe aan die tabel.
Streamingtabellen zijn eigendom van en worden bijgewerkt door één datapijplijn. U definieert expliciet streamingtabellen in de broncode van de pijplijn. Tabellen die zijn gedefinieerd door een pijplijn, kunnen niet worden gewijzigd of bijgewerkt door een andere pijplijn. U kunt meerdere stromen definiëren die moeten worden toegevoegd aan één streamingtabel.
Azure Databricks maakt interne tabellen ter ondersteuning van verwerking van streamingtabellen. Deze tabellen worden weergegeven in system.information_schema.tables, maar zijn niet zichtbaar in Catalog Explorer of andere gebruikersinterface van de werkruimte.
Opmerking
Wanneer u een streamingtabel buiten een pijplijn maakt met behulp van Databricks SQL, maakt Azure Databricks een pijplijn die wordt gebruikt om de tabel bij te werken. U kunt de pijplijn zien door taken en pijplijnen te selecteren in de linkernavigatiebalk in uw werkruimte. U kunt de kolom Pijplijntype toevoegen aan uw weergave. Streamingtabellen die in een pijplijn zijn gedefinieerd, hebben een type ETL. Streamingtabellen die zijn gemaakt in Databricks SQL hebben een type MV/ST.
Zie Incrementeel gegevens laden en verwerken met Lakeflow Spark-declaratieve pijplijnen voor meer informatie over stromen.
Streamingtabellen voor gegevensinvoer
Streamingtabellen zijn ontworpen voor gegevensbronnen die alleen toevoegingen toelaten en verwerken invoer slechts één keer. Dit maakt ze geschikt voor opnameworkloads waarbij gegevens continu binnenkomen en betrouwbaar moeten worden vastgelegd zonder bestaande records opnieuw te verwerken. Azure Databricks ondersteunt het opnemen van gegevens uit cloudopslag en streamingberichtenbussen.
Bestanden opnemen vanuit cloudopslag
U kunt een streamingtabel gebruiken om nieuwe bestanden op te nemen vanuit cloudopslag. In deze voorbeelden wordt automatisch laadprogramma gebruikt om nieuwe bestanden stapsgewijs te verwerken wanneer ze binnenkomen.
Python
from pyspark import pipelines as dp
# Create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
Als u een streamingtabel wilt maken, moet de definitie van de gegevensset een stroomtype zijn. Wanneer u de spark.readStream functie in een definitie van een gegevensset gebruikt, wordt er een streaminggegevensset geretourneerd.
SQL
-- Create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
Voor streamingtabellen zijn streamingdatasets vereist. Het STREAM-trefwoord voor read_files dat de query de dataset als een stream behandelt.
Streamingberichten opnemen
U kunt ook streamingtabellen gebruiken om gegevens op te nemen uit berichtenbussen. In het volgende voorbeeld ziet u hoe u een streamingtabel maakt die leest van een Pub/Sub-onderwerp.
Python
@dp.table
def pubsub_raw():
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
return (
spark.readStream
.format("pubsub")
.option("subscriptionId", "my-subscription")
.option("topicId", "my-topic")
.option("projectId", "my-project")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'my-subscription',
projectId => 'my-project',
topicId => 'my-topic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. Zie Toegang tot Pub/Sub configureren voor alle verificatieopties.
Zie Gegevens laden in pijplijnen voor meer informatie over het laden van gegevens in een streamingtabel.
Het volgende diagram illustreert hoe append-only streaming-tabellen werken.
Een rij die al aan een Streamingtabel is toegevoegd, wordt niet opnieuw opgevraagd bij latere updates van de pijplijn. Als u de query wijzigt (bijvoorbeeld van SELECT LOWER (name) naar SELECT UPPER (name)), worden bestaande rijen niet bijgewerkt naar hoofdletters, maar zullen nieuwe rijen in hoofdletters zijn. U kunt een volledige verversing activeren om alle eerdere gegevens uit de brontabel opnieuw op te halen en alle rijen in de streamingtabel bij te werken.
Streamingtabellen en streaming met lage latentie
Streamingtabellen zijn ontworpen voor streaming met lage latentie over gebonden toestand. Streamingtabellen maken gebruik van controlepuntbeheer, waardoor ze geschikt zijn voor streaming met lage latentie. Ze verwachten echter streams die van nature begrensd zijn of voorzien van een watermerk.
Een natuurlijk gebonden stroom wordt geproduceerd door een streaminggegevensbron met een goed gedefinieerd begin en einde. Een voorbeeld van een natuurlijk gebonden stroom is het lezen van gegevens uit een map met bestanden waarin geen nieuwe bestanden worden toegevoegd nadat een eerste batch bestanden is geplaatst. De stroom wordt beschouwd als gebonden omdat het aantal bestanden eindig is en de stroom eindigt nadat alle bestanden zijn verwerkt.
U kunt ook een watermerk gebruiken om een stroom te binden. Een watermerk in Structured Streaming is een mechanisme waarmee late gegevens kunnen worden verwerkt door op te geven hoe lang het systeem moet wachten op vertraagde gebeurtenissen voordat het tijdvenster als voltooid wordt overwogen. Een niet-gebonden stroom die geen watermerk heeft, kan ertoe leiden dat een pijplijn mislukt vanwege geheugendruk.
Zie Stateful verwerking optimaliseren met watermerken voor meer informatie over stateful streamverwerking.
Stream-snapshot-koppelingen
Stream-snapshotkoppelingen verbinden een streaminggegevensset met een dimensietabel die bij het begin van de stream wordt vastgelegd. Omdat de dimensietabel op dat moment als vast wordt behandeld, worden eventuele wijzigingen die eraan zijn aangebracht nadat de stream is gestart, niet doorgevoerd in de join. Dit is acceptabel wanneer kleine verschillen niet van belang zijn, bijvoorbeeld wanneer het aantal transacties vele ordes van grootte groter is dan het aantal klanten.
In het volgende codevoorbeeld wordt een dimensietabel met twee rijen, genaamd customers, samengevoegd met een steeds groter wordende gegevensset, transactions. Het voert een samenvoeging uit tussen deze twee datasets in een tabel met de naam sales_report. Als een extern proces de klantentabel bijwerkt door een nieuwe rij (customer_id=3, name=Zoya) toe te voegen, zal deze nieuwe rij niet aanwezig zijn in de join, omdat de statische dimensietabel was vastgelegd toen de streams werden gestart.
from pyspark import pipelines as dp
@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")
@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return facts.join(dims, on="customer_id", how="inner")
Beperkingen voor streaming-tabellen
Streamingtabellen hebben de volgende beperkingen:
-
Beperkte evolutie: U kunt de query wijzigen zonder de hele gegevensset opnieuw te compileren. Zonder een volledige vernieuwing ziet een streamingtabel elke rij slechts één keer, zodat verschillende query's verschillende rijen hebben verwerkt. Als u bijvoorbeeld
UPPER()toevoegt aan een veld in de query, worden alleen rijen die na de wijziging worden verwerkt, in hoofdletters weergegeven. Dit betekent dat u rekening moet houden met alle eerdere versies van de query die worden uitgevoerd op uw gegevensset. Voor het opnieuw verwerken van bestaande rijen die vóór de wijziging zijn verwerkt, is een volledige vernieuwing vereist. - Statusbeheer: Streamingtabellen hebben een lage latentie en vereisen streams die natuurlijk begrensd zijn of afgebakend zijn met een watermark. Zie Stateful verwerking optimaliseren met watermerken voor meer informatie.
- Joins worden niet herberekend: Joins in streamingtabellen worden niet herberekend wanneer dimensies veranderen. Dit kenmerk kan goed zijn voor "snel-maar-fout"-scenario's. Als u wilt dat uw weergave altijd juist is, kunt u een gerealiseerde weergave gebruiken. Gerealiseerde weergaven zijn altijd correct omdat ze joins automatisch opnieuw compileren wanneer dimensies veranderen. Voor meer informatie, zie gerealiseerde weergaven.